Compare commits

..

4 Commits

Author SHA1 Message Date
AHMET YILMAZ
65902a4773 feat: Enhance stealth compatibility with new and legacy APIs, add configuration support 2025-07-16 17:41:47 +08:00
AHMET YILMAZ
5c13baf574 feat: Add stealth option to BrowserConfig for enhanced browser behavior 2025-07-15 15:48:23 +08:00
AHMET YILMAZ
d2759824ef fix: Update playwright-stealth to v2.0.0+ compatibility
Fixes #1273

- Replace deprecated stealth_async import with Stealth class
- Add stealth flag to BrowserConfig (default: true)
- Update async_crawler_strategy to use Stealth().apply_stealth_async()
- Remove obsolete StealthConfig from browser_manager
- Maintain backward compatibility with existing stealth functionality

This fixes compatibility issues with playwright-stealth v2.0.0+ where the API changed from stealth_async function to Stealth class.

test: Add comprehensive tests for playwright-stealth v2.0.0+ compatibility

- Test Stealth class import and instantiation
- Test apply_stealth_async method availability
- Test BrowserConfig stealth flag functionality
- Test stealth flag serialization
- Verify backward compatibility with existing stealth functionality
2025-07-15 15:31:15 +08:00
UncleCode
bde1bba6a2 docs: Add missing documentation pages to mkdocs.yml
- Added Adaptive Crawling to Core section
- Added URL Seeding to Core section
- Added Adaptive Strategies to Advanced section
2025-07-12 19:56:33 +08:00
6 changed files with 352 additions and 415 deletions

View File

