Compare commits
4 Commits
fix/json-i
...
fix/playwr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
65902a4773 | ||
|
|
5c13baf574 | ||
|
|
d2759824ef | ||
|
|
bde1bba6a2 |
@@ -12,6 +12,20 @@ from playwright.async_api import TimeoutError as PlaywrightTimeoutError
|
||||
from io import BytesIO
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
import hashlib
|
||||
|
||||
# Backward compatible stealth import
|
||||
try:
|
||||
# Try new tf-playwright-stealth API (Stealth class)
|
||||
from playwright_stealth import Stealth
|
||||
STEALTH_NEW_API = True
|
||||
except ImportError:
|
||||
try:
|
||||
# Try old playwright-stealth API (stealth_async function)
|
||||
from playwright_stealth import stealth_async
|
||||
STEALTH_NEW_API = False
|
||||
except ImportError:
|
||||
# No stealth available
|
||||
STEALTH_NEW_API = None
|
||||
import uuid
|
||||
from .js_snippet import load_js_script
|
||||
from .models import AsyncCrawlResponse
|
||||
@@ -31,6 +45,107 @@ from types import MappingProxyType
|
||||
import contextlib
|
||||
from functools import partial
|
||||
|
||||
|
||||
# Add StealthConfig class for backward compatibility and new features
class StealthConfig:
    """
    Configuration holder for stealth settings used with tf-playwright-stealth.

    Keeps the historical (playwright-stealth v1.x) parameter names working
    while exposing every option accepted by the tf-playwright-stealth
    ``Stealth`` class.
    """

    def __init__(
        self,
        # Common settings
        enabled: bool = True,

        # Core tf-playwright-stealth parameters (matching the actual library)
        chrome_app: bool = True,
        chrome_csi: bool = True,
        chrome_load_times: bool = True,
        chrome_runtime: bool = False,  # Note: library default is False
        hairline: bool = True,
        iframe_content_window: bool = True,
        media_codecs: bool = True,
        navigator_hardware_concurrency: bool = True,
        navigator_languages: bool = True,
        navigator_permissions: bool = True,
        navigator_platform: bool = True,
        navigator_plugins: bool = True,
        navigator_user_agent: bool = True,
        navigator_vendor: bool = True,
        navigator_webdriver: bool = True,
        sec_ch_ua: bool = True,
        webgl_vendor: bool = True,

        # Override parameters
        navigator_languages_override: tuple = ("en-US", "en"),
        navigator_platform_override: str = "Win32",
        navigator_user_agent_override: str = None,
        navigator_vendor_override: str = None,
        sec_ch_ua_override: str = None,
        webgl_renderer_override: str = None,
        webgl_vendor_override: str = None,

        # Advanced parameters
        init_scripts_only: bool = False,
        script_logging: bool = False,

        # Legacy parameters for backward compatibility
        webdriver: bool = None,  # mapped to navigator_webdriver
        user_agent_override: bool = None,  # mapped to navigator_user_agent
        window_outerdimensions: bool = None,  # no tf-playwright-stealth equivalent; accepted and ignored
    ):
        self.enabled = enabled

        # Legacy aliases win over the modern names when explicitly supplied.
        if webdriver is not None:
            navigator_webdriver = webdriver
        if user_agent_override is not None:
            navigator_user_agent = user_agent_override

        # Gather every option first, then drop the unset (None) entries so
        # only explicit values are forwarded to the Stealth constructor.
        candidate_options = {
            'chrome_app': chrome_app,
            'chrome_csi': chrome_csi,
            'chrome_load_times': chrome_load_times,
            'chrome_runtime': chrome_runtime,
            'hairline': hairline,
            'iframe_content_window': iframe_content_window,
            'media_codecs': media_codecs,
            'navigator_hardware_concurrency': navigator_hardware_concurrency,
            'navigator_languages': navigator_languages,
            'navigator_permissions': navigator_permissions,
            'navigator_platform': navigator_platform,
            'navigator_plugins': navigator_plugins,
            'navigator_user_agent': navigator_user_agent,
            'navigator_vendor': navigator_vendor,
            'navigator_webdriver': navigator_webdriver,
            'sec_ch_ua': sec_ch_ua,
            'webgl_vendor': webgl_vendor,
            'navigator_languages_override': navigator_languages_override,
            'navigator_platform_override': navigator_platform_override,
            'navigator_user_agent_override': navigator_user_agent_override,
            'navigator_vendor_override': navigator_vendor_override,
            'sec_ch_ua_override': sec_ch_ua_override,
            'webgl_renderer_override': webgl_renderer_override,
            'webgl_vendor_override': webgl_vendor_override,
            'init_scripts_only': init_scripts_only,
            'script_logging': script_logging,
        }
        self.stealth_options = {
            name: value
            for name, value in candidate_options.items()
            if value is not None
        }

    @classmethod
    def from_dict(cls, config_dict: dict) -> 'StealthConfig':
        """Build a StealthConfig from a plain dictionary of keyword options."""
        return cls(**config_dict)

    def to_dict(self) -> dict:
        """Serialize to a dictionary (round-trips through :meth:`from_dict`)."""
        serialized = dict(self.stealth_options)
        serialized['enabled'] = self.enabled
        return serialized
