Compare commits

..

1 Commits

Author SHA1 Message Date
ntohidi
ca100c6518 Release v0.7.6: The 0.7.6 Update
- Updated version to 0.7.6
- Added comprehensive demo and release notes
- Updated all documentation
- Update the veriosn in Dockerfile to 0.7.6
2025-10-22 13:46:54 +02:00
28 changed files with 703 additions and 2413 deletions

View File

@@ -1383,10 +1383,9 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
try:
await self.adapter.evaluate(page,
f"""
(async () => {{
(() => {{
try {{
const removeOverlays = {remove_overlays_js};
await removeOverlays();
{remove_overlays_js}
return {{ success: true }};
}} catch (error) {{
return {{

View File

@@ -617,17 +617,7 @@ class AsyncWebCrawler:
else config.chunking_strategy
)
sections = chunking.chunk(content)
# extracted_content = config.extraction_strategy.run(url, sections)
# Use async version if available for better parallelism
if hasattr(config.extraction_strategy, 'arun'):
extracted_content = await config.extraction_strategy.arun(url, sections)
else:
# Fallback to sync version run in thread pool to avoid blocking
extracted_content = await asyncio.to_thread(
config.extraction_strategy.run, url, sections
)
extracted_content = config.extraction_strategy.run(url, sections)
extracted_content = json.dumps(
extracted_content, indent=4, default=str, ensure_ascii=False
)

View File

@@ -369,9 +369,6 @@ class ManagedBrowser:
]
if self.headless:
flags.append("--headless=new")
# Add viewport flag if specified in config
if self.browser_config.viewport_height and self.browser_config.viewport_width:
flags.append(f"--window-size={self.browser_config.viewport_width},{self.browser_config.viewport_height}")
# merge common launch flags
flags.extend(self.build_browser_flags(self.browser_config))
elif self.browser_type == "firefox":

View File

@@ -94,20 +94,6 @@ class ExtractionStrategy(ABC):
extracted_content.extend(future.result())
return extracted_content
async def arun(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
"""
Async version: Process sections of text in parallel using asyncio.
Default implementation runs the sync version in a thread pool.
Subclasses can override this for true async processing.
:param url: The URL of the webpage.
:param sections: List of sections (strings) to process.
:return: A list of processed JSON blocks.
"""
import asyncio
return await asyncio.to_thread(self.run, url, sections, *q, **kwargs)
class NoExtractionStrategy(ExtractionStrategy):
"""
@@ -794,177 +780,6 @@ class LLMExtractionStrategy(ExtractionStrategy):
return extracted_content
async def aextract(self, url: str, ix: int, html: str) -> List[Dict[str, Any]]:
"""
Async version: Extract meaningful blocks or chunks from the given HTML using an LLM.
How it works:
1. Construct a prompt with variables.
2. Make an async request to the LLM using the prompt.
3. Parse the response and extract blocks or chunks.
Args:
url: The URL of the webpage.
ix: Index of the block.
html: The HTML content of the webpage.
Returns:
A list of extracted blocks or chunks.
"""
from .utils import aperform_completion_with_backoff
if self.verbose:
print(f"[LOG] Call LLM for {url} - block index: {ix}")
variable_values = {
"URL": url,
"HTML": escape_json_string(sanitize_html(html)),
}
prompt_with_variables = PROMPT_EXTRACT_BLOCKS
if self.instruction:
variable_values["REQUEST"] = self.instruction
prompt_with_variables = PROMPT_EXTRACT_BLOCKS_WITH_INSTRUCTION
if self.extract_type == "schema" and self.schema:
variable_values["SCHEMA"] = json.dumps(self.schema, indent=2)
prompt_with_variables = PROMPT_EXTRACT_SCHEMA_WITH_INSTRUCTION
if self.extract_type == "schema" and not self.schema:
prompt_with_variables = PROMPT_EXTRACT_INFERRED_SCHEMA
for variable in variable_values:
prompt_with_variables = prompt_with_variables.replace(
"{" + variable + "}", variable_values[variable]
)
try:
response = await aperform_completion_with_backoff(
self.llm_config.provider,
prompt_with_variables,
self.llm_config.api_token,
base_url=self.llm_config.base_url,
json_response=self.force_json_response,
extra_args=self.extra_args,
)
# Track usage
usage = TokenUsage(
completion_tokens=response.usage.completion_tokens,
prompt_tokens=response.usage.prompt_tokens,
total_tokens=response.usage.total_tokens,
completion_tokens_details=response.usage.completion_tokens_details.__dict__
if response.usage.completion_tokens_details
else {},
prompt_tokens_details=response.usage.prompt_tokens_details.__dict__
if response.usage.prompt_tokens_details
else {},
)
self.usages.append(usage)
# Update totals
self.total_usage.completion_tokens += usage.completion_tokens
self.total_usage.prompt_tokens += usage.prompt_tokens
self.total_usage.total_tokens += usage.total_tokens
try:
content = response.choices[0].message.content
blocks = None
if self.force_json_response:
blocks = json.loads(content)
if isinstance(blocks, dict):
if len(blocks) == 1 and isinstance(list(blocks.values())[0], list):
blocks = list(blocks.values())[0]
else:
blocks = [blocks]
elif isinstance(blocks, list):
blocks = blocks
else:
blocks = extract_xml_data(["blocks"], content)["blocks"]
blocks = json.loads(blocks)
for block in blocks:
block["error"] = False
except Exception:
parsed, unparsed = split_and_parse_json_objects(
response.choices[0].message.content
)
blocks = parsed
if unparsed:
blocks.append(
{"index": 0, "error": True, "tags": ["error"], "content": unparsed}
)
if self.verbose:
print(
"[LOG] Extracted",
len(blocks),
"blocks from URL:",
url,
"block index:",
ix,
)
return blocks
except Exception as e:
if self.verbose:
print(f"[LOG] Error in LLM extraction: {e}")
return [
{
"index": ix,
"error": True,
"tags": ["error"],
"content": str(e),
}
]
async def arun(self, url: str, sections: List[str]) -> List[Dict[str, Any]]:
"""
Async version: Process sections with true parallelism using asyncio.gather.
Args:
url: The URL of the webpage.
sections: List of sections (strings) to process.
Returns:
A list of extracted blocks or chunks.
"""
import asyncio
merged_sections = self._merge(
sections,
self.chunk_token_threshold,
overlap=int(self.chunk_token_threshold * self.overlap_rate),
)
extracted_content = []
# Create tasks for all sections to run in parallel
tasks = [
self.aextract(url, ix, sanitize_input_encode(section))
for ix, section in enumerate(merged_sections)
]
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
for result in results:
if isinstance(result, Exception):
if self.verbose:
print(f"Error in async extraction: {result}")
extracted_content.append(
{
"index": 0,
"error": True,
"tags": ["error"],
"content": str(result),
}
)
else:
extracted_content.extend(result)
return extracted_content
def show_usage(self) -> None:
"""Print a detailed token usage report showing total and per-request usage."""
print("\n=== Token Usage Summary ===")

View File

@@ -1825,82 +1825,6 @@ def perform_completion_with_backoff(
# ]
async def aperform_completion_with_backoff(
provider,
prompt_with_variables,
api_token,
json_response=False,
base_url=None,
**kwargs,
):
"""
Async version: Perform an API completion request with exponential backoff.
How it works:
1. Sends an async completion request to the API.
2. Retries on rate-limit errors with exponential delays (async).
3. Returns the API response or an error after all retries.
Args:
provider (str): The name of the API provider.
prompt_with_variables (str): The input prompt for the completion request.
api_token (str): The API token for authentication.
json_response (bool): Whether to request a JSON response. Defaults to False.
base_url (Optional[str]): The base URL for the API. Defaults to None.
**kwargs: Additional arguments for the API request.
Returns:
dict: The API response or an error message after all retries.
"""
from litellm import acompletion
from litellm.exceptions import RateLimitError
import asyncio
max_attempts = 3
base_delay = 2 # Base delay in seconds, you can adjust this based on your needs
extra_args = {"temperature": 0.01, "api_key": api_token, "base_url": base_url}
if json_response:
extra_args["response_format"] = {"type": "json_object"}
if kwargs.get("extra_args"):
extra_args.update(kwargs["extra_args"])
for attempt in range(max_attempts):
try:
response = await acompletion(
model=provider,
messages=[{"role": "user", "content": prompt_with_variables}],
**extra_args,
)
return response # Return the successful response
except RateLimitError as e:
print("Rate limit error:", str(e))
if attempt == max_attempts - 1:
# Last attempt failed, raise the error.
raise
# Check if we have exhausted our max attempts
if attempt < max_attempts - 1:
# Calculate the delay and wait
delay = base_delay * (2**attempt) # Exponential backoff formula
print(f"Waiting for {delay} seconds before retrying...")
await asyncio.sleep(delay)
else:
# Return an error response after exhausting all retries
return [
{
"index": 0,
"tags": ["error"],
"content": ["Rate limit error. Please try again later."],
}
]
except Exception as e:
raise e # Raise any other exceptions immediately
def extract_blocks(url, html, provider=DEFAULT_PROVIDER, api_token=None, base_url=None):
"""
Extract content blocks from website HTML using an AI provider.

View File

@@ -785,54 +785,6 @@ curl http://localhost:11235/crawl/job/crawl_xyz
The response includes `status` field: `"processing"`, `"completed"`, or `"failed"`.
#### LLM Extraction Jobs with Webhooks
The same webhook system works for LLM extraction jobs via `/llm/job`:
```bash
# Submit LLM extraction job with webhook
curl -X POST http://localhost:11235/llm/job \
-H "Content-Type: application/json" \
-d '{
"url": "https://example.com/article",
"q": "Extract the article title, author, and main points",
"provider": "openai/gpt-4o-mini",
"webhook_config": {
"webhook_url": "https://myapp.com/webhooks/llm-complete",
"webhook_data_in_payload": true,
"webhook_headers": {
"X-Webhook-Secret": "your-secret-token"
}
}
}'
# Response: {"task_id": "llm_1234567890"}
```
**Your webhook receives:**
```json
{
"task_id": "llm_1234567890",
"task_type": "llm_extraction",
"status": "completed",
"timestamp": "2025-10-22T12:30:00.000000+00:00",
"urls": ["https://example.com/article"],
"data": {
"extracted_content": {
"title": "Understanding Web Scraping",
"author": "John Doe",
"main_points": ["Point 1", "Point 2", "Point 3"]
}
}
}
```
**Key Differences for LLM Jobs:**
- Task type is `"llm_extraction"` instead of `"crawl"`
- Extracted data is in `data.extracted_content`
- Single URL only (not an array)
- Supports schema-based extraction with `schema` parameter
> 💡 **Pro tip**: See [WEBHOOK_EXAMPLES.md](./WEBHOOK_EXAMPLES.md) for detailed examples including TypeScript client code, Flask webhook handlers, and failure handling.
---

View File

@@ -6,16 +6,15 @@ x-base-config: &base-config
- "11235:11235" # Gunicorn port
env_file:
- .llm.env # API keys (create from .llm.env.example)
# Uncomment to set default environment variables (will overwrite .llm.env)
# environment:
# - OPENAI_API_KEY=${OPENAI_API_KEY:-}
# - DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY:-}
# - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:-}
# - GROQ_API_KEY=${GROQ_API_KEY:-}
# - TOGETHER_API_KEY=${TOGETHER_API_KEY:-}
# - MISTRAL_API_KEY=${MISTRAL_API_KEY:-}
# - GEMINI_API_KEY=${GEMINI_API_KEY:-}
# - LLM_PROVIDER=${LLM_PROVIDER:-} # Optional: Override default provider (e.g., "anthropic/claude-3-opus")
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY:-}
- DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY:-}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:-}
- GROQ_API_KEY=${GROQ_API_KEY:-}
- TOGETHER_API_KEY=${TOGETHER_API_KEY:-}
- MISTRAL_API_KEY=${MISTRAL_API_KEY:-}
- GEMINI_API_TOKEN=${GEMINI_API_TOKEN:-}
- LLM_PROVIDER=${LLM_PROVIDER:-} # Optional: Override default provider (e.g., "anthropic/claude-3-opus")
volumes:
- /dev/shm:/dev/shm # Chromium performance
deploy:

View File

@@ -18,7 +18,7 @@ A comprehensive web-based tutorial for learning and experimenting with C4A-Scrip
2. **Install Dependencies**
```bash
pip install -r requirements.txt
pip install flask
```
3. **Launch the Server**
@@ -28,7 +28,7 @@ A comprehensive web-based tutorial for learning and experimenting with C4A-Scrip
4. **Open in Browser**
```
http://localhost:8000
http://localhost:8080
```
**🌐 Try Online**: [Live Demo](https://docs.crawl4ai.com/c4a-script/demo)
@@ -325,7 +325,7 @@ Powers the recording functionality:
### Configuration
```python
# server.py configuration
PORT = 8000
PORT = 8080
DEBUG = True
THREADED = True
```
@@ -343,9 +343,9 @@ THREADED = True
**Port Already in Use**
```bash
# Kill existing process
lsof -ti:8000 | xargs kill -9
lsof -ti:8080 | xargs kill -9
# Or use different port
python server.py --port 8001
python server.py --port 8081
```
**Blockly Not Loading**

View File

@@ -216,7 +216,7 @@ def get_examples():
'name': 'Handle Cookie Banner',
'description': 'Accept cookies and close newsletter popup',
'script': '''# Handle cookie banner and newsletter
GO http://127.0.0.1:8000/playground/
GO http://127.0.0.1:8080/playground/
WAIT `body` 2
IF (EXISTS `.cookie-banner`) THEN CLICK `.accept`
IF (EXISTS `.newsletter-popup`) THEN CLICK `.close`'''

File diff suppressed because it is too large Load Diff

View File

@@ -82,42 +82,6 @@ If you installed Crawl4AI (which installs Playwright under the hood), you alread
---
### Creating a Profile Using the Crawl4AI CLI (Easiest)
If you prefer a guided, interactive setup, use the built-in CLI to create and manage persistent browser profiles.
1.Launch the profile manager:
```bash
crwl profiles
```
2.Choose "Create new profile" and enter a profile name. A Chromium window opens so you can log in to sites and configure settings. When finished, return to the terminal and press `q` to save the profile.
3.Profiles are saved under `~/.crawl4ai/profiles/<profile_name>` (for example: `/home/<you>/.crawl4ai/profiles/test_profile_1`) along with a `storage_state.json` for cookies and session data.
4.Optionally, choose "List profiles" in the CLI to view available profiles and their paths.
5.Use the saved path with `BrowserConfig.user_data_dir`:
```python
from crawl4ai import AsyncWebCrawler, BrowserConfig
profile_path = "/home/<you>/.crawl4ai/profiles/test_profile_1"
browser_config = BrowserConfig(
headless=True,
use_managed_browser=True,
user_data_dir=profile_path,
browser_type="chromium",
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com/private")
```
The CLI also supports listing and deleting profiles, and even testing a crawl directly from the menu.
---
## 3. Using Managed Browsers in Crawl4AI
Once you have a data directory with your session data, pass it to **`BrowserConfig`**:

View File

@@ -1,304 +1,98 @@
# Proxy & Security
This guide covers proxy configuration and security features in Crawl4AI, including SSL certificate analysis and proxy rotation strategies.
## Understanding Proxy Configuration
Crawl4AI recommends configuring proxies per request through `CrawlerRunConfig.proxy_config`. This gives you precise control, enables rotation strategies, and keeps examples simple enough to copy, paste, and run.
# Proxy
## Basic Proxy Setup
Configure proxies that apply to each crawl operation:
Simple proxy configuration with `BrowserConfig`:
```python
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, ProxyConfig
from crawl4ai.async_configs import BrowserConfig
run_config = CrawlerRunConfig(proxy_config=ProxyConfig(server="http://proxy.example.com:8080"))
# run_config = CrawlerRunConfig(proxy_config={"server": "http://proxy.example.com:8080"})
# run_config = CrawlerRunConfig(proxy_config="http://proxy.example.com:8080")
# Using HTTP proxy
browser_config = BrowserConfig(proxy_config={"server": "http://proxy.example.com:8080"})
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com")
async def main():
browser_config = BrowserConfig()
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com", config=run_config)
print(f"Success: {result.success} -> {result.url}")
if __name__ == "__main__":
asyncio.run(main())
# Using SOCKS proxy
browser_config = BrowserConfig(proxy_config={"server": "socks5://proxy.example.com:1080"})
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com")
```
!!! note "Why request-level?"
`CrawlerRunConfig.proxy_config` keeps each request self-contained, so swapping proxies or rotation strategies is just a matter of building a new run configuration.
## Authenticated Proxy
## Supported Proxy Formats
The `ProxyConfig.from_string()` method supports multiple formats:
Use an authenticated proxy with `BrowserConfig`:
```python
from crawl4ai import ProxyConfig
from crawl4ai.async_configs import BrowserConfig
# HTTP proxy with authentication
proxy1 = ProxyConfig.from_string("http://user:pass@192.168.1.1:8080")
# HTTPS proxy
proxy2 = ProxyConfig.from_string("https://proxy.example.com:8080")
# SOCKS5 proxy
proxy3 = ProxyConfig.from_string("socks5://proxy.example.com:1080")
# Simple IP:port format
proxy4 = ProxyConfig.from_string("192.168.1.1:8080")
# IP:port:user:pass format
proxy5 = ProxyConfig.from_string("192.168.1.1:8080:user:pass")
browser_config = BrowserConfig(proxy_config={
"server": "http://[host]:[port]",
"username": "[username]",
"password": "[password]",
})
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com")
```
## Authenticated Proxies
For proxies requiring authentication:
## Rotating Proxies
Example using a proxy rotation service dynamically:
```python
import asyncio
from crawl4ai import AsyncWebCrawler,BrowserConfig, CrawlerRunConfig, ProxyConfig
run_config = CrawlerRunConfig(
proxy_config=ProxyConfig(
server="http://proxy.example.com:8080",
username="your_username",
password="your_password",
)
)
# Or dictionary style:
# run_config = CrawlerRunConfig(proxy_config={
# "server": "http://proxy.example.com:8080",
# "username": "your_username",
# "password": "your_password",
# })
async def main():
browser_config = BrowserConfig()
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com", config=run_config)
print(f"Success: {result.success} -> {result.url}")
if __name__ == "__main__":
asyncio.run(main())
```
## Environment Variable Configuration
Load proxies from environment variables for easy configuration:
```python
import os
from crawl4ai import ProxyConfig, CrawlerRunConfig
# Set environment variable
os.environ["PROXIES"] = "ip1:port1:user1:pass1,ip2:port2:user2:pass2,ip3:port3"
# Load all proxies
proxies = ProxyConfig.from_env()
print(f"Loaded {len(proxies)} proxies")
# Use first proxy
if proxies:
run_config = CrawlerRunConfig(proxy_config=proxies[0])
```
## Rotating Proxies
Crawl4AI supports automatic proxy rotation to distribute requests across multiple proxy servers. Rotation is applied per request using a rotation strategy on `CrawlerRunConfig`.
### Proxy Rotation (recommended)
```python
import asyncio
import re
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, ProxyConfig
from crawl4ai.proxy_strategy import RoundRobinProxyStrategy
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
CacheMode,
RoundRobinProxyStrategy,
)
import asyncio
from crawl4ai import ProxyConfig
async def main():
# Load proxies from environment
# Load proxies and create rotation strategy
proxies = ProxyConfig.from_env()
#eg: export PROXIES="ip1:port1:username1:password1,ip2:port2:username2:password2"
if not proxies:
print("No proxies found! Set PROXIES environment variable.")
print("No proxies found in environment. Set PROXIES env variable!")
return
# Create rotation strategy
proxy_strategy = RoundRobinProxyStrategy(proxies)
# Configure per-request with proxy rotation
# Create configs
browser_config = BrowserConfig(headless=True, verbose=False)
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
proxy_rotation_strategy=proxy_strategy,
proxy_rotation_strategy=proxy_strategy
)
async with AsyncWebCrawler(config=browser_config) as crawler:
urls = ["https://httpbin.org/ip"] * (len(proxies) * 2) # Test each proxy twice
print(f"🚀 Testing {len(proxies)} proxies with rotation...")
results = await crawler.arun_many(urls=urls, config=run_config)
print("\n📈 Initializing crawler with proxy rotation...")
async with AsyncWebCrawler(config=browser_config) as crawler:
print("\n🚀 Starting batch crawl with proxy rotation...")
results = await crawler.arun_many(
urls=urls,
config=run_config
)
for result in results:
if result.success:
ip_match = re.search(r'(?:[0-9]{1,3}\.){3}[0-9]{1,3}', result.html)
current_proxy = run_config.proxy_config if run_config.proxy_config else None
for i, result in enumerate(results):
if result.success:
# Extract IP from response
ip_match = re.search(r'(?:[0-9]{1,3}\.){3}[0-9]{1,3}', result.html)
if ip_match:
detected_ip = ip_match.group(0)
proxy_index = i % len(proxies)
expected_ip = proxies[proxy_index].ip
if current_proxy and ip_match:
print(f"URL {result.url}")
print(f"Proxy {current_proxy.server} -> Response IP: {ip_match.group(0)}")
verified = ip_match.group(0) == current_proxy.ip
if verified:
print(f"✅ Proxy working! IP matches: {current_proxy.ip}")
else:
print("❌ Proxy failed or IP mismatch!")
print("---")
print(f"✅ Request {i+1}: Proxy {proxy_index+1} -> IP {detected_ip}")
if detected_ip == expected_ip:
print(" 🎯 IP matches proxy configuration")
else:
print(f" ⚠️ IP mismatch (expected {expected_ip})")
else:
print(f"❌ Request {i+1}: Could not extract IP from response")
else:
print(f"❌ Request {i+1}: Failed - {result.error_message}")
asyncio.run(main())
if __name__ == "__main__":
asyncio.run(main())
```
## SSL Certificate Analysis
Combine proxy usage with SSL certificate inspection for enhanced security analysis. SSL certificate fetching is configured per request via `CrawlerRunConfig`.
### Per-Request SSL Certificate Analysis
```python
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
run_config = CrawlerRunConfig(
proxy_config={
"server": "http://proxy.example.com:8080",
"username": "user",
"password": "pass",
},
fetch_ssl_certificate=True, # Enable SSL certificate analysis for this request
)
async def main():
browser_config = BrowserConfig()
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com", config=run_config)
if result.success:
print(f"✅ Crawled via proxy: {result.url}")
# Analyze SSL certificate
if result.ssl_certificate:
cert = result.ssl_certificate
print("🔒 SSL Certificate Info:")
print(f" Issuer: {cert.issuer}")
print(f" Subject: {cert.subject}")
print(f" Valid until: {cert.valid_until}")
print(f" Fingerprint: {cert.fingerprint}")
# Export certificate
cert.to_json("certificate.json")
print("💾 Certificate exported to certificate.json")
else:
print("⚠️ No SSL certificate information available")
if __name__ == "__main__":
asyncio.run(main())
```
## Security Best Practices
### 1. Proxy Rotation for Anonymity
```python
from crawl4ai import CrawlerRunConfig, ProxyConfig
from crawl4ai.proxy_strategy import RoundRobinProxyStrategy
# Use multiple proxies to avoid IP blocking
proxies = ProxyConfig.from_env("PROXIES")
strategy = RoundRobinProxyStrategy(proxies)
# Configure rotation per request (recommended)
run_config = CrawlerRunConfig(proxy_rotation_strategy=strategy)
# For a fixed proxy across all requests, just reuse the same run_config instance
static_run_config = run_config
```
### 2. SSL Certificate Verification
```python
from crawl4ai import CrawlerRunConfig
# Always verify SSL certificates when possible
# Per-request (affects specific requests)
run_config = CrawlerRunConfig(fetch_ssl_certificate=True)
```
### 3. Environment Variable Security
```bash
# Use environment variables for sensitive proxy credentials
# Avoid hardcoding usernames/passwords in code
export PROXIES="ip1:port1:user1:pass1,ip2:port2:user2:pass2"
```
### 4. SOCKS5 for Enhanced Security
```python
from crawl4ai import CrawlerRunConfig
# Prefer SOCKS5 proxies for better protocol support
run_config = CrawlerRunConfig(proxy_config="socks5://proxy.example.com:1080")
```
## Migration from Deprecated `proxy` Parameter
!!! warning "Deprecation Notice"
The legacy `proxy` argument on `BrowserConfig` is deprecated. Configure proxies through `CrawlerRunConfig.proxy_config` so each request fully describes its network settings.
```python
# Old (deprecated) approach
# from crawl4ai import BrowserConfig
# browser_config = BrowserConfig(proxy="http://proxy.example.com:8080")
# New (preferred) approach
from crawl4ai import CrawlerRunConfig
run_config = CrawlerRunConfig(proxy_config="http://proxy.example.com:8080")
```
### Safe Logging of Proxies
```python
from crawl4ai import ProxyConfig
def safe_proxy_repr(proxy: ProxyConfig):
if getattr(proxy, "username", None):
return f"{proxy.server} (auth: ****)"
return proxy.server
```
## Troubleshooting
### Common Issues
???+ question "Proxy connection failed"
- Verify the proxy server is reachable from your network.
- Double-check authentication credentials.
- Ensure the protocol matches (`http`, `https`, or `socks5`).
???+ question "SSL certificate errors"
- Some proxies break SSL inspection; switch proxies if you see repeated failures.
- Consider temporarily disabling certificate fetching to isolate the issue.
???+ question "Environment variables not loading"
- Confirm `PROXIES` (or your custom env var) is set before running the script.
- Check formatting: `ip:port:user:pass,ip:port:user:pass`.
???+ question "Proxy rotation not working"
- Ensure `ProxyConfig.from_env()` actually loaded entries (`len(proxies) > 0`).
- Attach `proxy_rotation_strategy` to `CrawlerRunConfig`.
- Validate the proxy definitions you pass into the strategy.

View File

@@ -18,7 +18,7 @@ A comprehensive web-based tutorial for learning and experimenting with C4A-Scrip
2. **Install Dependencies**
```bash
pip install -r requirements.txt
pip install flask
```
3. **Launch the Server**
@@ -28,7 +28,7 @@ A comprehensive web-based tutorial for learning and experimenting with C4A-Scrip
4. **Open in Browser**
```
http://localhost:8000
http://localhost:8080
```
**🌐 Try Online**: [Live Demo](https://docs.crawl4ai.com/c4a-script/demo)
@@ -325,7 +325,7 @@ Powers the recording functionality:
### Configuration
```python
# server.py configuration
PORT = 8000
PORT = 8080
DEBUG = True
THREADED = True
```
@@ -343,9 +343,9 @@ THREADED = True
**Port Already in Use**
```bash
# Kill existing process
lsof -ti:8000 | xargs kill -9
lsof -ti:8080 | xargs kill -9
# Or use different port
python server.py --port 8001
python server.py --port 8081
```
**Blockly Not Loading**

View File

@@ -216,7 +216,7 @@ def get_examples():
'name': 'Handle Cookie Banner',
'description': 'Accept cookies and close newsletter popup',
'script': '''# Handle cookie banner and newsletter
GO http://127.0.0.1:8000/playground/
GO http://127.0.0.1:8080/playground/
WAIT `body` 2
IF (EXISTS `.cookie-banner`) THEN CLICK `.accept`
IF (EXISTS `.newsletter-popup`) THEN CLICK `.close`'''
@@ -283,7 +283,7 @@ WAIT `.success-message` 5'''
return jsonify(examples)
if __name__ == '__main__':
port = int(os.environ.get('PORT', 8000))
port = int(os.environ.get('PORT', 8080))
print(f"""
╔══════════════════════════════════════════════════════════╗
║ C4A-Script Interactive Tutorial Server ║

View File

@@ -69,12 +69,12 @@ The tutorial includes a Flask-based web interface with:
cd docs/examples/c4a_script/tutorial/
# Install dependencies
pip install -r requirements.txt
pip install flask
# Launch the tutorial server
python server.py
python app.py
# Open http://localhost:8000 in your browser
# Open http://localhost:5000 in your browser
```
## Core Concepts
@@ -111,8 +111,8 @@ CLICK `.submit-btn`
# By attribute
CLICK `button[type="submit"]`
# By accessible attributes
CLICK `button[aria-label="Search"][title="Search"]`
# By text content
CLICK `button:contains("Sign In")`
# Complex selectors
CLICK `.form-container input[name="email"]`

View File

@@ -27,14 +27,6 @@
- [Hook Response Information](#hook-response-information)
- [Error Handling](#error-handling)
- [Hooks Utility: Function-Based Approach (Python)](#hooks-utility-function-based-approach-python)
- [Job Queue & Webhook API](#job-queue-webhook-api)
- [Why Use the Job Queue API?](#why-use-the-job-queue-api)
- [Available Endpoints](#available-endpoints)
- [Webhook Configuration](#webhook-configuration)
- [Usage Examples](#usage-examples)
- [Webhook Best Practices](#webhook-best-practices)
- [Use Cases](#use-cases)
- [Troubleshooting](#troubleshooting)
- [Dockerfile Parameters](#dockerfile-parameters)
- [Using the API](#using-the-api)
- [Playground Interface](#playground-interface)
@@ -1118,464 +1110,6 @@ if __name__ == "__main__":
---
## Job Queue & Webhook API
The Docker deployment includes a powerful asynchronous job queue system with webhook support for both crawling and LLM extraction tasks. Instead of waiting for long-running operations to complete, submit jobs and receive real-time notifications via webhooks when they finish.
### Why Use the Job Queue API?
**Traditional Synchronous API (`/crawl`):**
- Client waits for entire crawl to complete
- Timeout issues with long-running crawls
- Resource blocking during execution
- Constant polling required for status updates
**Asynchronous Job Queue API (`/crawl/job`, `/llm/job`):**
- ✅ Submit job and continue immediately
- ✅ No timeout concerns for long operations
- ✅ Real-time webhook notifications on completion
- ✅ Better resource utilization
- ✅ Perfect for batch processing
- ✅ Ideal for microservice architectures
### Available Endpoints
#### 1. Crawl Job Endpoint
```
POST /crawl/job
```
Submit an asynchronous crawl job with optional webhook notification.
**Request Body:**
```json
{
"urls": ["https://example.com"],
"cache_mode": "bypass",
"extraction_strategy": {
"type": "JsonCssExtractionStrategy",
"schema": {
"title": "h1",
"content": ".article-body"
}
},
"webhook_config": {
"webhook_url": "https://your-app.com/webhook/crawl-complete",
"webhook_data_in_payload": true,
"webhook_headers": {
"X-Webhook-Secret": "your-secret-token",
"X-Custom-Header": "value"
}
}
}
```
**Response:**
```json
{
"task_id": "crawl_1698765432",
"message": "Crawl job submitted"
}
```
#### 2. LLM Extraction Job Endpoint
```
POST /llm/job
```
Submit an asynchronous LLM extraction job with optional webhook notification.
**Request Body:**
```json
{
"url": "https://example.com/article",
"q": "Extract the article title, author, publication date, and main points",
"provider": "openai/gpt-4o-mini",
"schema": "{\"title\": \"string\", \"author\": \"string\", \"date\": \"string\", \"points\": [\"string\"]}",
"cache": false,
"webhook_config": {
"webhook_url": "https://your-app.com/webhook/llm-complete",
"webhook_data_in_payload": true,
"webhook_headers": {
"X-Webhook-Secret": "your-secret-token"
}
}
}
```
**Response:**
```json
{
"task_id": "llm_1698765432",
"message": "LLM job submitted"
}
```
#### 3. Job Status Endpoint
```
GET /job/{task_id}
```
Check the status and retrieve results of a submitted job.
**Response (In Progress):**
```json
{
"task_id": "crawl_1698765432",
"status": "processing",
"message": "Job is being processed"
}
```
**Response (Completed):**
```json
{
"task_id": "crawl_1698765432",
"status": "completed",
"result": {
"markdown": "# Page Title\n\nContent...",
"extracted_content": {...},
"links": {...}
}
}
```
### Webhook Configuration
Webhooks provide real-time notifications when your jobs complete, eliminating the need for constant polling.
#### Webhook Config Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `webhook_url` | string | Yes | Your HTTP(S) endpoint to receive notifications |
| `webhook_data_in_payload` | boolean | No | Include full result data in webhook payload (default: false) |
| `webhook_headers` | object | No | Custom headers for authentication/identification |
#### Webhook Payload Format
**Success Notification (Crawl Job):**
```json
{
"task_id": "crawl_1698765432",
"task_type": "crawl",
"status": "completed",
"timestamp": "2025-10-22T12:30:00.000000+00:00",
"urls": ["https://example.com"],
"data": {
"markdown": "# Page content...",
"extracted_content": {...},
"links": {...}
}
}
```
**Success Notification (LLM Job):**
```json
{
"task_id": "llm_1698765432",
"task_type": "llm_extraction",
"status": "completed",
"timestamp": "2025-10-22T12:30:00.000000+00:00",
"urls": ["https://example.com/article"],
"data": {
"extracted_content": {
"title": "Understanding Web Scraping",
"author": "John Doe",
"date": "2025-10-22",
"points": ["Point 1", "Point 2"]
}
}
}
```
**Failure Notification:**
```json
{
"task_id": "crawl_1698765432",
"task_type": "crawl",
"status": "failed",
"timestamp": "2025-10-22T12:30:00.000000+00:00",
"urls": ["https://example.com"],
"error": "Connection timeout after 30 seconds"
}
```
#### Webhook Delivery & Retry
- **Delivery Method:** HTTP POST to your `webhook_url`
- **Content-Type:** `application/json`
- **Retry Policy:** Exponential backoff with 5 attempts
- Attempt 1: Immediate
- Attempt 2: 1 second delay
- Attempt 3: 2 seconds delay
- Attempt 4: 4 seconds delay
- Attempt 5: 8 seconds delay
- **Success Status Codes:** 200-299
- **Custom Headers:** Your `webhook_headers` are included in every request
### Usage Examples
#### Example 1: Python with Webhook Handler (Flask)
```python
from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
# Webhook handler
@app.route('/webhook/crawl-complete', methods=['POST'])
def handle_crawl_webhook():
payload = request.json
if payload['status'] == 'completed':
print(f"✅ Job {payload['task_id']} completed!")
print(f"Task type: {payload['task_type']}")
# Access the crawl results
if 'data' in payload:
markdown = payload['data'].get('markdown', '')
extracted = payload['data'].get('extracted_content', {})
print(f"Extracted {len(markdown)} characters")
print(f"Structured data: {extracted}")
else:
print(f"❌ Job {payload['task_id']} failed: {payload.get('error')}")
return jsonify({"status": "received"}), 200
# Submit a crawl job with webhook
def submit_crawl_job():
response = requests.post(
"http://localhost:11235/crawl/job",
json={
"urls": ["https://example.com"],
"extraction_strategy": {
"type": "JsonCssExtractionStrategy",
"schema": {
"name": "Example Schema",
"baseSelector": "body",
"fields": [
{"name": "title", "selector": "h1", "type": "text"},
{"name": "description", "selector": "meta[name='description']", "type": "attribute", "attribute": "content"}
]
}
},
"webhook_config": {
"webhook_url": "https://your-app.com/webhook/crawl-complete",
"webhook_data_in_payload": True,
"webhook_headers": {
"X-Webhook-Secret": "your-secret-token"
}
}
}
)
task_id = response.json()['task_id']
print(f"Job submitted: {task_id}")
return task_id
if __name__ == '__main__':
app.run(port=5000)
```
#### Example 2: LLM Extraction with Webhooks
```python
import requests
def submit_llm_job_with_webhook():
response = requests.post(
"http://localhost:11235/llm/job",
json={
"url": "https://example.com/article",
"q": "Extract the article title, author, and main points",
"provider": "openai/gpt-4o-mini",
"webhook_config": {
"webhook_url": "https://your-app.com/webhook/llm-complete",
"webhook_data_in_payload": True,
"webhook_headers": {
"X-Webhook-Secret": "your-secret-token"
}
}
}
)
task_id = response.json()['task_id']
print(f"LLM job submitted: {task_id}")
return task_id
# Webhook handler for LLM jobs
@app.route('/webhook/llm-complete', methods=['POST'])
def handle_llm_webhook():
payload = request.json
if payload['status'] == 'completed':
extracted = payload['data']['extracted_content']
print(f"✅ LLM extraction completed!")
print(f"Results: {extracted}")
else:
print(f"❌ LLM extraction failed: {payload.get('error')}")
return jsonify({"status": "received"}), 200
```
#### Example 3: Without Webhooks (Polling)
If you don't use webhooks, you can poll for results:
```python
import requests
import time
# Submit job
response = requests.post(
"http://localhost:11235/crawl/job",
json={"urls": ["https://example.com"]}
)
task_id = response.json()['task_id']
# Poll for results
while True:
result = requests.get(f"http://localhost:11235/job/{task_id}")
data = result.json()
if data['status'] == 'completed':
print("Job completed!")
print(data['result'])
break
elif data['status'] == 'failed':
print(f"Job failed: {data.get('error')}")
break
print("Still processing...")
time.sleep(2)
```
#### Example 4: Global Webhook Configuration
Set a default webhook URL in your `config.yml` to avoid repeating it in every request:
```yaml
# config.yml
api:
crawler:
# ... other settings ...
webhook:
default_url: "https://your-app.com/webhook/default"
default_headers:
X-Webhook-Secret: "your-secret-token"
```
Then submit jobs without webhook config:
```python
# Uses the global webhook configuration
response = requests.post(
"http://localhost:11235/crawl/job",
json={"urls": ["https://example.com"]}
)
```
### Webhook Best Practices
1. **Authentication:** Always use custom headers for webhook authentication
```json
"webhook_headers": {
"X-Webhook-Secret": "your-secret-token"
}
```
2. **Idempotency:** Design your webhook handler to be idempotent (safe to receive duplicate notifications)
3. **Fast Response:** Return HTTP 200 quickly; process data asynchronously if needed
```python
@app.route('/webhook', methods=['POST'])
def webhook():
payload = request.json
# Queue for background processing
queue.enqueue(process_webhook, payload)
return jsonify({"status": "received"}), 200
```
4. **Error Handling:** Handle both success and failure notifications
```python
if payload['status'] == 'completed':
# Process success
elif payload['status'] == 'failed':
# Log error, retry, or alert
```
5. **Validation:** Verify webhook authenticity using custom headers
```python
secret = request.headers.get('X-Webhook-Secret')
if secret != os.environ['EXPECTED_SECRET']:
return jsonify({"error": "Unauthorized"}), 401
```
6. **Logging:** Log webhook deliveries for debugging
```python
logger.info(f"Webhook received: {payload['task_id']} - {payload['status']}")
```
### Use Cases
**1. Batch Processing**
Submit hundreds of URLs and get notified as each completes:
```python
urls = ["https://site1.com", "https://site2.com", ...]
for url in urls:
submit_crawl_job(url, webhook_url="https://app.com/webhook")
```
**2. Microservice Integration**
Integrate with event-driven architectures:
```python
# Service A submits job
task_id = submit_crawl_job(url)
# Service B receives webhook and triggers next step
@app.route('/webhook')
def webhook():
process_result(request.json)
trigger_next_service()
return "OK", 200
```
**3. Long-Running Extractions**
Handle complex LLM extractions without timeouts:
```python
submit_llm_job(
url="https://long-article.com",
q="Comprehensive summary with key points and analysis",
webhook_url="https://app.com/webhook/llm"
)
```
### Troubleshooting
**Webhook not receiving notifications?**
- Check your webhook URL is publicly accessible
- Verify firewall/security group settings
- Use webhook testing tools like webhook.site for debugging
- Check server logs for delivery attempts
- Ensure your handler returns 200-299 status code
**Job stuck in processing?**
- Check Redis connection: `docker logs <container_name> | grep redis`
- Verify worker processes: `docker exec <container_name> ps aux | grep worker`
- Check server logs: `docker logs <container_name>`
**Need to cancel a job?**
Jobs are processed asynchronously. If you need to cancel:
- Delete the task from Redis (requires Redis CLI access)
- Or implement a cancellation endpoint in your webhook handler
---
## Dockerfile Parameters
You can customize the image build process using build arguments (`--build-arg`). These are typically used via `docker buildx build` or within the `docker-compose.yml` file.

View File

@@ -57,7 +57,7 @@
Crawl4AI is the #1 trending GitHub repository, actively maintained by a vibrant community. It delivers blazing-fast, AI-ready web crawling tailored for large language models, AI agents, and data pipelines. Fully open source, flexible, and built for real-time performance, **Crawl4AI** empowers developers with unmatched speed, precision, and deployment ease.
> Enjoy using Crawl4AI? Consider **[becoming a sponsor](https://github.com/sponsors/unclecode)** to support ongoing development and community growth!
> **Note**: If you're looking for the old documentation, you can access it [here](https://old.docs.crawl4ai.com).
## 🆕 AI Assistant Skill Now Available!

View File

@@ -529,19 +529,8 @@ class AdminDashboard {
</label>
</div>
<div class="form-group full-width">
<label>Long Description (Markdown - Overview tab)</label>
<textarea id="form-long-description" rows="10" placeholder="Enter detailed description with markdown formatting...">${app?.long_description || ''}</textarea>
<small>Markdown support: **bold**, *italic*, [links](url), # headers, code blocks, lists</small>
</div>
<div class="form-group full-width">
<label>Integration Guide (Markdown - Integration tab)</label>
<textarea id="form-integration" rows="20" placeholder="Enter integration guide with installation, examples, and code snippets using markdown...">${app?.integration_guide || ''}</textarea>
<small>Single markdown field with installation, examples, and complete guide. Code blocks get auto copy buttons.</small>
</div>
<div class="form-group full-width">
<label>Documentation (Markdown - Documentation tab)</label>
<textarea id="form-documentation" rows="20" placeholder="Enter documentation with API reference, examples, and best practices using markdown...">${app?.documentation || ''}</textarea>
<small>Full documentation with API reference, examples, best practices, etc.</small>
<label>Integration Guide</label>
<textarea id="form-integration" rows="10">${app?.integration_guide || ''}</textarea>
</div>
</div>
`;
@@ -723,9 +712,7 @@ class AdminDashboard {
data.contact_email = document.getElementById('form-email').value;
data.featured = document.getElementById('form-featured').checked ? 1 : 0;
data.sponsored = document.getElementById('form-sponsored').checked ? 1 : 0;
data.long_description = document.getElementById('form-long-description').value;
data.integration_guide = document.getElementById('form-integration').value;
data.documentation = document.getElementById('form-documentation').value;
} else if (type === 'articles') {
data.title = document.getElementById('form-title').value;
data.slug = this.generateSlug(data.title);

View File

@@ -278,12 +278,12 @@
}
.tab-content {
display: none !important;
display: none;
padding: 2rem;
}
.tab-content.active {
display: block !important;
display: block;
}
/* Overview Layout */
@@ -510,31 +510,6 @@
line-height: 1.5;
}
/* Markdown rendered code blocks */
.integration-content pre,
.docs-content pre {
background: var(--bg-dark);
border: 1px solid var(--border-color);
margin: 1rem 0;
padding: 1rem;
padding-top: 2.5rem; /* Space for copy button */
overflow-x: auto;
position: relative;
max-height: none; /* Remove any height restrictions */
height: auto; /* Allow content to expand */
}
.integration-content pre code,
.docs-content pre code {
background: transparent;
padding: 0;
color: var(--text-secondary);
font-size: 0.875rem;
line-height: 1.5;
white-space: pre; /* Preserve whitespace and line breaks */
display: block;
}
/* Feature Grid */
.feature-grid {
display: grid;

View File

@@ -73,14 +73,27 @@
<div class="tabs">
<button class="tab-btn active" data-tab="overview">Overview</button>
<button class="tab-btn" data-tab="integration">Integration</button>
<!-- <button class="tab-btn" data-tab="docs">Documentation</button>
<button class="tab-btn" data-tab="support">Support</button> -->
<button class="tab-btn" data-tab="docs">Documentation</button>
<button class="tab-btn" data-tab="support">Support</button>
</div>
<section id="overview-tab" class="tab-content active">
<div class="overview-columns">
<div class="overview-main">
<h2>Overview</h2>
<div id="app-overview">Overview content goes here.</div>
<h3>Key Features</h3>
<ul id="app-features" class="features-list">
<li>Feature 1</li>
<li>Feature 2</li>
<li>Feature 3</li>
</ul>
<h3>Use Cases</h3>
<div id="app-use-cases" class="use-cases">
<p>Describe how this app can help your workflow.</p>
</div>
</div>
<aside class="sidebar">
@@ -129,16 +142,37 @@
</section>
<section id="integration-tab" class="tab-content">
<div class="integration-content" id="app-integration">
<div class="integration-content">
<h2>Integration Guide</h2>
<h3>Installation</h3>
<div class="code-block">
<pre><code id="install-code"># Installation instructions will appear here</code></pre>
</div>
<h3>Basic Usage</h3>
<div class="code-block">
<pre><code id="usage-code"># Usage example will appear here</code></pre>
</div>
<h3>Complete Integration Example</h3>
<div class="code-block">
<button class="copy-btn" id="copy-integration">Copy</button>
<pre><code id="integration-code"># Complete integration guide will appear here</code></pre>
</div>
</div>
</section>
<!-- <section id="docs-tab" class="tab-content">
<div class="docs-content" id="app-docs">
<section id="docs-tab" class="tab-content">
<div class="docs-content">
<h2>Documentation</h2>
<div id="app-docs" class="doc-sections">
<p>Documentation coming soon.</p>
</div>
</div>
</section> -->
</section>
<!-- <section id="support-tab" class="tab-content">
<section id="support-tab" class="tab-content">
<div class="docs-content">
<h2>Support</h2>
<div class="support-grid">
@@ -156,7 +190,7 @@
</div>
</div>
</div>
</section> -->
</section>
</div>
</main>

View File

@@ -112,7 +112,7 @@ class AppDetailPage {
}
// Contact
document.getElementById('app-contact') && (document.getElementById('app-contact').textContent = this.appData.contact_email || 'Not available');
document.getElementById('app-contact').textContent = this.appData.contact_email || 'Not available';
// Sidebar info
document.getElementById('sidebar-downloads').textContent = this.formatNumber(this.appData.downloads || 0);
@@ -123,134 +123,146 @@ class AppDetailPage {
document.getElementById('sidebar-pricing').textContent = this.appData.pricing || 'Free';
document.getElementById('sidebar-contact').textContent = this.appData.contact_email || 'contact@example.com';
// Render tab contents from database fields
this.renderTabContents();
// Integration guide
this.renderIntegrationGuide();
}
renderTabContents() {
// Overview tab - use long_description from database
const overviewDiv = document.getElementById('app-overview');
if (overviewDiv) {
if (this.appData.long_description) {
overviewDiv.innerHTML = this.renderMarkdown(this.appData.long_description);
} else {
overviewDiv.innerHTML = `<p>${this.appData.description || 'No overview available.'}</p>`;
renderIntegrationGuide() {
// Installation code
const installCode = document.getElementById('install-code');
if (installCode) {
if (this.appData.type === 'Open Source' && this.appData.github_url) {
installCode.textContent = `# Clone from GitHub
git clone ${this.appData.github_url}
# Install dependencies
pip install -r requirements.txt`;
} else if (this.appData.name.toLowerCase().includes('api')) {
installCode.textContent = `# Install via pip
pip install ${this.appData.slug}
# Or install from source
pip install git+${this.appData.github_url || 'https://github.com/example/repo'}`;
}
}
// Integration tab - use integration_guide field from database
const integrationDiv = document.getElementById('app-integration');
if (integrationDiv) {
if (this.appData.integration_guide) {
integrationDiv.innerHTML = this.renderMarkdown(this.appData.integration_guide);
// Add copy buttons to all code blocks
this.addCopyButtonsToCodeBlocks(integrationDiv);
} else {
integrationDiv.innerHTML = '<p>Integration guide not yet available. Please check the official website for details.</p>';
// Usage code - customize based on category
const usageCode = document.getElementById('usage-code');
if (usageCode) {
if (this.appData.category === 'Browser Automation') {
usageCode.textContent = `from crawl4ai import AsyncWebCrawler
from ${this.appData.slug.replace(/-/g, '_')} import ${this.appData.name.replace(/\s+/g, '')}
async def main():
# Initialize ${this.appData.name}
automation = ${this.appData.name.replace(/\s+/g, '')}()
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url="https://example.com",
browser_config=automation.config,
wait_for="css:body"
)
print(result.markdown)`;
} else if (this.appData.category === 'Proxy Services') {
usageCode.textContent = `from crawl4ai import AsyncWebCrawler
import ${this.appData.slug.replace(/-/g, '_')}
# Configure proxy
proxy_config = {
"server": "${this.appData.website_url || 'https://proxy.example.com'}",
"username": "your_username",
"password": "your_password"
}
async with AsyncWebCrawler(proxy=proxy_config) as crawler:
result = await crawler.arun(
url="https://example.com",
bypass_cache=True
)
print(result.status_code)`;
} else if (this.appData.category === 'LLM Integration') {
usageCode.textContent = `from crawl4ai import AsyncWebCrawler
from crawl4ai.extraction_strategy import LLMExtractionStrategy
# Configure LLM extraction
strategy = LLMExtractionStrategy(
provider="${this.appData.name.toLowerCase().includes('gpt') ? 'openai' : 'anthropic'}",
api_key="your-api-key",
model="${this.appData.name.toLowerCase().includes('gpt') ? 'gpt-4' : 'claude-3'}",
instruction="Extract structured data"
)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url="https://example.com",
extraction_strategy=strategy
)
print(result.extracted_content)`;
}
}
// Documentation tab - use documentation field from database
const docsDiv = document.getElementById('app-docs');
if (docsDiv) {
if (this.appData.documentation) {
docsDiv.innerHTML = this.renderMarkdown(this.appData.documentation);
// Add copy buttons to all code blocks
this.addCopyButtonsToCodeBlocks(docsDiv);
} else {
docsDiv.innerHTML = '<p>Documentation coming soon.</p>';
}
// Integration example
const integrationCode = document.getElementById('integration-code');
if (integrationCode) {
integrationCode.textContent = this.appData.integration_guide ||
`# Complete ${this.appData.name} Integration Example
from crawl4ai import AsyncWebCrawler
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
import json
async def crawl_with_${this.appData.slug.replace(/-/g, '_')}():
"""
Complete example showing how to use ${this.appData.name}
with Crawl4AI for production web scraping
"""
# Define extraction schema
schema = {
"name": "ProductList",
"baseSelector": "div.product",
"fields": [
{"name": "title", "selector": "h2", "type": "text"},
{"name": "price", "selector": ".price", "type": "text"},
{"name": "image", "selector": "img", "type": "attribute", "attribute": "src"},
{"name": "link", "selector": "a", "type": "attribute", "attribute": "href"}
]
}
# Initialize crawler with ${this.appData.name}
async with AsyncWebCrawler(
browser_type="chromium",
headless=True,
verbose=True
) as crawler:
# Crawl with extraction
result = await crawler.arun(
url="https://example.com/products",
extraction_strategy=JsonCssExtractionStrategy(schema),
cache_mode="bypass",
wait_for="css:.product",
screenshot=True
)
# Process results
if result.success:
products = json.loads(result.extracted_content)
print(f"Found {len(products)} products")
for product in products[:5]:
print(f"- {product['title']}: {product['price']}")
return products
# Run the crawler
if __name__ == "__main__":
import asyncio
asyncio.run(crawl_with_${this.appData.slug.replace(/-/g, '_')}())`;
}
}
addCopyButtonsToCodeBlocks(container) {
// Find all code blocks and add copy buttons
const codeBlocks = container.querySelectorAll('pre code');
codeBlocks.forEach(codeBlock => {
const pre = codeBlock.parentElement;
// Skip if already has a copy button
if (pre.querySelector('.copy-btn')) return;
// Create copy button
const copyBtn = document.createElement('button');
copyBtn.className = 'copy-btn';
copyBtn.textContent = 'Copy';
copyBtn.onclick = () => {
navigator.clipboard.writeText(codeBlock.textContent).then(() => {
copyBtn.textContent = '✓ Copied!';
setTimeout(() => {
copyBtn.textContent = 'Copy';
}, 2000);
});
};
// Add button to pre element
pre.style.position = 'relative';
pre.insertBefore(copyBtn, codeBlock);
});
}
renderMarkdown(text) {
if (!text) return '';
// Store code blocks temporarily to protect them from processing
const codeBlocks = [];
let processed = text.replace(/```(\w+)?\n([\s\S]*?)```/g, (match, lang, code) => {
const placeholder = `___CODE_BLOCK_${codeBlocks.length}___`;
codeBlocks.push(`<pre><code class="language-${lang || ''}">${this.escapeHtml(code)}</code></pre>`);
return placeholder;
});
// Store inline code temporarily
const inlineCodes = [];
processed = processed.replace(/`([^`]+)`/g, (match, code) => {
const placeholder = `___INLINE_CODE_${inlineCodes.length}___`;
inlineCodes.push(`<code>${this.escapeHtml(code)}</code>`);
return placeholder;
});
// Now process the rest of the markdown
processed = processed
// Headers
.replace(/^### (.*$)/gim, '<h3>$1</h3>')
.replace(/^## (.*$)/gim, '<h2>$1</h2>')
.replace(/^# (.*$)/gim, '<h1>$1</h1>')
// Bold
.replace(/\*\*(.*?)\*\*/g, '<strong>$1</strong>')
// Italic
.replace(/\*(.*?)\*/g, '<em>$1</em>')
// Links
.replace(/\[([^\]]+)\]\(([^)]+)\)/g, '<a href="$2" target="_blank">$1</a>')
// Line breaks
.replace(/\n\n/g, '</p><p>')
.replace(/\n/g, '<br>')
// Lists
.replace(/^\* (.*)$/gim, '<li>$1</li>')
.replace(/^- (.*)$/gim, '<li>$1</li>')
// Wrap in paragraphs
.replace(/^(?!<[h|p|pre|ul|ol|li])/gim, '<p>')
.replace(/(?<![>])$/gim, '</p>');
// Restore inline code
inlineCodes.forEach((code, i) => {
processed = processed.replace(`___INLINE_CODE_${i}___`, code);
});
// Restore code blocks
codeBlocks.forEach((block, i) => {
processed = processed.replace(`___CODE_BLOCK_${i}___`, block);
});
return processed;
}
escapeHtml(text) {
const div = document.createElement('div');
div.textContent = text;
return div.innerHTML;
}
formatNumber(num) {
if (num >= 1000000) {
return (num / 1000000).toFixed(1) + 'M';
@@ -263,27 +275,45 @@ class AppDetailPage {
setupEventListeners() {
// Tab switching
const tabs = document.querySelectorAll('.tab-btn');
tabs.forEach(tab => {
tab.addEventListener('click', () => {
// Update active tab button
// Update active tab
tabs.forEach(t => t.classList.remove('active'));
tab.classList.add('active');
// Show corresponding content
const tabName = tab.dataset.tab;
// Hide all tab contents
const allTabContents = document.querySelectorAll('.tab-content');
allTabContents.forEach(content => {
document.querySelectorAll('.tab-content').forEach(content => {
content.classList.remove('active');
});
document.getElementById(`${tabName}-tab`).classList.add('active');
});
});
// Show the selected tab content
const targetTab = document.getElementById(`${tabName}-tab`);
if (targetTab) {
targetTab.classList.add('active');
}
// Copy integration code
document.getElementById('copy-integration').addEventListener('click', () => {
const code = document.getElementById('integration-code').textContent;
navigator.clipboard.writeText(code).then(() => {
const btn = document.getElementById('copy-integration');
const originalText = btn.innerHTML;
btn.innerHTML = '<span>✓</span> Copied!';
setTimeout(() => {
btn.innerHTML = originalText;
}, 2000);
});
});
// Copy code buttons
document.querySelectorAll('.copy-btn').forEach(btn => {
btn.addEventListener('click', (e) => {
const codeBlock = e.target.closest('.code-block');
const code = codeBlock.querySelector('code').textContent;
navigator.clipboard.writeText(code).then(() => {
btn.textContent = 'Copied!';
setTimeout(() => {
btn.textContent = 'Copy';
}, 2000);
});
});
});
}

View File

@@ -471,17 +471,13 @@ async def delete_sponsor(sponsor_id: int):
app.include_router(router)
# Version info
VERSION = "1.1.0"
BUILD_DATE = "2025-10-26"
@app.get("/")
async def root():
"""API info"""
return {
"name": "Crawl4AI Marketplace API",
"version": VERSION,
"build_date": BUILD_DATE,
"version": "1.0.0",
"endpoints": [
"/marketplace/api/apps",
"/marketplace/api/articles",

View File

@@ -31,7 +31,7 @@ dependencies = [
"rank-bm25~=0.2",
"snowballstemmer~=2.2",
"pydantic>=2.10",
"pyOpenSSL>=25.3.0",
"pyOpenSSL>=24.3.0",
"psutil>=6.1.1",
"PyYAML>=6.0",
"nltk>=3.9.1",

View File

@@ -19,7 +19,7 @@ rank-bm25~=0.2
colorama~=0.4
snowballstemmer~=2.2
pydantic>=2.10
pyOpenSSL>=25.3.0
pyOpenSSL>=24.3.0
psutil>=6.1.1
PyYAML>=6.0
nltk>=3.9.1

View File

@@ -364,19 +364,5 @@ async def test_network_error_handling():
async with AsyncPlaywrightCrawlerStrategy() as strategy:
await strategy.crawl("https://invalid.example.com", config)
@pytest.mark.asyncio
async def test_remove_overlay_elements(crawler_strategy):
config = CrawlerRunConfig(
remove_overlay_elements=True,
delay_before_return_html=5,
)
response = await crawler_strategy.crawl(
"https://www2.hm.com/en_us/index.html",
config
)
assert response.status_code == 200
assert "Accept all cookies" not in response.html
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -1,220 +0,0 @@
"""
Final verification test for Issue #1055 fix
This test demonstrates that LLM extraction now runs in parallel
when using arun_many with multiple URLs.
"""
import os
import sys
import time
import asyncio
grandparent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(grandparent_dir)
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
CacheMode,
LLMExtractionStrategy,
LLMConfig,
)
from pydantic import BaseModel
class SimpleData(BaseModel):
title: str
summary: str
def print_section(title):
print("\n" + "=" * 80)
print(title)
print("=" * 80 + "\n")
async def test_without_llm():
"""Baseline: Test crawling without LLM extraction"""
print_section("TEST 1: Crawling WITHOUT LLM Extraction")
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
)
browser_config = BrowserConfig(headless=True, verbose=False)
urls = [
"https://www.example.com",
"https://www.iana.org",
"https://www.wikipedia.org",
]
print(f"Crawling {len(urls)} URLs without LLM extraction...")
print("Expected: Fast and parallel\n")
start_time = time.time()
async with AsyncWebCrawler(config=browser_config) as crawler:
results = await crawler.arun_many(urls=urls, config=config)
duration = time.time() - start_time
print(f"\n✅ Completed in {duration:.2f}s")
print(f" Successful: {sum(1 for r in results if r.success)}/{len(urls)}")
print(f" Average: {duration/len(urls):.2f}s per URL")
return duration
async def test_with_llm_before_fix():
"""Demonstrate the problem: Sequential execution with LLM"""
print_section("TEST 2: What Issue #1055 Reported (LLM Sequential Behavior)")
print("The issue reported that with LLM extraction, URLs would crawl")
print("one after another instead of in parallel.")
print("\nWithout our fix, this would show:")
print(" - URL 1 fetches → extracts → completes")
print(" - URL 2 fetches → extracts → completes")
print(" - URL 3 fetches → extracts → completes")
print("\nTotal time would be approximately sum of all individual times.")
async def test_with_llm_after_fix():
"""Demonstrate the fix: Parallel execution with LLM"""
print_section("TEST 3: After Fix - LLM Extraction in Parallel")
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
extraction_strategy=LLMExtractionStrategy(
llm_config=LLMConfig(provider="openai/gpt-4o-mini"),
schema=SimpleData.model_json_schema(),
extraction_type="schema",
instruction="Extract title and summary",
)
)
browser_config = BrowserConfig(headless=True, verbose=False)
urls = [
"https://www.example.com",
"https://www.iana.org",
"https://www.wikipedia.org",
]
print(f"Crawling {len(urls)} URLs WITH LLM extraction...")
print("Expected: Parallel execution with our fix\n")
completion_times = {}
start_time = time.time()
async with AsyncWebCrawler(config=browser_config) as crawler:
results = await crawler.arun_many(urls=urls, config=config)
for result in results:
elapsed = time.time() - start_time
completion_times[result.url] = elapsed
print(f" [{elapsed:5.2f}s] ✓ {result.url[:50]}")
duration = time.time() - start_time
print(f"\n✅ Total time: {duration:.2f}s")
print(f" Successful: {sum(1 for url in urls if url in completion_times)}/{len(urls)}")
# Analyze parallelism
times = list(completion_times.values())
if len(times) >= 2:
# If parallel, completion times should be staggered, not evenly spaced
time_diffs = [times[i+1] - times[i] for i in range(len(times)-1)]
avg_diff = sum(time_diffs) / len(time_diffs)
print(f"\nParallelism Analysis:")
print(f" Completion time differences: {[f'{d:.2f}s' for d in time_diffs]}")
print(f" Average difference: {avg_diff:.2f}s")
# In parallel mode, some tasks complete close together
# In sequential mode, they're evenly spaced (avg ~2-3s apart)
if avg_diff < duration / len(urls):
print(f" ✅ PARALLEL: Tasks completed with overlapping execution")
else:
print(f" ⚠️ SEQUENTIAL: Tasks completed one after another")
return duration
async def test_multiple_arun_calls():
"""Test multiple individual arun() calls in parallel"""
print_section("TEST 4: Multiple arun() Calls with asyncio.gather")
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
extraction_strategy=LLMExtractionStrategy(
llm_config=LLMConfig(provider="openai/gpt-4o-mini"),
schema=SimpleData.model_json_schema(),
extraction_type="schema",
instruction="Extract title and summary",
)
)
browser_config = BrowserConfig(headless=True, verbose=False)
urls = [
"https://www.example.com",
"https://www.iana.org",
"https://www.wikipedia.org",
]
print(f"Running {len(urls)} arun() calls with asyncio.gather()...")
print("Expected: True parallel execution\n")
start_time = time.time()
async with AsyncWebCrawler(config=browser_config) as crawler:
tasks = [crawler.arun(url, config=config) for url in urls]
results = await asyncio.gather(*tasks)
duration = time.time() - start_time
print(f"\n✅ Completed in {duration:.2f}s")
print(f" Successful: {sum(1 for r in results if r.success)}/{len(urls)}")
print(f" This proves the async LLM extraction works correctly")
return duration
async def main():
print("\n" + "🚀" * 40)
print("ISSUE #1055 FIX VERIFICATION")
print("Testing: Sequential → Parallel LLM Extraction")
print("🚀" * 40)
# Run tests
await test_without_llm()
await test_with_llm_before_fix()
time_with_llm = await test_with_llm_after_fix()
time_gather = await test_multiple_arun_calls()
# Final summary
print_section("FINAL VERDICT")
print("✅ Fix Verified!")
print("\nWhat changed:")
print(" • Created aperform_completion_with_backoff() using litellm.acompletion")
print(" • Added arun() method to ExtractionStrategy base class")
print(" • Implemented parallel arun() in LLMExtractionStrategy")
print(" • Updated AsyncWebCrawler to use arun() when available")
print("\nResult:")
print(" • LLM extraction now runs in parallel across multiple URLs")
print(" • Backward compatible - existing strategies still work")
print(" • No breaking changes to the API")
print("\n✨ Issue #1055 is RESOLVED!")
print("\n" + "=" * 80 + "\n")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,168 +0,0 @@
"""
Lightweight test to verify pyOpenSSL security fix (Issue #1545).
This test verifies the security requirements are met:
1. pyOpenSSL >= 25.3.0 is installed
2. cryptography >= 45.0.7 is installed (above vulnerable range)
3. SSL/TLS functionality works correctly
This test can run without full crawl4ai dependencies installed.
"""
import sys
from packaging import version
def test_package_versions():
"""Test that package versions meet security requirements."""
print("=" * 70)
print("TEST: Package Version Security Requirements (Issue #1545)")
print("=" * 70)
all_passed = True
# Test pyOpenSSL version
try:
import OpenSSL
pyopenssl_version = OpenSSL.__version__
print(f"\n✓ pyOpenSSL is installed: {pyopenssl_version}")
if version.parse(pyopenssl_version) >= version.parse("25.3.0"):
print(f" ✓ PASS: pyOpenSSL {pyopenssl_version} >= 25.3.0 (required)")
else:
print(f" ✗ FAIL: pyOpenSSL {pyopenssl_version} < 25.3.0 (required)")
all_passed = False
except ImportError as e:
print(f"\n✗ FAIL: pyOpenSSL not installed - {e}")
all_passed = False
# Test cryptography version
try:
import cryptography
crypto_version = cryptography.__version__
print(f"\n✓ cryptography is installed: {crypto_version}")
# The vulnerable range is >=37.0.0 & <43.0.1
# We need >= 45.0.7 to be safe
if version.parse(crypto_version) >= version.parse("45.0.7"):
print(f" ✓ PASS: cryptography {crypto_version} >= 45.0.7 (secure)")
print(f" ✓ NOT in vulnerable range (37.0.0 to 43.0.0)")
elif version.parse(crypto_version) >= version.parse("37.0.0") and version.parse(crypto_version) < version.parse("43.0.1"):
print(f" ✗ FAIL: cryptography {crypto_version} is VULNERABLE")
print(f" ✗ Version is in vulnerable range (>=37.0.0 & <43.0.1)")
all_passed = False
else:
print(f" ⚠ WARNING: cryptography {crypto_version} < 45.0.7")
print(f" ⚠ May not meet security requirements")
except ImportError as e:
print(f"\n✗ FAIL: cryptography not installed - {e}")
all_passed = False
return all_passed
def test_ssl_basic_functionality():
"""Test that SSL/TLS basic functionality works."""
print("\n" + "=" * 70)
print("TEST: SSL/TLS Basic Functionality")
print("=" * 70)
try:
import OpenSSL.SSL
# Create a basic SSL context to verify functionality
context = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_2_METHOD)
print("\n✓ SSL Context created successfully")
print(" ✓ PASS: SSL/TLS functionality is working")
return True
except Exception as e:
print(f"\n✗ FAIL: SSL functionality test failed - {e}")
return False
def test_pyopenssl_crypto_integration():
"""Test that pyOpenSSL and cryptography integration works."""
print("\n" + "=" * 70)
print("TEST: pyOpenSSL <-> cryptography Integration")
print("=" * 70)
try:
from OpenSSL import crypto
# Generate a simple key pair to test integration
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
print("\n✓ Generated RSA key pair successfully")
print(" ✓ PASS: pyOpenSSL and cryptography are properly integrated")
return True
except Exception as e:
print(f"\n✗ FAIL: Integration test failed - {e}")
import traceback
traceback.print_exc()
return False
def main():
"""Run all security tests."""
print("\n")
print("" + "=" * 68 + "")
print("║ pyOpenSSL Security Fix Verification - Issue #1545 ║")
print("" + "=" * 68 + "")
print("\nVerifying that the pyOpenSSL update resolves the security vulnerability")
print("in the cryptography package (CVE: versions >=37.0.0 & <43.0.1)\n")
results = []
# Test 1: Package versions
results.append(("Package Versions", test_package_versions()))
# Test 2: SSL functionality
results.append(("SSL Functionality", test_ssl_basic_functionality()))
# Test 3: Integration
results.append(("pyOpenSSL-crypto Integration", test_pyopenssl_crypto_integration()))
# Summary
print("\n" + "=" * 70)
print("TEST SUMMARY")
print("=" * 70)
all_passed = True
for test_name, passed in results:
status = "✓ PASS" if passed else "✗ FAIL"
print(f"{status}: {test_name}")
all_passed = all_passed and passed
print("=" * 70)
if all_passed:
print("\n✓✓✓ ALL TESTS PASSED ✓✓✓")
print("✓ Security vulnerability is resolved")
print("✓ pyOpenSSL >= 25.3.0 is working correctly")
print("✓ cryptography >= 45.0.7 (not vulnerable)")
print("\nThe dependency update is safe to merge.\n")
return True
else:
print("\n✗✗✗ SOME TESTS FAILED ✗✗✗")
print("✗ Security requirements not met")
print("\nDo NOT merge until all tests pass.\n")
return False
if __name__ == "__main__":
try:
success = main()
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n\nTest interrupted by user")
sys.exit(1)
except Exception as e:
print(f"\n✗ Unexpected error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

View File

@@ -1,184 +0,0 @@
"""
Test script to verify pyOpenSSL update doesn't break crawl4ai functionality.
This test verifies:
1. pyOpenSSL and cryptography versions are correct and secure
2. Basic crawling functionality still works
3. HTTPS/SSL connections work properly
4. Stealth mode integration works (uses playwright-stealth internally)
Issue: #1545 - Security vulnerability in cryptography package
Fix: Updated pyOpenSSL from >=24.3.0 to >=25.3.0
Expected: cryptography package should be >=45.0.7 (above vulnerable range)
"""
import asyncio
import sys
from packaging import version
def check_versions():
"""Verify pyOpenSSL and cryptography versions meet security requirements."""
print("=" * 60)
print("STEP 1: Checking Package Versions")
print("=" * 60)
try:
import OpenSSL
pyopenssl_version = OpenSSL.__version__
print(f"✓ pyOpenSSL version: {pyopenssl_version}")
# Check pyOpenSSL >= 25.3.0
if version.parse(pyopenssl_version) >= version.parse("25.3.0"):
print(f" ✓ Version check passed: {pyopenssl_version} >= 25.3.0")
else:
print(f" ✗ Version check FAILED: {pyopenssl_version} < 25.3.0")
return False
except ImportError as e:
print(f"✗ Failed to import pyOpenSSL: {e}")
return False
try:
import cryptography
crypto_version = cryptography.__version__
print(f"✓ cryptography version: {crypto_version}")
# Check cryptography >= 45.0.7 (above vulnerable range)
if version.parse(crypto_version) >= version.parse("45.0.7"):
print(f" ✓ Security check passed: {crypto_version} >= 45.0.7 (not vulnerable)")
else:
print(f" ✗ Security check FAILED: {crypto_version} < 45.0.7 (potentially vulnerable)")
return False
except ImportError as e:
print(f"✗ Failed to import cryptography: {e}")
return False
print("\n✓ All version checks passed!\n")
return True
async def test_basic_crawl():
"""Test basic crawling functionality with HTTPS site."""
print("=" * 60)
print("STEP 2: Testing Basic HTTPS Crawling")
print("=" * 60)
try:
from crawl4ai import AsyncWebCrawler
async with AsyncWebCrawler(verbose=True) as crawler:
# Test with a simple HTTPS site (requires SSL/TLS)
print("Crawling example.com (HTTPS)...")
result = await crawler.arun(
url="https://www.example.com",
bypass_cache=True
)
if result.success:
print(f"✓ Crawl successful!")
print(f" - Status code: {result.status_code}")
print(f" - Content length: {len(result.html)} bytes")
print(f" - SSL/TLS connection: ✓ Working")
return True
else:
print(f"✗ Crawl failed: {result.error_message}")
return False
except Exception as e:
print(f"✗ Test failed with error: {e}")
import traceback
traceback.print_exc()
return False
async def test_stealth_mode():
"""Test stealth mode functionality (depends on playwright-stealth)."""
print("\n" + "=" * 60)
print("STEP 3: Testing Stealth Mode Integration")
print("=" * 60)
try:
from crawl4ai import AsyncWebCrawler, BrowserConfig
# Create browser config with stealth mode
browser_config = BrowserConfig(
headless=True,
verbose=False
)
async with AsyncWebCrawler(config=browser_config, verbose=True) as crawler:
print("Crawling with stealth mode enabled...")
result = await crawler.arun(
url="https://www.example.com",
bypass_cache=True
)
if result.success:
print(f"✓ Stealth crawl successful!")
print(f" - Stealth mode: ✓ Working")
return True
else:
print(f"✗ Stealth crawl failed: {result.error_message}")
return False
except Exception as e:
print(f"✗ Stealth test failed with error: {e}")
import traceback
traceback.print_exc()
return False
async def main():
"""Run all tests."""
print("\n")
print("" + "=" * 58 + "")
print("║ pyOpenSSL Security Update Verification Test (Issue #1545) ║")
print("" + "=" * 58 + "")
print("\n")
# Step 1: Check versions
versions_ok = check_versions()
if not versions_ok:
print("\n✗ FAILED: Version requirements not met")
return False
# Step 2: Test basic crawling
crawl_ok = await test_basic_crawl()
if not crawl_ok:
print("\n✗ FAILED: Basic crawling test failed")
return False
# Step 3: Test stealth mode
stealth_ok = await test_stealth_mode()
if not stealth_ok:
print("\n✗ FAILED: Stealth mode test failed")
return False
# All tests passed
print("\n" + "=" * 60)
print("FINAL RESULT")
print("=" * 60)
print("✓ All tests passed successfully!")
print("✓ pyOpenSSL update is working correctly")
print("✓ No breaking changes detected")
print("✓ Security vulnerability resolved")
print("=" * 60)
print("\n")
return True
if __name__ == "__main__":
try:
success = asyncio.run(main())
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n\nTest interrupted by user")
sys.exit(1)
except Exception as e:
print(f"\n✗ Unexpected error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)