@@ -12,6 +12,20 @@ from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
import hashlib
# Backward compatible stealth import
try:
# Try new tf-playwright-stealth API (Stealth class)
from playwright_stealth import Stealth
STEALTH_NEW_API = True
except ImportError:
try:
# Try old playwright-stealth API (stealth_async function)
from playwright_stealth import stealth_async
STEALTH_NEW_API = False
except ImportError:
# No stealth available
STEALTH_NEW_API = None
import uuid
from .js_snippet import load_js_script
from .models import AsyncCrawlResponse
@@ -31,6 +45,107 @@ from types import MappingProxyType
import contextlib
from functools import partial
# Add StealthConfig class for backward compatibility and new features
class StealthConfig:
"""
Configuration class for stealth settings that works with tf-playwright-stealth.
This maintains backward compatibility while supporting all tf-playwright-stealth features.
"""
def __init__(
self,
# Common settings
enabled: bool = True,
# Core tf-playwright-stealth parameters (matching the actual library)
chrome_app: bool = True,
chrome_csi: bool = True,
chrome_load_times: bool = True,
chrome_runtime: bool = False, # Note: library default is False
hairline: bool = True,
iframe_content_window: bool = True,
media_codecs: bool = True,
navigator_hardware_concurrency: bool = True,
navigator_languages: bool = True,
navigator_permissions: bool = True,
navigator_platform: bool = True,
navigator_plugins: bool = True,
navigator_user_agent: bool = True,
navigator_vendor: bool = True,
navigator_webdriver: bool = True,
sec_ch_ua: bool = True,
webgl_vendor: bool = True,
# Override parameters
navigator_languages_override: tuple = ("en-US", "en"),
navigator_platform_override: str = "Win32",
navigator_user_agent_override: str = None,
navigator_vendor_override: str = None,
sec_ch_ua_override: str = None,
webgl_renderer_override: str = None,
webgl_vendor_override: str = None,
# Advanced parameters
init_scripts_only: bool = False,
script_logging: bool = False,
# Legacy parameters for backward compatibility
webdriver: bool = None, # This will be mapped to navigator_webdriver
user_agent_override: bool = None, # This will be mapped to navigator_user_agent
window_outerdimensions: bool = None, # This parameter doesn't exist in tf-playwright-stealth
):
self.enabled = enabled
# Handle legacy parameter mapping for backward compatibility
if webdriver is not None:
navigator_webdriver = webdriver
if user_agent_override is not None:
navigator_user_agent = user_agent_override
# Store all stealth options for the Stealth class - filter out None values
self.stealth_options = {
k: v for k, v in {
'chrome_app': chrome_app,
'chrome_csi': chrome_csi,
'chrome_load_times': chrome_load_times,
'chrome_runtime': chrome_runtime,
'hairline': hairline,
'iframe_content_window': iframe_content_window,
'media_codecs': media_codecs,
'navigator_hardware_concurrency': navigator_hardware_concurrency,
'navigator_languages': navigator_languages,
'navigator_permissions': navigator_permissions,
'navigator_platform': navigator_platform,
'navigator_plugins': navigator_plugins,
'navigator_user_agent': navigator_user_agent,
'navigator_vendor': navigator_vendor,
'navigator_webdriver': navigator_webdriver,
'sec_ch_ua': sec_ch_ua,
'webgl_vendor': webgl_vendor,
'navigator_languages_override': navigator_languages_override,
'navigator_platform_override': navigator_platform_override,
'navigator_user_agent_override': navigator_user_agent_override,
'navigator_vendor_override': navigator_vendor_override,
'sec_ch_ua_override': sec_ch_ua_override,
'webgl_renderer_override': webgl_renderer_override,
'webgl_vendor_override': webgl_vendor_override,
'init_scripts_only': init_scripts_only,
'script_logging': script_logging,
}.items() if v is not None
}
@classmethod
def from_dict(cls, config_dict: dict) -> 'StealthConfig':
"""Create StealthConfig from dictionary for easy configuration"""
return cls(**config_dict)
def to_dict(self) -> dict:
"""Convert to dictionary for serialization"""
return {
'enabled': self.enabled,
**self.stealth_options
}
class AsyncCrawlerStrategy(ABC):
"""
Abstract base class for crawler strategies.
@@ -39,7 +154,7 @@ class AsyncCrawlerStrategy(ABC):
@abstractmethod
async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
pass # 4 + 3
pass # 4 + 3
class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"""
@@ -220,6 +335,79 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"""
self.headers = headers
async def _apply_stealth(self, page: Page, stealth_config: Optional[StealthConfig] = None):
"""
Apply stealth measures to the page with backward compatibility and enhanced configuration.
This method automatically applies stealth measures and now supports configuration
through StealthConfig while maintaining backward compatibility.
Currently supports:
- tf-playwright-stealth (Stealth class with extensive configuration)
- Old playwright-stealth v1.x (stealth_async function) - legacy support
Args:
page (Page): The Playwright page object
stealth_config (Optional[StealthConfig]): Configuration for stealth settings
"""
if STEALTH_NEW_API is None:
# No stealth library available - silently continue
if self.logger and hasattr(self.logger, 'debug'):
self.logger.debug(
message="playwright-stealth not available, skipping stealth measures",
tag="STEALTH"
)
return
# Use default config if none provided
if stealth_config is None:
stealth_config = StealthConfig()
# Skip if stealth is disabled
if not stealth_config.enabled:
if self.logger and hasattr(self.logger, 'debug'):
self.logger.debug(
message="Stealth measures disabled in configuration",
tag="STEALTH"
)
return
try:
if STEALTH_NEW_API:
# Use tf-playwright-stealth API with configuration support
# Filter out any invalid parameters that might cause issues
valid_options = {}
for key, value in stealth_config.stealth_options.items():
# Accept boolean parameters and specific string/tuple parameters
if isinstance(value, (bool, str, tuple)):
valid_options[key] = value
stealth = Stealth(**valid_options)
await stealth.apply_stealth_async(page)
config_info = f"with {len(valid_options)} options"
else:
# Use old API (v1.x) - configuration options are limited
await stealth_async(page)
config_info = "default (v1.x legacy)"
# Only log if logger is available and in debug mode
if self.logger and hasattr(self.logger, 'debug'):
api_version = "tf-playwright-stealth" if STEALTH_NEW_API else "v1.x"
self.logger.debug(
message="Applied stealth measures using {version} {config}",
tag="STEALTH",
params={"version": api_version, "config": config_info}
)
except Exception as e:
# Silently continue if stealth fails - don't break the crawling process
if self.logger:
self.logger.warning(
message="Stealth measures failed, continuing without stealth: {error}",
tag="STEALTH",
params={"error": str(e)}
)
async def smart_wait(self, page: Page, wait_for: str, timeout: float = 30000):
"""
Wait for a condition in a smart way. This functions works as below:
@@ -532,6 +720,24 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# Get page for session
page, context = await self.browser_manager.get_page(crawlerRunConfig=config)
# Apply stealth measures automatically (backward compatible) with optional config
# Check multiple possible locations for stealth config for flexibility
stealth_config = None
if hasattr(config, 'stealth_config') and config.stealth_config:
stealth_config = config.stealth_config
elif hasattr(config, 'stealth') and config.stealth:
# Alternative attribute name for backward compatibility
stealth_config = config.stealth if isinstance(config.stealth, StealthConfig) else StealthConfig.from_dict(config.stealth)
elif config.magic:
# Enable more aggressive stealth in magic mode
stealth_config = StealthConfig(
navigator_webdriver=False, # More aggressive stealth
webdriver=False,
chrome_app=False
)
await self._apply_stealth(page, stealth_config)
# await page.goto(URL)
# Add default cookie
@@ -933,7 +1139,6 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
tag="VIEWPORT",
params={"error": str(e)},
)
# Handle full page scanning
if config.scan_full_page:
# await self._handle_full_page_scan(page, config.scroll_delay)
@@ -1837,8 +2042,6 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# }}
# }})();
# """
# )
# """ NEW VERSION:
# When {script} contains statements (e.g., const link = …; link.click();),
# this forms invalid JavaScript, causing Playwright execution error: SyntaxError: Unexpected token 'const'.