|
||||
|
||||
class AsyncCrawlerStrategy(ABC):
|
||||
"""
|
||||
Abstract base class for crawler strategies.
|
||||
@@ -39,7 +154,7 @@ class AsyncCrawlerStrategy(ABC):
|
||||
|
||||
@abstractmethod
|
||||
async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
|
||||
pass # 4 + 3
|
||||
pass # 4 + 3
|
||||
|
||||
class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
"""
|
||||
@@ -220,6 +335,79 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
"""
|
||||
self.headers = headers
|
||||
|
||||
    async def _apply_stealth(self, page: Page, stealth_config: Optional[StealthConfig] = None):
        """
        Apply stealth measures to the page with backward compatibility and enhanced configuration.

        This method automatically applies stealth measures and now supports configuration
        through StealthConfig while maintaining backward compatibility.

        Currently supports:
        - tf-playwright-stealth (Stealth class with extensive configuration)
        - Old playwright-stealth v1.x (stealth_async function) - legacy support

        Args:
            page (Page): The Playwright page object
            stealth_config (Optional[StealthConfig]): Configuration for stealth settings
        """
        # STEALTH_NEW_API is a module-level tri-state resolved at import time:
        # True -> tf-playwright-stealth (Stealth class), False -> legacy v1.x
        # (stealth_async function), None -> no stealth library importable.
        if STEALTH_NEW_API is None:
            # No stealth library available - silently continue
            # hasattr guard: logger may be a plain object without .debug
            if self.logger and hasattr(self.logger, 'debug'):
                self.logger.debug(
                    message="playwright-stealth not available, skipping stealth measures",
                    tag="STEALTH"
                )
            return

        # Use default config if none provided
        if stealth_config is None:
            stealth_config = StealthConfig()

        # Skip if stealth is disabled
        if not stealth_config.enabled:
            if self.logger and hasattr(self.logger, 'debug'):
                self.logger.debug(
                    message="Stealth measures disabled in configuration",
                    tag="STEALTH"
                )
            return

        try:
            if STEALTH_NEW_API:
                # Use tf-playwright-stealth API with configuration support
                # Filter out any invalid parameters that might cause issues
                # (only bool/str/tuple values are valid Stealth() kwargs here).
                valid_options = {}
                for key, value in stealth_config.stealth_options.items():
                    # Accept boolean parameters and specific string/tuple parameters
                    if isinstance(value, (bool, str, tuple)):
                        valid_options[key] = value

                stealth = Stealth(**valid_options)
                await stealth.apply_stealth_async(page)

                config_info = f"with {len(valid_options)} options"
            else:
                # Use old API (v1.x) - configuration options are limited
                # NOTE(review): v1.x ignores stealth_config entirely — the
                # legacy function takes no options; confirm that is acceptable.
                await stealth_async(page)
                config_info = "default (v1.x legacy)"

            # Only log if logger is available and in debug mode
            if self.logger and hasattr(self.logger, 'debug'):
                api_version = "tf-playwright-stealth" if STEALTH_NEW_API else "v1.x"
                # message uses {placeholder}-style params, matching the
                # warning call below (project logger convention).
                self.logger.debug(
                    message="Applied stealth measures using {version} {config}",
                    tag="STEALTH",
                    params={"version": api_version, "config": config_info}
                )
        except Exception as e:
            # Silently continue if stealth fails - don't break the crawling process
            if self.logger:
                self.logger.warning(
                    message="Stealth measures failed, continuing without stealth: {error}",
                    tag="STEALTH",
                    params={"error": str(e)}
                )
|
||||
|
||||
async def smart_wait(self, page: Page, wait_for: str, timeout: float = 30000):
|
||||
"""
|
||||
Wait for a condition in a smart way. This functions works as below:
|
||||
@@ -532,6 +720,24 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
# Get page for session
|
||||
page, context = await self.browser_manager.get_page(crawlerRunConfig=config)
|
||||
|
||||
# Apply stealth measures automatically (backward compatible) with optional config
|
||||
# Check multiple possible locations for stealth config for flexibility
|
||||
stealth_config = None
|
||||
if hasattr(config, 'stealth_config') and config.stealth_config:
|
||||
stealth_config = config.stealth_config
|
||||
elif hasattr(config, 'stealth') and config.stealth:
|
||||
# Alternative attribute name for backward compatibility
|
||||
stealth_config = config.stealth if isinstance(config.stealth, StealthConfig) else StealthConfig.from_dict(config.stealth)
|
||||
elif config.magic:
|
||||
# Enable more aggressive stealth in magic mode
|
||||
stealth_config = StealthConfig(
|
||||
navigator_webdriver=False, # More aggressive stealth
|
||||
webdriver=False,
|
||||
chrome_app=False
|
||||
)
|
||||
|
||||
await self._apply_stealth(page, stealth_config)
|
||||
|
||||
# await page.goto(URL)
|
||||
|
||||
# Add default cookie
|
||||
@@ -933,7 +1139,6 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
tag="VIEWPORT",
|
||||
params={"error": str(e)},
|
||||
)
|
||||
|
||||
# Handle full page scanning
|
||||
if config.scan_full_page:
|
||||
# await self._handle_full_page_scan(page, config.scroll_delay)
|
||||
@@ -1837,8 +2042,6 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
# }}
|
||||
# }})();
|
||||
# """
|
||||
# )
|
||||
|
||||
# """ NEW VERSION:
|
||||
# When {script} contains statements (e.g., const link = …; link.click();),
|
||||
# this forms invalid JavaScript, causing Playwright execution error: SyntaxError: Unexpected token 'const'.
|
||||
|
||||
@@ -14,24 +14,8 @@ import hashlib
|
||||
from .js_snippet import load_js_script
|
||||
from .config import DOWNLOAD_PAGE_TIMEOUT
|
||||
from .async_configs import BrowserConfig, CrawlerRunConfig
|
||||
from playwright_stealth import StealthConfig
|
||||
from .utils import get_chromium_path
|
||||
|
||||
stealth_config = StealthConfig(
|
||||
webdriver=True,
|
||||
chrome_app=True,
|
||||
chrome_csi=True,
|
||||
chrome_load_times=True,
|
||||
chrome_runtime=True,
|
||||
navigator_languages=True,
|
||||
navigator_plugins=True,
|
||||
navigator_permissions=True,
|
||||
webgl_vendor=True,
|
||||
outerdimensions=True,
|
||||
navigator_hardware_concurrency=True,
|
||||
media_codecs=True,
|
||||
)
|
||||
|
||||
BROWSER_DISABLE_OPTIONS = [
|
||||
"--disable-background-networking",
|
||||
"--disable-background-timer-throttling",
|
||||
|
||||
@@ -54,27 +54,6 @@ def _get_memory_mb():
|
||||
logger.warning(f"Could not get memory info: {e}")
|
||||
return None
|
||||
|
||||
# --- Helper to sanitize JSON data ---
def sanitize_json_data(data):
    """
    Recursively sanitize data to handle infinity and NaN values that are not JSON compliant.

    Dicts and lists are rebuilt with sanitized members; non-finite floats become
    the strings "Infinity" / "-Infinity" / "NaN"; everything else passes through.
    """
    import math

    if isinstance(data, dict):
        return {key: sanitize_json_data(value) for key, value in data.items()}
    if isinstance(data, list):
        return [sanitize_json_data(element) for element in data]
    if isinstance(data, float) and not math.isfinite(data):
        if math.isnan(data):
            return "NaN"
        return "Infinity" if data > 0 else "-Infinity"
    return data
|
||||
|
||||
|
||||
async def handle_llm_qa(
|
||||
url: str,
|
||||
@@ -392,10 +371,8 @@ async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator)
|
||||
server_memory_mb = _get_memory_mb()
|
||||
result_dict = result.model_dump()
|
||||
result_dict['server_memory_mb'] = server_memory_mb
|
||||
# Sanitize data to handle infinity values
|
||||
sanitized_dict = sanitize_json_data(result_dict)
|
||||
logger.info(f"Streaming result for {sanitized_dict.get('url', 'unknown')}")
|
||||
data = json.dumps(sanitized_dict, default=datetime_handler) + "\n"
|
||||
logger.info(f"Streaming result for {result_dict.get('url', 'unknown')}")
|
||||
data = json.dumps(result_dict, default=datetime_handler) + "\n"
|
||||
yield data.encode('utf-8')
|
||||
except Exception as e:
|
||||
logger.error(f"Serialization error: {e}")
|
||||
@@ -469,7 +446,7 @@ async def handle_crawl_request(
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"results": [sanitize_json_data(result.model_dump()) for result in results],
|
||||
"results": [result.model_dump() for result in results],
|
||||
"server_processing_time_s": end_time - start_time,
|
||||
"server_memory_delta_mb": mem_delta_mb,
|
||||
"server_peak_memory_mb": peak_mem_mb
|
||||
|
||||
@@ -331,27 +331,6 @@ async def generate_pdf(
|
||||
return {"success": True, "pdf": base64.b64encode(pdf_data).decode()}
|
||||
|
||||
|
||||
def sanitize_json_data(data):
    """
    Recursively sanitize data to handle infinity and NaN values that are not JSON compliant.
    """
    import math

    # Containers: rebuild with each member sanitized recursively.
    if isinstance(data, dict):
        return {field: sanitize_json_data(item) for field, item in data.items()}
    if isinstance(data, list):
        return [sanitize_json_data(item) for item in data]

    # Anything that is not a float is already JSON-safe here.
    if not isinstance(data, float):
        return data

    # Non-finite floats are replaced by string placeholders.
    if math.isnan(data):
        return "NaN"
    if math.isinf(data):
        return "Infinity" if data > 0 else "-Infinity"
    return data
|
||||
|
||||
|
||||
@app.post("/execute_js")
|
||||
@limiter.limit(config["rate_limiting"]["default_limit"])
|
||||
@mcp_tool("execute_js")
|
||||
@@ -410,9 +389,7 @@ async def execute_js(
|
||||
results = await crawler.arun(url=body.url, config=cfg)
|
||||
# Return JSON-serializable dict of the first CrawlResult
|
||||
data = results[0].model_dump()
|
||||
# Sanitize data to handle infinity values
|
||||
sanitized_data = sanitize_json_data(data)
|
||||
return JSONResponse(sanitized_data)
|
||||
return JSONResponse(data)
|
||||
|
||||
|
||||
@app.get("/llm/{url:path}")
|
||||
|
||||
141
test_stealth_compatibility.py
Normal file
141
test_stealth_compatibility.py
Normal file
@@ -0,0 +1,141 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test suite for playwright-stealth backward compatibility.
|
||||
Tests that stealth functionality works automatically without user configuration.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import asyncio
|
||||
from unittest.mock import Mock, patch, MagicMock
|
||||
|
||||
|
||||
class TestPlaywrightStealthCompatibility:
    """Test playwright-stealth backward compatibility with transparent operation."""

    def test_api_detection_works(self):
        """API detection must resolve to one of the three supported states."""
        from crawl4ai.async_crawler_strategy import STEALTH_NEW_API
        # Tri-state flag: True -> tf-playwright-stealth, False -> legacy v1.x,
        # None -> no stealth library installed. The previous assertion
        # (`x is not None or x is False or x is None`) was a tautology that
        # could never fail; this one actually pins the contract.
        assert STEALTH_NEW_API in (True, False, None)

    @pytest.mark.asyncio
    @patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', True)
    @patch('crawl4ai.async_crawler_strategy.Stealth')
    async def test_apply_stealth_new_api(self, mock_stealth_class):
        """Stealth application with the new API works transparently."""
        from unittest.mock import AsyncMock
        from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy

        # apply_stealth_async is awaited by the strategy, so it must be an
        # AsyncMock: a plain Mock raises TypeError on `await`, and the
        # strategy's broad except would silently swallow that failure,
        # letting the test pass for the wrong reason.
        mock_stealth_instance = Mock()
        mock_stealth_instance.apply_stealth_async = AsyncMock()
        mock_stealth_class.return_value = mock_stealth_instance

        strategy = AsyncPlaywrightCrawlerStrategy()
        mock_page = Mock()

        # Should work transparently without any user configuration.
        await strategy._apply_stealth(mock_page)

        # Verify the new API was used and the coroutine was actually awaited.
        mock_stealth_class.assert_called_once()
        mock_stealth_instance.apply_stealth_async.assert_awaited_once_with(mock_page)

    @pytest.mark.asyncio
    @patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', False)
    async def test_apply_stealth_legacy_api(self):
        """Stealth application with the legacy v1.x API works transparently."""
        from unittest.mock import AsyncMock
        from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy

        # stealth_async is awaited, so mock it with AsyncMock (see above).
        mock_stealth_async = AsyncMock(return_value=None)

        # Inject the mock as a module attribute: when only the new API is
        # installed the module has no `stealth_async` to patch via @patch.
        import crawl4ai.async_crawler_strategy
        crawl4ai.async_crawler_strategy.stealth_async = mock_stealth_async

        try:
            strategy = AsyncPlaywrightCrawlerStrategy()
            mock_page = Mock()

            # Should work transparently.
            await strategy._apply_stealth(mock_page)

            # Verify the legacy API was used and awaited.
            mock_stealth_async.assert_awaited_once_with(mock_page)
        finally:
            # Clean up the injected attribute.
            if hasattr(crawl4ai.async_crawler_strategy, 'stealth_async'):
                delattr(crawl4ai.async_crawler_strategy, 'stealth_async')

    @pytest.mark.asyncio
    @patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', None)
    async def test_apply_stealth_no_library(self):
        """Stealth application is a silent no-op when no stealth library exists."""
        from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy

        strategy = AsyncPlaywrightCrawlerStrategy()
        mock_page = Mock()

        # Should complete without error even when no stealth is available.
        await strategy._apply_stealth(mock_page)

    @pytest.mark.asyncio
    @patch('crawl4ai.async_crawler_strategy.STEALTH_NEW_API', True)
    @patch('crawl4ai.async_crawler_strategy.Stealth')
    async def test_stealth_error_handling(self, mock_stealth_class):
        """Stealth errors are handled gracefully without breaking crawling."""
        from unittest.mock import AsyncMock
        from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy

        # Raise from the awaited call to exercise the strategy's except path.
        mock_stealth_instance = Mock()
        mock_stealth_instance.apply_stealth_async = AsyncMock(
            side_effect=Exception("Stealth failed")
        )
        mock_stealth_class.return_value = mock_stealth_instance

        strategy = AsyncPlaywrightCrawlerStrategy()
        mock_page = Mock()

        # Must not propagate the stealth error.
        await strategy._apply_stealth(mock_page)

    def test_strategy_creation_without_config(self):
        """Strategy can be created without any stealth configuration."""
        from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy

        # Should work without any stealth-related parameters.
        strategy = AsyncPlaywrightCrawlerStrategy()
        assert strategy is not None
        assert hasattr(strategy, '_apply_stealth')

    def test_browser_config_works_without_stealth_param(self):
        """BrowserConfig works without a stealth parameter."""
        from crawl4ai.async_configs import BrowserConfig

        # Should work without stealth parameter
        config = BrowserConfig()
        assert config is not None

        # Should also work with other parameters
        config = BrowserConfig(headless=False, browser_type="firefox")
        assert config.headless is False
        assert config.browser_type == "firefox"
|
||||
|
||||
|
||||
# Allow running this test module directly (outside a pytest invocation).
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
|
||||
@@ -1,345 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Simple API Test for Crawl4AI Docker Server v0.7.0
|
||||
Uses only built-in Python modules to test all endpoints.
|
||||
"""
|
||||
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
import json
|
||||
import time
|
||||
import sys
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
# Configuration
BASE_URL = "http://localhost:11234"  # Change to your server URL
TEST_TIMEOUT = 30  # Per-request timeout in seconds


class SimpleApiTester:
    """Exercise the Crawl4AI HTTP API using only the Python standard library.

    Each call is recorded as a result dict in ``self.results``; a summary is
    printed and a JSON report written to disk by :meth:`print_summary`.
    """

    def __init__(self, base_url: str = BASE_URL):
        self.base_url = base_url
        self.token = None    # Bearer token, set after a successful /token call
        self.results = []    # One result dict per tested endpoint

    def log(self, message: str):
        """Print an informational message."""
        print(f"[INFO] {message}")

    def _request(self, method: str, endpoint: str, payload: Optional[Dict] = None) -> Dict:
        """Issue one HTTP request and normalize the outcome into a result dict.

        Shared implementation behind test_get_endpoint/test_post_endpoint
        (previously duplicated verbatim). Network/HTTP errors are captured
        as a FAIL result rather than raised.

        Args:
            method: "GET" or "POST" (label recorded in the result).
            endpoint: Path appended to ``self.base_url``.
            payload: JSON body for POST requests; None for GET.
        """
        url = f"{self.base_url}{endpoint}"
        start_time = time.time()

        try:
            if payload is None:
                req = urllib.request.Request(url)
            else:
                body = json.dumps(payload).encode('utf-8')
                req = urllib.request.Request(url, data=body, method='POST')
                req.add_header('Content-Type', 'application/json')

            if self.token:
                req.add_header('Authorization', f'Bearer {self.token}')

            with urllib.request.urlopen(req, timeout=TEST_TIMEOUT) as response:
                response_time = time.time() - start_time
                status_code = response.getcode()
                content = response.read().decode('utf-8')

            # Try to parse JSON; fall back to a raw-text preview.
            try:
                data = json.loads(content)
            except ValueError:  # was a bare `except:` — catch only parse errors
                data = {"raw_response": content[:200]}

            return {
                "endpoint": endpoint,
                "method": method,
                "status": "PASS" if status_code < 400 else "FAIL",
                "status_code": status_code,
                "response_time": response_time,
                "data": data
            }
        except Exception as e:
            return {
                "endpoint": endpoint,
                "method": method,
                "status": "FAIL",
                "status_code": None,
                "response_time": time.time() - start_time,
                "error": str(e)
            }

    def test_get_endpoint(self, endpoint: str) -> Dict:
        """Test a GET endpoint"""
        return self._request("GET", endpoint)

    def test_post_endpoint(self, endpoint: str, payload: Dict) -> Dict:
        """Test a POST endpoint"""
        return self._request("POST", endpoint, payload)

    def print_result(self, result: Dict):
        """Print a formatted test result and record it in ``self.results``."""
        # .get avoids a KeyError if an unexpected status ever appears.
        status_icon = {
            "PASS": "✅",
            "FAIL": "❌",
            "SKIP": "⏭️"
        }.get(result['status'], "❓")

        print(f"{status_icon} {result['method']} {result['endpoint']} "
              f"| {result['response_time']:.3f}s | Status: {result['status_code'] or 'N/A'}")

        if result['status'] == 'FAIL' and 'error' in result:
            print(f"   Error: {result['error']}")

        self.results.append(result)

    def run_all_tests(self):
        """Run all API tests"""
        print("🚀 Starting Crawl4AI v0.7.0 API Test Suite")
        print(f"📡 Testing server at: {self.base_url}")
        print("=" * 60)

        # NOTE: the basic-endpoint checks (/health, /schema, /metrics, /) and
        # the authentication flow (/token) are currently disabled upstream;
        # re-enable them here if the server requires auth.

        # Test core APIs
        print("\n=== CORE APIs ===")

        test_url = "https://example.com"

        # Test markdown endpoint
        md_payload = {
            "url": test_url,
            "f": "fit",
            "q": "test query",
            "c": "0"
        }
        result = self.test_post_endpoint("/md", md_payload)
        self.print_result(result)

        # Test HTML endpoint
        html_payload = {"url": test_url}
        result = self.test_post_endpoint("/html", html_payload)
        self.print_result(result)

        # Test screenshot endpoint
        screenshot_payload = {
            "url": test_url,
            "screenshot_wait_for": 2
        }
        result = self.test_post_endpoint("/screenshot", screenshot_payload)
        self.print_result(result)

        # Test PDF endpoint
        pdf_payload = {"url": test_url}
        result = self.test_post_endpoint("/pdf", pdf_payload)
        self.print_result(result)

        # Test JavaScript execution
        js_payload = {
            "url": test_url,
            "scripts": ["(() => document.title)()"]
        }
        result = self.test_post_endpoint("/execute_js", js_payload)
        self.print_result(result)

        # Test crawl endpoint
        crawl_payload = {
            "urls": [test_url],
            "browser_config": {},
            "crawler_config": {}
        }
        result = self.test_post_endpoint("/crawl", crawl_payload)
        self.print_result(result)

        # Test config dump
        config_payload = {"code": "CrawlerRunConfig()"}
        result = self.test_post_endpoint("/config/dump", config_payload)
        self.print_result(result)

        # Test LLM endpoint
        llm_endpoint = f"/llm/{test_url}?q=Extract%20main%20content"
        result = self.test_get_endpoint(llm_endpoint)
        self.print_result(result)

        # Test ask endpoint
        ask_endpoint = "/ask?context_type=all&query=crawl4ai&max_results=5"
        result = self.test_get_endpoint(ask_endpoint)
        print(result)
        self.print_result(result)

        # Test job APIs
        print("\n=== JOB APIs ===")

        # Test LLM job
        llm_job_payload = {
            "url": test_url,
            "q": "Extract main content",
            "cache": False
        }
        result = self.test_post_endpoint("/llm/job", llm_job_payload)
        self.print_result(result)

        # Test crawl job
        crawl_job_payload = {
            "urls": [test_url],
            "browser_config": {},
            "crawler_config": {}
        }
        result = self.test_post_endpoint("/crawl/job", crawl_job_payload)
        self.print_result(result)

        # Test MCP
        print("\n=== MCP APIs ===")

        # Test MCP schema
        result = self.test_get_endpoint("/mcp/schema")
        self.print_result(result)

        # Test error handling
        print("\n=== ERROR HANDLING ===")

        # Test invalid URL
        invalid_payload = {"url": "invalid-url", "f": "fit"}
        result = self.test_post_endpoint("/md", invalid_payload)
        self.print_result(result)

        # Test invalid endpoint
        result = self.test_get_endpoint("/nonexistent")
        self.print_result(result)

        # Print summary
        self.print_summary()

    def print_summary(self):
        """Print test results summary and save a JSON report to disk."""
        print("\n" + "=" * 60)
        print("📊 TEST RESULTS SUMMARY")
        print("=" * 60)

        total = len(self.results)
        passed = sum(1 for r in self.results if r['status'] == 'PASS')
        failed = sum(1 for r in self.results if r['status'] == 'FAIL')

        print(f"Total Tests: {total}")
        print(f"✅ Passed: {passed}")
        print(f"❌ Failed: {failed}")
        # Guard against ZeroDivisionError when no tests were recorded.
        success_rate = (passed / total) * 100 if total else 0.0
        print(f"📈 Success Rate: {success_rate:.1f}%")

        if failed > 0:
            print("\n❌ FAILED TESTS:")
            for result in self.results:
                if result['status'] == 'FAIL':
                    print(f"  • {result['method']} {result['endpoint']}")
                    if 'error' in result:
                        print(f"    Error: {result['error']}")

        # Performance statistics
        response_times = [r['response_time'] for r in self.results if r['response_time'] > 0]
        if response_times:
            avg_time = sum(response_times) / len(response_times)
            max_time = max(response_times)
            print(f"\n⏱️ Average Response Time: {avg_time:.3f}s")
            print(f"⏱️ Max Response Time: {max_time:.3f}s")

        # Save detailed report
        report_file = f"crawl4ai_test_report_{int(time.time())}.json"
        with open(report_file, 'w') as f:
            json.dump({
                "timestamp": time.time(),
                "server_url": self.base_url,
                "version": "0.7.0",
                "summary": {
                    "total": total,
                    "passed": passed,
                    "failed": failed
                },
                "results": self.results
            }, f, indent=2)

        print(f"\n📄 Detailed report saved to: {report_file}")
|
||||
def main():
    """Parse command-line options and run the full API test suite."""
    import argparse

    arg_parser = argparse.ArgumentParser(description='Crawl4AI v0.7.0 API Test Suite')
    arg_parser.add_argument('--url', default=BASE_URL, help='Base URL of the server')
    options = arg_parser.parse_args()

    runner = SimpleApiTester(options.url)
    try:
        runner.run_all_tests()
    except KeyboardInterrupt:
        print("\n🛑 Test suite interrupted by user")
    except Exception as e:
        print(f"\n💥 Test suite failed with error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user