View File

@@ -14,24 +14,8 @@ import hashlib
from .js_snippet import load_js_script
from .config import DOWNLOAD_PAGE_TIMEOUT
from .async_configs import BrowserConfig, CrawlerRunConfig
from playwright_stealth import StealthConfig
from .utils import get_chromium_path
stealth_config = StealthConfig(
webdriver=True,
chrome_app=True,
chrome_csi=True,
chrome_load_times=True,
chrome_runtime=True,
navigator_languages=True,
navigator_plugins=True,
navigator_permissions=True,
webgl_vendor=True,
outerdimensions=True,
navigator_hardware_concurrency=True,
media_codecs=True,
)
BROWSER_DISABLE_OPTIONS = [
"--disable-background-networking",
"--disable-background-timer-throttling",

View File

@@ -54,27 +54,6 @@ def _get_memory_mb():
logger.warning(f"Could not get memory info: {e}")
return None
# --- Helper to sanitize JSON data ---
def sanitize_json_data(data):
"""
Recursively sanitize data to handle infinity and NaN values that are not JSON compliant.
"""
import math
if isinstance(data, dict):
return {k: sanitize_json_data(v) for k, v in data.items()}
elif isinstance(data, list):
return [sanitize_json_data(item) for item in data]
elif isinstance(data, float):
if math.isinf(data):
return "Infinity" if data > 0 else "-Infinity"
elif math.isnan(data):
return "NaN"
else:
return data
else:
return data
async def handle_llm_qa(
url: str,
@@ -392,10 +371,8 @@ async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator)
server_memory_mb = _get_memory_mb()
result_dict = result.model_dump()
result_dict['server_memory_mb'] = server_memory_mb
# Sanitize data to handle infinity values
sanitized_dict = sanitize_json_data(result_dict)
logger.info(f"Streaming result for {sanitized_dict.get('url', 'unknown')}")
data = json.dumps(sanitized_dict, default=datetime_handler) + "\n"
logger.info(f"Streaming result for {result_dict.get('url', 'unknown')}")
data = json.dumps(result_dict, default=datetime_handler) + "\n"
yield data.encode('utf-8')
except Exception as e:
logger.error(f"Serialization error: {e}")
@@ -469,7 +446,7 @@ async def handle_crawl_request(
return {
"success": True,
"results": [sanitize_json_data(result.model_dump()) for result in results],
"results": [result.model_dump() for result in results],
"server_processing_time_s": end_time - start_time,
"server_memory_delta_mb": mem_delta_mb,
"server_peak_memory_mb": peak_mem_mb

View File

@@ -331,27 +331,6 @@ async def generate_pdf(
return {"success": True, "pdf": base64.b64encode(pdf_data).decode()}
def sanitize_json_data(data):
"""
Recursively sanitize data to handle infinity and NaN values that are not JSON compliant.
"""
import math
if isinstance(data, dict):
return {k: sanitize_json_data(v) for k, v in data.items()}
elif isinstance(data, list):
return [sanitize_json_data(item) for item in data]
elif isinstance(data, float):
if math.isinf(data):
return "Infinity" if data > 0 else "-Infinity"
elif math.isnan(data):
return "NaN"
else:
return data
else:
return data
@app.post("/execute_js")
@limiter.limit(config["rate_limiting"]["default_limit"])
@mcp_tool("execute_js")
@@ -410,9 +389,7 @@ async def execute_js(
results = await crawler.arun(url=body.url, config=cfg)
# Return JSON-serializable dict of the first CrawlResult
data = results[0].model_dump()
# Sanitize data to handle infinity values
sanitized_data = sanitize_json_data(data)
return JSONResponse(sanitized_data)
return JSONResponse(data)
@app.get("/llm/{url:path}")

View File

@@ -0,0 +1,141 @@
#!/usr/bin/env python3
"""
Test suite for playwright-stealth backward compatibility.
Tests that stealth functionality works automatically without user configuration.
"""
import pytest
import asyncio
from unittest.mock import Mock, patch, MagicMock
class TestPlaywrightStealthCompatibility:
"""Test playwright-stealth backward compatibility with transparent operation"""
def test_api_detection_works(self):
"""Test that API detection works correctly"""
from crawl4ai.async_crawler_strategy import STEALTH_NEW_API
# The value depends on which version is installed, but should not be undefined
assert STEALTH_NEW_API is not None or STEALTH_NEW_API is False or STEALTH_NEW_API is None
@pytest.mark.asyncio
@patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', True)
@patch('crawl4ai.async_crawler_strategy.Stealth')
async def test_apply_stealth_new_api(self, mock_stealth_class):
"""Test stealth application with new API works transparently"""
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Setup mock
mock_stealth_instance = Mock()
mock_stealth_instance.apply_stealth_async = Mock()
mock_stealth_class.return_value = mock_stealth_instance
# Create strategy instance
strategy = AsyncPlaywrightCrawlerStrategy()
# Mock page
mock_page = Mock()
# Test the method - should work transparently
await strategy._apply_stealth(mock_page)
# Verify new API was used
mock_stealth_class.assert_called_once()
mock_stealth_instance.apply_stealth_async.assert_called_once_with(mock_page)
@pytest.mark.asyncio
@patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', False)
async def test_apply_stealth_legacy_api(self):
"""Test stealth application with legacy API works transparently"""
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Mock stealth_async function by setting it as a module attribute
mock_stealth_async = Mock()
mock_stealth_async.return_value = None
# Import the module to add the mock function
import crawl4ai.async_crawler_strategy
crawl4ai.async_crawler_strategy.stealth_async = mock_stealth_async
try:
# Create strategy instance
strategy = AsyncPlaywrightCrawlerStrategy()
# Mock page
mock_page = Mock()
# Test the method - should work transparently
await strategy._apply_stealth(mock_page)
# Verify legacy API was used
mock_stealth_async.assert_called_once_with(mock_page)
finally:
# Clean up
if hasattr(crawl4ai.async_crawler_strategy, 'stealth_async'):
delattr(crawl4ai.async_crawler_strategy, 'stealth_async')
@pytest.mark.asyncio
@patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', None)
async def test_apply_stealth_no_library(self):
"""Test stealth application when no stealth library is available"""
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Create strategy instance
strategy = AsyncPlaywrightCrawlerStrategy()
# Mock page
mock_page = Mock()
# Test the method - should work transparently even without stealth
await strategy._apply_stealth(mock_page)
# Should complete without error even when no stealth is available
@pytest.mark.asyncio
@patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', True)
@patch('crawl4ai.async_crawler_strategy.Stealth')
async def test_stealth_error_handling(self, mock_stealth_class):
"""Test that stealth errors are handled gracefully without breaking crawling"""
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Setup mock to raise an error
mock_stealth_instance = Mock()
mock_stealth_instance.apply_stealth_async = Mock(side_effect=Exception("Stealth failed"))
mock_stealth_class.return_value = mock_stealth_instance
# Create strategy instance
strategy = AsyncPlaywrightCrawlerStrategy()
# Mock page
mock_page = Mock()
# Test the method - should not raise an error, continue silently
await strategy._apply_stealth(mock_page)
# Should complete without raising the stealth error
def test_strategy_creation_without_config(self):
"""Test that strategy can be created without any stealth configuration"""
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
# Should work without any stealth-related parameters
strategy = AsyncPlaywrightCrawlerStrategy()
assert strategy is not None
assert hasattr(strategy, '_apply_stealth')
def test_browser_config_works_without_stealth_param(self):
"""Test that BrowserConfig works without stealth parameter"""
from crawl4ai.async_configs import BrowserConfig
# Should work without stealth parameter
config = BrowserConfig()
assert config is not None
# Should also work with other parameters
config = BrowserConfig(headless=False, browser_type="firefox")
assert config.headless == False
assert config.browser_type == "firefox"
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -1,345 +0,0 @@
#!/usr/bin/env python3
"""
Simple API Test for Crawl4AI Docker Server v0.7.0
Uses only built-in Python modules to test all endpoints.
"""
import urllib.request
import urllib.parse
import json
import time
import sys
from typing import Dict, List, Optional
# Configuration
BASE_URL = "http://localhost:11234" # Change to your server URL
TEST_TIMEOUT = 30
class SimpleApiTester:
def __init__(self, base_url: str = BASE_URL):
self.base_url = base_url
self.token = None
self.results = []
def log(self, message: str):
print(f"[INFO] {message}")
def test_get_endpoint(self, endpoint: str) -> Dict:
"""Test a GET endpoint"""
url = f"{self.base_url}{endpoint}"
start_time = time.time()
try:
req = urllib.request.Request(url)
if self.token:
req.add_header('Authorization', f'Bearer {self.token}')
with urllib.request.urlopen(req, timeout=TEST_TIMEOUT) as response:
response_time = time.time() - start_time
status_code = response.getcode()
content = response.read().decode('utf-8')
# Try to parse JSON
try:
data = json.loads(content)
except:
data = {"raw_response": content[:200]}
return {
"endpoint": endpoint,
"method": "GET",
"status": "PASS" if status_code < 400 else "FAIL",
"status_code": status_code,
"response_time": response_time,
"data": data
}
except Exception as e:
response_time = time.time() - start_time
return {
"endpoint": endpoint,
"method": "GET",
"status": "FAIL",
"status_code": None,
"response_time": response_time,
"error": str(e)
}
def test_post_endpoint(self, endpoint: str, payload: Dict) -> Dict:
"""Test a POST endpoint"""
url = f"{self.base_url}{endpoint}"
start_time = time.time()
try:
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(url, data=data, method='POST')
req.add_header('Content-Type', 'application/json')
if self.token:
req.add_header('Authorization', f'Bearer {self.token}')
with urllib.request.urlopen(req, timeout=TEST_TIMEOUT) as response:
response_time = time.time() - start_time
status_code = response.getcode()
content = response.read().decode('utf-8')
# Try to parse JSON
try:
data = json.loads(content)
except:
data = {"raw_response": content[:200]}
return {
"endpoint": endpoint,
"method": "POST",
"status": "PASS" if status_code < 400 else "FAIL",
"status_code": status_code,
"response_time": response_time,
"data": data
}
except Exception as e:
response_time = time.time() - start_time
return {
"endpoint": endpoint,
"method": "POST",
"status": "FAIL",
"status_code": None,
"response_time": response_time,
"error": str(e)
}
def print_result(self, result: Dict):
"""Print a formatted test result"""
status_color = {
"PASS": "",
"FAIL": "",
"SKIP": "⏭️"
}
print(f"{status_color[result['status']]} {result['method']} {result['endpoint']} "
f"| {result['response_time']:.3f}s | Status: {result['status_code'] or 'N/A'}")
if result['status'] == 'FAIL' and 'error' in result:
print(f" Error: {result['error']}")
self.results.append(result)
def run_all_tests(self):
"""Run all API tests"""
print("🚀 Starting Crawl4AI v0.7.0 API Test Suite")
print(f"📡 Testing server at: {self.base_url}")
print("=" * 60)
# # Test basic endpoints
# print("\n=== BASIC ENDPOINTS ===")
# # Health check
# result = self.test_get_endpoint("/health")
# self.print_result(result)
# # Schema endpoint
# result = self.test_get_endpoint("/schema")
# self.print_result(result)
# # Metrics endpoint
# result = self.test_get_endpoint("/metrics")
# self.print_result(result)
# # Root redirect
# result = self.test_get_endpoint("/")
# self.print_result(result)
# # Test authentication
# print("\n=== AUTHENTICATION ===")
# # Get token
# token_payload = {"email": "test@example.com"}
# result = self.test_post_endpoint("/token", token_payload)
# self.print_result(result)
# # Extract token if successful
# if result['status'] == 'PASS' and 'data' in result:
# token = result['data'].get('access_token')
# if token:
# self.token = token
# self.log(f"Successfully obtained auth token: {token[:20]}...")
# Test core APIs
print("\n=== CORE APIs ===")
test_url = "https://example.com"
# Test markdown endpoint
md_payload = {
"url": test_url,
"f": "fit",
"q": "test query",
"c": "0"
}
result = self.test_post_endpoint("/md", md_payload)
# print(result['data'].get('markdown', ''))
self.print_result(result)
# Test HTML endpoint
html_payload = {"url": test_url}
result = self.test_post_endpoint("/html", html_payload)
self.print_result(result)
# Test screenshot endpoint
screenshot_payload = {
"url": test_url,
"screenshot_wait_for": 2
}
result = self.test_post_endpoint("/screenshot", screenshot_payload)
self.print_result(result)
# Test PDF endpoint
pdf_payload = {"url": test_url}
result = self.test_post_endpoint("/pdf", pdf_payload)
self.print_result(result)
# Test JavaScript execution
js_payload = {
"url": test_url,
"scripts": ["(() => document.title)()"]
}
result = self.test_post_endpoint("/execute_js", js_payload)
self.print_result(result)
# Test crawl endpoint
crawl_payload = {
"urls": [test_url],
"browser_config": {},
"crawler_config": {}
}
result = self.test_post_endpoint("/crawl", crawl_payload)
self.print_result(result)
# Test config dump
config_payload = {"code": "CrawlerRunConfig()"}
result = self.test_post_endpoint("/config/dump", config_payload)
self.print_result(result)
# Test LLM endpoint
llm_endpoint = f"/llm/{test_url}?q=Extract%20main%20content"
result = self.test_get_endpoint(llm_endpoint)
self.print_result(result)
# Test ask endpoint
ask_endpoint = "/ask?context_type=all&query=crawl4ai&max_results=5"
result = self.test_get_endpoint(ask_endpoint)
print(result)
self.print_result(result)
# Test job APIs
print("\n=== JOB APIs ===")
# Test LLM job
llm_job_payload = {
"url": test_url,
"q": "Extract main content",
"cache": False
}
result = self.test_post_endpoint("/llm/job", llm_job_payload)
self.print_result(result)
# Test crawl job
crawl_job_payload = {
"urls": [test_url],
"browser_config": {},
"crawler_config": {}
}
result = self.test_post_endpoint("/crawl/job", crawl_job_payload)
self.print_result(result)
# Test MCP
print("\n=== MCP APIs ===")
# Test MCP schema
result = self.test_get_endpoint("/mcp/schema")
self.print_result(result)
# Test error handling
print("\n=== ERROR HANDLING ===")
# Test invalid URL
invalid_payload = {"url": "invalid-url", "f": "fit"}
result = self.test_post_endpoint("/md", invalid_payload)
self.print_result(result)
# Test invalid endpoint
result = self.test_get_endpoint("/nonexistent")
self.print_result(result)
# Print summary
self.print_summary()
def print_summary(self):
"""Print test results summary"""
print("\n" + "=" * 60)
print("📊 TEST RESULTS SUMMARY")
print("=" * 60)
total = len(self.results)
passed = sum(1 for r in self.results if r['status'] == 'PASS')
failed = sum(1 for r in self.results if r['status'] == 'FAIL')
print(f"Total Tests: {total}")
print(f"✅ Passed: {passed}")
print(f"❌ Failed: {failed}")
print(f"📈 Success Rate: {(passed/total)*100:.1f}%")
if failed > 0:
print("\n❌ FAILED TESTS:")
for result in self.results:
if result['status'] == 'FAIL':
print(f"{result['method']} {result['endpoint']}")
if 'error' in result:
print(f" Error: {result['error']}")
# Performance statistics
response_times = [r['response_time'] for r in self.results if r['response_time'] > 0]
if response_times:
avg_time = sum(response_times) / len(response_times)
max_time = max(response_times)
print(f"\n⏱️ Average Response Time: {avg_time:.3f}s")
print(f"⏱️ Max Response Time: {max_time:.3f}s")
# Save detailed report
report_file = f"crawl4ai_test_report_{int(time.time())}.json"
with open(report_file, 'w') as f:
json.dump({
"timestamp": time.time(),
"server_url": self.base_url,
"version": "0.7.0",
"summary": {
"total": total,
"passed": passed,
"failed": failed
},
"results": self.results
}, f, indent=2)
print(f"\n📄 Detailed report saved to: {report_file}")
def main():
"""Main test runner"""
import argparse
parser = argparse.ArgumentParser(description='Crawl4AI v0.7.0 API Test Suite')
parser.add_argument('--url', default=BASE_URL, help='Base URL of the server')
args = parser.parse_args()
tester = SimpleApiTester(args.url)
try:
tester.run_all_tests()
except KeyboardInterrupt:
print("\n🛑 Test suite interrupted by user")
except Exception as e:
print(f"\n💥 Test suite failed with error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()