Compare commits

..

1 Commits

Author SHA1 Message Date
ntohidi
ca100c6518 Release v0.7.6: The 0.7.6 Update
- Updated version to 0.7.6
- Added comprehensive demo and release notes
- Updated all documentation
- Update the veriosn in Dockerfile to 0.7.6
2025-10-22 13:46:54 +02:00
28 changed files with 703 additions and 2413 deletions

View File

@@ -1383,10 +1383,9 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
try: try:
await self.adapter.evaluate(page, await self.adapter.evaluate(page,
f""" f"""
(async () => {{ (() => {{
try {{ try {{
const removeOverlays = {remove_overlays_js}; {remove_overlays_js}
await removeOverlays();
return {{ success: true }}; return {{ success: true }};
}} catch (error) {{ }} catch (error) {{
return {{ return {{

View File

@@ -617,17 +617,7 @@ class AsyncWebCrawler:
else config.chunking_strategy else config.chunking_strategy
) )
sections = chunking.chunk(content) sections = chunking.chunk(content)
# extracted_content = config.extraction_strategy.run(url, sections) extracted_content = config.extraction_strategy.run(url, sections)
# Use async version if available for better parallelism
if hasattr(config.extraction_strategy, 'arun'):
extracted_content = await config.extraction_strategy.arun(url, sections)
else:
# Fallback to sync version run in thread pool to avoid blocking
extracted_content = await asyncio.to_thread(
config.extraction_strategy.run, url, sections
)
extracted_content = json.dumps( extracted_content = json.dumps(
extracted_content, indent=4, default=str, ensure_ascii=False extracted_content, indent=4, default=str, ensure_ascii=False
) )

View File

@@ -369,9 +369,6 @@ class ManagedBrowser:
] ]
if self.headless: if self.headless:
flags.append("--headless=new") flags.append("--headless=new")
# Add viewport flag if specified in config
if self.browser_config.viewport_height and self.browser_config.viewport_width:
flags.append(f"--window-size={self.browser_config.viewport_width},{self.browser_config.viewport_height}")
# merge common launch flags # merge common launch flags
flags.extend(self.build_browser_flags(self.browser_config)) flags.extend(self.build_browser_flags(self.browser_config))
elif self.browser_type == "firefox": elif self.browser_type == "firefox":

View File

@@ -94,20 +94,6 @@ class ExtractionStrategy(ABC):
extracted_content.extend(future.result()) extracted_content.extend(future.result())
return extracted_content return extracted_content
async def arun(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
"""
Async version: Process sections of text in parallel using asyncio.
Default implementation runs the sync version in a thread pool.
Subclasses can override this for true async processing.
:param url: The URL of the webpage.
:param sections: List of sections (strings) to process.
:return: A list of processed JSON blocks.
"""
import asyncio
return await asyncio.to_thread(self.run, url, sections, *q, **kwargs)
class NoExtractionStrategy(ExtractionStrategy): class NoExtractionStrategy(ExtractionStrategy):
""" """
@@ -794,177 +780,6 @@ class LLMExtractionStrategy(ExtractionStrategy):
return extracted_content return extracted_content
async def aextract(self, url: str, ix: int, html: str) -> List[Dict[str, Any]]:
"""
Async version: Extract meaningful blocks or chunks from the given HTML using an LLM.
How it works:
1. Construct a prompt with variables.
2. Make an async request to the LLM using the prompt.
3. Parse the response and extract blocks or chunks.
Args:
url: The URL of the webpage.
ix: Index of the block.
html: The HTML content of the webpage.
Returns:
A list of extracted blocks or chunks.
"""
from .utils import aperform_completion_with_backoff
if self.verbose:
print(f"[LOG] Call LLM for {url} - block index: {ix}")
variable_values = {
"URL": url,
"HTML": escape_json_string(sanitize_html(html)),
}
prompt_with_variables = PROMPT_EXTRACT_BLOCKS
if self.instruction:
variable_values["REQUEST"] = self.instruction
prompt_with_variables = PROMPT_EXTRACT_BLOCKS_WITH_INSTRUCTION
if self.extract_type == "schema" and self.schema:
variable_values["SCHEMA"] = json.dumps(self.schema, indent=2)
prompt_with_variables = PROMPT_EXTRACT_SCHEMA_WITH_INSTRUCTION
if self.extract_type == "schema" and not self.schema:
prompt_with_variables = PROMPT_EXTRACT_INFERRED_SCHEMA
for variable in variable_values:
prompt_with_variables = prompt_with_variables.replace(
"{" + variable + "}", variable_values[variable]
)
try:
response = await aperform_completion_with_backoff(
self.llm_config.provider,
prompt_with_variables,
self.llm_config.api_token,
base_url=self.llm_config.base_url,
json_response=self.force_json_response,
extra_args=self.extra_args,
)
# Track usage
usage = TokenUsage(
completion_tokens=response.usage.completion_tokens,
prompt_tokens=response.usage.prompt_tokens,
total_tokens=response.usage.total_tokens,
completion_tokens_details=response.usage.completion_tokens_details.__dict__
if response.usage.completion_tokens_details
else {},
prompt_tokens_details=response.usage.prompt_tokens_details.__dict__
if response.usage.prompt_tokens_details
else {},
)
self.usages.append(usage)
# Update totals
self.total_usage.completion_tokens += usage.completion_tokens
self.total_usage.prompt_tokens += usage.prompt_tokens
self.total_usage.total_tokens += usage.total_tokens
try:
content = response.choices[0].message.content
blocks = None
if self.force_json_response:
blocks = json.loads(content)
if isinstance(blocks, dict):
if len(blocks) == 1 and isinstance(list(blocks.values())[0], list):
blocks = list(blocks.values())[0]
else:
blocks = [blocks]
elif isinstance(blocks, list):
blocks = blocks
else:
blocks = extract_xml_data(["blocks"], content)["blocks"]
blocks = json.loads(blocks)
for block in blocks:
block["error"] = False
except Exception:
parsed, unparsed = split_and_parse_json_objects(
response.choices[0].message.content
)
blocks = parsed
if unparsed:
blocks.append(
{"index": 0, "error": True, "tags": ["error"], "content": unparsed}
)
if self.verbose:
print(
"[LOG] Extracted",
len(blocks),
"blocks from URL:",
url,
"block index:",
ix,
)
return blocks
except Exception as e:
if self.verbose:
print(f"[LOG] Error in LLM extraction: {e}")
return [
{
"index": ix,
"error": True,
"tags": ["error"],
"content": str(e),
}
]
async def arun(self, url: str, sections: List[str]) -> List[Dict[str, Any]]:
"""
Async version: Process sections with true parallelism using asyncio.gather.
Args:
url: The URL of the webpage.
sections: List of sections (strings) to process.
Returns:
A list of extracted blocks or chunks.
"""
import asyncio
merged_sections = self._merge(
sections,
self.chunk_token_threshold,
overlap=int(self.chunk_token_threshold * self.overlap_rate),
)
extracted_content = []
# Create tasks for all sections to run in parallel
tasks = [
self.aextract(url, ix, sanitize_input_encode(section))
for ix, section in enumerate(merged_sections)
]
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
for result in results:
if isinstance(result, Exception):
if self.verbose:
print(f"Error in async extraction: {result}")
extracted_content.append(
{
"index": 0,
"error": True,
"tags": ["error"],
"content": str(result),
}
)
else:
extracted_content.extend(result)
return extracted_content
def show_usage(self) -> None: def show_usage(self) -> None:
"""Print a detailed token usage report showing total and per-request usage.""" """Print a detailed token usage report showing total and per-request usage."""
print("\n=== Token Usage Summary ===") print("\n=== Token Usage Summary ===")

View File

@@ -1825,82 +1825,6 @@ def perform_completion_with_backoff(
# ] # ]
async def aperform_completion_with_backoff(
provider,
prompt_with_variables,
api_token,
json_response=False,
base_url=None,
**kwargs,
):
"""
Async version: Perform an API completion request with exponential backoff.
How it works:
1. Sends an async completion request to the API.
2. Retries on rate-limit errors with exponential delays (async).
3. Returns the API response or an error after all retries.
Args:
provider (str): The name of the API provider.
prompt_with_variables (str): The input prompt for the completion request.
api_token (str): The API token for authentication.
json_response (bool): Whether to request a JSON response. Defaults to False.
base_url (Optional[str]): The base URL for the API. Defaults to None.
**kwargs: Additional arguments for the API request.
Returns:
dict: The API response or an error message after all retries.
"""
from litellm import acompletion
from litellm.exceptions import RateLimitError
import asyncio
max_attempts = 3
base_delay = 2 # Base delay in seconds, you can adjust this based on your needs
extra_args = {"temperature": 0.01, "api_key": api_token, "base_url": base_url}
if json_response:
extra_args["response_format"] = {"type": "json_object"}
if kwargs.get("extra_args"):
extra_args.update(kwargs["extra_args"])
for attempt in range(max_attempts):
try:
response = await acompletion(
model=provider,
messages=[{"role": "user", "content": prompt_with_variables}],
**extra_args,
)
return response # Return the successful response
except RateLimitError as e:
print("Rate limit error:", str(e))
if attempt == max_attempts - 1:
# Last attempt failed, raise the error.
raise
# Check if we have exhausted our max attempts
if attempt < max_attempts - 1:
# Calculate the delay and wait
delay = base_delay * (2**attempt) # Exponential backoff formula
print(f"Waiting for {delay} seconds before retrying...")
await asyncio.sleep(delay)
else:
# Return an error response after exhausting all retries
return [
{
"index": 0,
"tags": ["error"],
"content": ["Rate limit error. Please try again later."],
}
]
except Exception as e:
raise e # Raise any other exceptions immediately
def extract_blocks(url, html, provider=DEFAULT_PROVIDER, api_token=None, base_url=None): def extract_blocks(url, html, provider=DEFAULT_PROVIDER, api_token=None, base_url=None):
""" """
Extract content blocks from website HTML using an AI provider. Extract content blocks from website HTML using an AI provider.

View File

@@ -785,54 +785,6 @@ curl http://localhost:11235/crawl/job/crawl_xyz
The response includes `status` field: `"processing"`, `"completed"`, or `"failed"`. The response includes `status` field: `"processing"`, `"completed"`, or `"failed"`.
#### LLM Extraction Jobs with Webhooks
The same webhook system works for LLM extraction jobs via `/llm/job`:
```bash
# Submit LLM extraction job with webhook
curl -X POST http://localhost:11235/llm/job \
-H "Content-Type: application/json" \
-d '{
"url": "https://example.com/article",
"q": "Extract the article title, author, and main points",
"provider": "openai/gpt-4o-mini",
"webhook_config": {
"webhook_url": "https://myapp.com/webhooks/llm-complete",
"webhook_data_in_payload": true,
"webhook_headers": {
"X-Webhook-Secret": "your-secret-token"
}
}
}'
# Response: {"task_id": "llm_1234567890"}
```
**Your webhook receives:**
```json
{
"task_id": "llm_1234567890",
"task_type": "llm_extraction",
"status": "completed",
"timestamp": "2025-10-22T12:30:00.000000+00:00",
"urls": ["https://example.com/article"],
"data": {
"extracted_content": {
"title": "Understanding Web Scraping",
"author": "John Doe",
"main_points": ["Point 1", "Point 2", "Point 3"]
}
}
}
```
**Key Differences for LLM Jobs:**
- Task type is `"llm_extraction"` instead of `"crawl"`
- Extracted data is in `data.extracted_content`
- Single URL only (not an array)
- Supports schema-based extraction with `schema` parameter
> 💡 **Pro tip**: See [WEBHOOK_EXAMPLES.md](./WEBHOOK_EXAMPLES.md) for detailed examples including TypeScript client code, Flask webhook handlers, and failure handling. > 💡 **Pro tip**: See [WEBHOOK_EXAMPLES.md](./WEBHOOK_EXAMPLES.md) for detailed examples including TypeScript client code, Flask webhook handlers, and failure handling.
--- ---

View File

@@ -6,16 +6,15 @@ x-base-config: &base-config
- "11235:11235" # Gunicorn port - "11235:11235" # Gunicorn port
env_file: env_file:
- .llm.env # API keys (create from .llm.env.example) - .llm.env # API keys (create from .llm.env.example)
# Uncomment to set default environment variables (will overwrite .llm.env) environment:
# environment: - OPENAI_API_KEY=${OPENAI_API_KEY:-}
# - OPENAI_API_KEY=${OPENAI_API_KEY:-} - DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY:-}
# - DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY:-} - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:-}
# - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:-} - GROQ_API_KEY=${GROQ_API_KEY:-}
# - GROQ_API_KEY=${GROQ_API_KEY:-} - TOGETHER_API_KEY=${TOGETHER_API_KEY:-}
# - TOGETHER_API_KEY=${TOGETHER_API_KEY:-} - MISTRAL_API_KEY=${MISTRAL_API_KEY:-}
# - MISTRAL_API_KEY=${MISTRAL_API_KEY:-} - GEMINI_API_TOKEN=${GEMINI_API_TOKEN:-}
# - GEMINI_API_KEY=${GEMINI_API_KEY:-} - LLM_PROVIDER=${LLM_PROVIDER:-} # Optional: Override default provider (e.g., "anthropic/claude-3-opus")
# - LLM_PROVIDER=${LLM_PROVIDER:-} # Optional: Override default provider (e.g., "anthropic/claude-3-opus")
volumes: volumes:
- /dev/shm:/dev/shm # Chromium performance - /dev/shm:/dev/shm # Chromium performance
deploy: deploy:

View File

@@ -18,7 +18,7 @@ A comprehensive web-based tutorial for learning and experimenting with C4A-Scrip
2. **Install Dependencies** 2. **Install Dependencies**
```bash ```bash
pip install -r requirements.txt pip install flask
``` ```
3. **Launch the Server** 3. **Launch the Server**
@@ -28,7 +28,7 @@ A comprehensive web-based tutorial for learning and experimenting with C4A-Scrip
4. **Open in Browser** 4. **Open in Browser**
``` ```
http://localhost:8000 http://localhost:8080
``` ```
**🌐 Try Online**: [Live Demo](https://docs.crawl4ai.com/c4a-script/demo) **🌐 Try Online**: [Live Demo](https://docs.crawl4ai.com/c4a-script/demo)
@@ -325,7 +325,7 @@ Powers the recording functionality:
### Configuration ### Configuration
```python ```python
# server.py configuration # server.py configuration
PORT = 8000 PORT = 8080
DEBUG = True DEBUG = True
THREADED = True THREADED = True
``` ```
@@ -343,9 +343,9 @@ THREADED = True
**Port Already in Use** **Port Already in Use**
```bash ```bash
# Kill existing process # Kill existing process
lsof -ti:8000 | xargs kill -9 lsof -ti:8080 | xargs kill -9
# Or use different port # Or use different port
python server.py --port 8001 python server.py --port 8081
``` ```
**Blockly Not Loading** **Blockly Not Loading**

View File

@@ -216,7 +216,7 @@ def get_examples():
'name': 'Handle Cookie Banner', 'name': 'Handle Cookie Banner',
'description': 'Accept cookies and close newsletter popup', 'description': 'Accept cookies and close newsletter popup',
'script': '''# Handle cookie banner and newsletter 'script': '''# Handle cookie banner and newsletter
GO http://127.0.0.1:8000/playground/ GO http://127.0.0.1:8080/playground/
WAIT `body` 2 WAIT `body` 2
IF (EXISTS `.cookie-banner`) THEN CLICK `.accept` IF (EXISTS `.cookie-banner`) THEN CLICK `.accept`
IF (EXISTS `.newsletter-popup`) THEN CLICK `.close`''' IF (EXISTS `.newsletter-popup`) THEN CLICK `.close`'''

View File

@@ -1,445 +1,229 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
🚀 Crawl4AI Docker Hooks System - Complete Examples Comprehensive test demonstrating all hook types from hooks_example.py
==================================================== adapted for the Docker API with real URLs
This file demonstrates the Docker Hooks System with three different approaches:
1. String-based hooks for REST API
2. hooks_to_string() utility to convert functions
3. Docker Client with automatic conversion (most convenient)
Requirements:
- Docker container running: docker run -p 11235:11235 unclecode/crawl4ai:latest
- crawl4ai installed: pip install crawl4ai
""" """
import asyncio
import requests import requests
import json import json
import time import time
from typing import Dict, Any from typing import Dict, Any
# Import Crawl4AI components # API_BASE_URL = "http://localhost:11234"
from crawl4ai import hooks_to_string API_BASE_URL = "http://localhost:11235"
from crawl4ai.docker_client import Crawl4aiDockerClient
# Configuration
DOCKER_URL = "http://localhost:11235"
TEST_URLS = [
"https://www.kidocode.com",
"https://quotes.toscrape.com",
"https://httpbin.org/html",
]
def print_section(title: str, description: str = ""): def test_all_hooks_demo():
"""Print a formatted section header""" """Demonstrate all 8 hook types with practical examples"""
print("\n" + "=" * 70) print("=" * 70)
print(f" {title}") print("Testing: All Hooks Comprehensive Demo")
if description: print("=" * 70)
print(f" {description}")
print("=" * 70 + "\n")
hooks_code = {
"on_browser_created": """
async def hook(browser, **kwargs):
# Hook called after browser is created
print("[HOOK] on_browser_created - Browser is ready!")
# Browser-level configurations would go here
return browser
""",
def check_docker_service() -> bool:
"""Check if Docker service is running"""
try:
response = requests.get(f"{DOCKER_URL}/health", timeout=3)
return response.status_code == 200
except:
return False
# ============================================================================
# REUSABLE HOOK LIBRARY
# ============================================================================
async def performance_optimization_hook(page, context, **kwargs):
"""
Performance Hook: Block unnecessary resources to speed up crawling
"""
print(" [Hook] 🚀 Optimizing performance - blocking images and ads...")
# Block images
await context.route(
"**/*.{png,jpg,jpeg,gif,webp,svg,ico}",
lambda route: route.abort()
)
# Block ads and analytics
await context.route("**/analytics/*", lambda route: route.abort())
await context.route("**/ads/*", lambda route: route.abort())
await context.route("**/google-analytics.com/*", lambda route: route.abort())
print(" [Hook] ✓ Performance optimization applied")
return page
async def viewport_setup_hook(page, context, **kwargs):
"""
Viewport Hook: Set consistent viewport size for rendering
"""
print(" [Hook] 🖥️ Setting viewport to 1920x1080...")
await page.set_viewport_size({"width": 1920, "height": 1080})
print(" [Hook] ✓ Viewport configured")
return page
async def authentication_headers_hook(page, context, url, **kwargs):
"""
Headers Hook: Add custom authentication and tracking headers
"""
print(f" [Hook] 🔐 Adding custom headers for {url[:50]}...")
await page.set_extra_http_headers({
'X-Crawl4AI': 'docker-hooks',
'X-Custom-Hook': 'function-based',
'Accept-Language': 'en-US,en;q=0.9',
})
print(" [Hook] ✓ Custom headers added")
return page
async def lazy_loading_handler_hook(page, context, **kwargs):
"""
Content Hook: Handle lazy-loaded content by scrolling
"""
print(" [Hook] 📜 Scrolling to load lazy content...")
# Scroll to bottom
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await page.wait_for_timeout(1000)
# Scroll to middle
await page.evaluate("window.scrollTo(0, document.body.scrollHeight / 2)")
await page.wait_for_timeout(500)
# Scroll back to top
await page.evaluate("window.scrollTo(0, 0)")
await page.wait_for_timeout(500)
print(" [Hook] ✓ Lazy content loaded")
return page
async def page_analytics_hook(page, context, **kwargs):
"""
Analytics Hook: Log page metrics before extraction
"""
print(" [Hook] 📊 Collecting page analytics...")
metrics = await page.evaluate('''
() => ({
title: document.title,
images: document.images.length,
links: document.links.length,
scripts: document.scripts.length,
headings: document.querySelectorAll('h1, h2, h3').length,
paragraphs: document.querySelectorAll('p').length
})
''')
print(f" [Hook] 📈 Page: {metrics['title'][:50]}...")
print(f" Links: {metrics['links']}, Images: {metrics['images']}, "
f"Headings: {metrics['headings']}, Paragraphs: {metrics['paragraphs']}")
return page
# ============================================================================
# APPROACH 1: String-Based Hooks (REST API)
# ============================================================================
def example_1_string_based_hooks():
"""
Demonstrate string-based hooks with REST API
Use this when working with REST API directly or non-Python clients
"""
print_section(
"APPROACH 1: String-Based Hooks (REST API)",
"Define hooks as strings for REST API requests"
)
# Define hooks as strings
hooks_config = {
"on_page_context_created": """ "on_page_context_created": """
async def hook(page, context, **kwargs): async def hook(page, context, **kwargs):
print(" [String Hook] Setting up page context...") # Hook called after a new page and context are created
# Block images for performance print("[HOOK] on_page_context_created - New page created!")
await context.route("**/*.{png,jpg,jpeg,gif,webp}", lambda route: route.abort())
# Set viewport size for consistent rendering
await page.set_viewport_size({"width": 1920, "height": 1080}) await page.set_viewport_size({"width": 1920, "height": 1080})
# Add cookies for the session (using httpbin.org domain)
await context.add_cookies([
{
"name": "test_session",
"value": "abc123xyz",
"domain": ".httpbin.org",
"path": "/",
"httpOnly": True,
"secure": True
}
])
# Block ads and tracking scripts to speed up crawling
await context.route("**/*.{png,jpg,jpeg,gif,webp,svg}", lambda route: route.abort())
await context.route("**/analytics/*", lambda route: route.abort())
await context.route("**/ads/*", lambda route: route.abort())
print("[HOOK] Viewport set, cookies added, and ads blocked")
return page
""",
"on_user_agent_updated": """
async def hook(page, context, user_agent, **kwargs):
# Hook called when user agent is updated
print(f"[HOOK] on_user_agent_updated - User agent: {user_agent[:50]}...")
return page return page
""", """,
"before_goto": """ "before_goto": """
async def hook(page, context, url, **kwargs): async def hook(page, context, url, **kwargs):
print(f" [String Hook] Navigating to {url[:50]}...") # Hook called before navigating to each URL
print(f"[HOOK] before_goto - About to visit: {url}")
# Add custom headers for the request
await page.set_extra_http_headers({ await page.set_extra_http_headers({
'X-Crawl4AI': 'string-based-hooks', "X-Custom-Header": "crawl4ai-test",
"Accept-Language": "en-US,en;q=0.9",
"DNT": "1"
}) })
return page
""",
"after_goto": """
async def hook(page, context, url, response, **kwargs):
# Hook called after navigating to each URL
print(f"[HOOK] after_goto - Successfully loaded: {url}")
# Wait a moment for dynamic content to load
await page.wait_for_timeout(1000)
# Check if specific elements exist (with error handling)
try:
# For httpbin.org, wait for body element
await page.wait_for_selector("body", timeout=2000)
print("[HOOK] Body element found and loaded")
except:
print("[HOOK] Timeout waiting for body, continuing anyway")
return page
""",
"on_execution_started": """
async def hook(page, context, **kwargs):
# Hook called after custom JavaScript execution
print("[HOOK] on_execution_started - Custom JS executed!")
# You could inject additional JavaScript here if needed
await page.evaluate("console.log('[INJECTED] Hook JS running');")
return page return page
""", """,
"before_retrieve_html": """ "before_retrieve_html": """
async def hook(page, context, **kwargs): async def hook(page, context, **kwargs):
print(" [String Hook] Scrolling page...") # Hook called before retrieving the HTML content
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)") print("[HOOK] before_retrieve_html - Preparing to get HTML")
await page.wait_for_timeout(1000)
# Scroll to bottom to trigger lazy loading
await page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
await page.wait_for_timeout(500)
# Scroll back to top
await page.evaluate("window.scrollTo(0, 0);")
await page.wait_for_timeout(500)
# One more scroll to middle for good measure
await page.evaluate("window.scrollTo(0, document.body.scrollHeight / 2);")
print("[HOOK] Scrolling completed for lazy-loaded content")
return page
""",
"before_return_html": """
async def hook(page, context, html, **kwargs):
# Hook called before returning the HTML content
print(f"[HOOK] before_return_html - HTML length: {len(html)} characters")
# Log some page metrics
metrics = await page.evaluate('''() => {
return {
images: document.images.length,
links: document.links.length,
scripts: document.scripts.length
}
}''')
print(f"[HOOK] Page metrics - Images: {metrics['images']}, Links: {metrics['links']}, Scripts: {metrics['scripts']}")
return page return page
""" """
} }
# Prepare request payload # Create request payload
payload = { payload = {
"urls": [TEST_URLS[2]], # httpbin.org "urls": ["https://httpbin.org/html"],
"hooks": { "hooks": {
"code": hooks_config, "code": hooks_code,
"timeout": 30 "timeout": 30
}, },
"crawler_config": { "crawler_config": {
"js_code": "window.scrollTo(0, document.body.scrollHeight);",
"wait_for": "body",
"cache_mode": "bypass" "cache_mode": "bypass"
} }
} }
print(f"🎯 Target URL: {TEST_URLS[2]}") print("\nSending request with all 8 hooks...")
print(f"🔧 Configured {len(hooks_config)} string-based hooks") start_time = time.time()
print(f"📡 Sending request to Docker API...\n")
try: response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
start_time = time.time()
response = requests.post(f"{DOCKER_URL}/crawl", json=payload, timeout=60)
execution_time = time.time() - start_time
if response.status_code == 200: elapsed_time = time.time() - start_time
result = response.json() print(f"Request completed in {elapsed_time:.2f} seconds")
print(f"\n✅ Request successful! (took {execution_time:.2f}s)") if response.status_code == 200:
data = response.json()
print("\n✅ Request successful!")
# Display results # Check hooks execution
if result.get('results') and result['results'][0].get('success'): if 'hooks' in data:
crawl_result = result['results'][0] hooks_info = data['hooks']
html_length = len(crawl_result.get('html', '')) print("\n📊 Hooks Execution Summary:")
markdown_length = len(crawl_result.get('markdown', '')) print(f" Status: {hooks_info['status']['status']}")
print(f" Attached hooks: {len(hooks_info['status']['attached_hooks'])}")
print(f"\n📊 Results:") for hook_name in hooks_info['status']['attached_hooks']:
print(f" • HTML length: {html_length:,} characters") print(f" {hook_name}")
print(f" • Markdown length: {markdown_length:,} characters")
print(f" • URL: {crawl_result.get('url')}")
# Check hooks execution if 'summary' in hooks_info:
if 'hooks' in result: summary = hooks_info['summary']
hooks_info = result['hooks'] print(f"\n📈 Execution Statistics:")
print(f"\n🎣 Hooks Execution:") print(f" Total executions: {summary['total_executions']}")
print(f" • Status: {hooks_info['status']['status']}") print(f" Successful: {summary['successful']}")
print(f" • Attached hooks: {len(hooks_info['status']['attached_hooks'])}") print(f" Failed: {summary['failed']}")
print(f" Timed out: {summary['timed_out']}")
print(f" Success rate: {summary['success_rate']:.1f}%")
if 'summary' in hooks_info: if hooks_info.get('execution_log'):
summary = hooks_info['summary'] print(f"\n📝 Execution Log:")
print(f" • Total executions: {summary['total_executions']}") for log_entry in hooks_info['execution_log']:
print(f" • Successful: {summary['successful']}") status_icon = "" if log_entry['status'] == 'success' else ""
print(f" • Success rate: {summary['success_rate']:.1f}%") exec_time = log_entry.get('execution_time', 0)
else: print(f" {status_icon} {log_entry['hook_point']}: {exec_time:.3f}s")
print(f"⚠️ Crawl completed but no results")
else: # Check crawl results
print(f"❌ Request failed with status {response.status_code}") if 'results' in data and len(data['results']) > 0:
print(f" Error: {response.text[:200]}") print(f"\n📄 Crawl Results:")
for result in data['results']:
print(f" URL: {result['url']}")
print(f" Success: {result.get('success', False)}")
if result.get('html'):
print(f" HTML length: {len(result['html'])} characters")
except requests.exceptions.Timeout: else:
print("⏰ Request timed out after 60 seconds") print(f"❌ Error: {response.status_code}")
except Exception as e: try:
print(f"❌ Error: {str(e)}") error_data = response.json()
print(f"Error details: {json.dumps(error_data, indent=2)}")
print("\n" + "" * 70) except:
print("✓ String-based hooks example complete\n") print(f"Error text: {response.text[:500]}")
# ============================================================================ def test_authentication_flow():
# APPROACH 2: Function-Based Hooks with hooks_to_string() Utility """Test a complete authentication flow with multiple hooks"""
# ============================================================================ print("\n" + "=" * 70)
print("Testing: Authentication Flow with Multiple Hooks")
def example_2_hooks_to_string_utility(): print("=" * 70)
"""
Demonstrate the hooks_to_string() utility for converting functions
Use this when you want to write hooks as functions but use REST API
"""
print_section(
"APPROACH 2: hooks_to_string() Utility",
"Convert Python functions to strings for REST API"
)
print("📦 Creating hook functions...")
print(" • performance_optimization_hook")
print(" • authentication_headers_hook")
print(" • lazy_loading_handler_hook")
# Convert function objects to strings using the utility
print("\n🔄 Converting functions to strings with hooks_to_string()...")
hooks_dict = {
"on_page_context_created": performance_optimization_hook,
"before_goto": authentication_headers_hook,
"before_retrieve_html": lazy_loading_handler_hook,
}
hooks_as_strings = hooks_to_string(hooks_dict)
print(f"✅ Successfully converted {len(hooks_as_strings)} functions to strings")
# Show a preview
print("\n📝 Sample converted hook (first 200 characters):")
print("" * 70)
sample_hook = list(hooks_as_strings.values())[0]
print(sample_hook[:200] + "...")
print("" * 70)
# Use the converted hooks with REST API
print("\n📡 Using converted hooks with REST API...")
payload = {
"urls": [TEST_URLS[2]],
"hooks": {
"code": hooks_as_strings,
"timeout": 30
}
}
try:
start_time = time.time()
response = requests.post(f"{DOCKER_URL}/crawl", json=payload, timeout=60)
execution_time = time.time() - start_time
if response.status_code == 200:
result = response.json()
print(f"\n✅ Request successful! (took {execution_time:.2f}s)")
if result.get('results') and result['results'][0].get('success'):
crawl_result = result['results'][0]
print(f" • HTML length: {len(crawl_result.get('html', '')):,} characters")
print(f" • Hooks executed successfully!")
else:
print(f"❌ Request failed: {response.status_code}")
except Exception as e:
print(f"❌ Error: {str(e)}")
print("\n💡 Benefits of hooks_to_string():")
print(" ✓ Write hooks as regular Python functions")
print(" ✓ Full IDE support (autocomplete, syntax highlighting)")
print(" ✓ Type checking and linting")
print(" ✓ Easy to test and debug")
print(" ✓ Reusable across projects")
print(" ✓ Works with any REST API client")
print("\n" + "" * 70)
print("✓ hooks_to_string() utility example complete\n")
# ============================================================================
# APPROACH 3: Docker Client with Automatic Conversion (RECOMMENDED)
# ============================================================================
async def example_3_docker_client_auto_conversion():
"""
Demonstrate Docker Client with automatic hook conversion (RECOMMENDED)
Use this for the best developer experience with Python
"""
print_section(
"APPROACH 3: Docker Client with Auto-Conversion (RECOMMENDED)",
"Pass function objects directly - conversion happens automatically!"
)
print("🐳 Initializing Crawl4AI Docker Client...")
client = Crawl4aiDockerClient(base_url=DOCKER_URL)
print("✅ Client ready!\n")
# Use our reusable hook library - just pass the function objects!
print("📚 Using reusable hook library:")
print(" • performance_optimization_hook")
print(" • authentication_headers_hook")
print(" • lazy_loading_handler_hook")
print(" • page_analytics_hook")
print("\n🎯 Target URL: " + TEST_URLS[0])
print("🚀 Starting crawl with automatic hook conversion...\n")
try:
start_time = time.time()
# Pass function objects directly - NO manual conversion needed! ✨
results = await client.crawl(
urls=[TEST_URLS[0]],
hooks={
"on_page_context_created": performance_optimization_hook,
"before_goto": authentication_headers_hook,
"before_retrieve_html": lazy_loading_handler_hook,
"before_return_html": page_analytics_hook,
},
hooks_timeout=30
)
execution_time = time.time() - start_time
print(f"\n✅ Crawl completed! (took {execution_time:.2f}s)\n")
# Display results
if results and results.success:
result = results
print(f"📊 Results:")
print(f" • URL: {result.url}")
print(f" • Success: {result.success}")
print(f" • HTML length: {len(result.html):,} characters")
print(f" • Markdown length: {len(result.markdown):,} characters")
# Show metadata
if result.metadata:
print(f"\n📋 Metadata:")
print(f" • Title: {result.metadata.get('title', 'N/A')[:50]}...")
# Show links
if result.links:
internal_count = len(result.links.get('internal', []))
external_count = len(result.links.get('external', []))
print(f"\n🔗 Links Found:")
print(f" • Internal: {internal_count}")
print(f" • External: {external_count}")
else:
print(f"⚠️ Crawl completed but no successful results")
if results:
print(f" Error: {results.error_message}")
except Exception as e:
print(f"❌ Error: {str(e)}")
import traceback
traceback.print_exc()
print("\n🌟 Why Docker Client is RECOMMENDED:")
print(" ✓ Automatic function-to-string conversion")
print(" ✓ No manual hooks_to_string() calls needed")
print(" ✓ Cleaner, more Pythonic code")
print(" ✓ Full type hints and IDE support")
print(" ✓ Built-in error handling")
print(" ✓ Async/await support")
print("\n" + "" * 70)
print("✓ Docker Client auto-conversion example complete\n")
# ============================================================================
# APPROACH 4: Authentication Example
# ============================================================================
def example_4_authentication_flow():
"""
Demonstrate authentication flow with multiple hooks
"""
print_section(
"EXAMPLE 4: Authentication Flow",
"Using hooks for authentication with cookies and headers"
)
hooks_code = { hooks_code = {
"on_page_context_created": """ "on_page_context_created": """
@@ -458,6 +242,12 @@ async def hook(page, context, **kwargs):
} }
]) ])
# Set localStorage items (for SPA authentication)
await page.evaluate('''
localStorage.setItem('user_id', '12345');
localStorage.setItem('auth_time', new Date().toISOString());
''')
return page return page
""", """,
@@ -479,7 +269,9 @@ async def hook(page, context, url, **kwargs):
} }
payload = { payload = {
"urls": ["https://httpbin.org/basic-auth/user/passwd"], "urls": [
"https://httpbin.org/basic-auth/user/passwd"
],
"hooks": { "hooks": {
"code": hooks_code, "code": hooks_code,
"timeout": 15 "timeout": 15
@@ -487,7 +279,7 @@ async def hook(page, context, url, **kwargs):
} }
print("\nTesting authentication with httpbin endpoints...") print("\nTesting authentication with httpbin endpoints...")
response = requests.post(f"{DOCKER_URL}/crawl", json=payload) response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
if response.status_code == 200: if response.status_code == 200:
data = response.json() data = response.json()
@@ -508,120 +300,214 @@ async def hook(page, context, url, **kwargs):
else: else:
print(f"❌ Error: {response.status_code}") print(f"❌ Error: {response.status_code}")
print("\n" + "" * 70)
print("✓ Authentication example complete\n")
def test_performance_optimization_hooks():
# ============================================================================ """Test hooks for performance optimization"""
# MAIN EXECUTION
# ============================================================================
async def main():
"""
Run all example demonstrations
"""
print("\n" + "=" * 70) print("\n" + "=" * 70)
print(" 🚀 Crawl4AI - Docker Hooks System Examples") print("Testing: Performance Optimization Hooks")
print("=" * 70) print("=" * 70)
# Check Docker service hooks_code = {
print("\n🔍 Checking Docker service status...") "on_page_context_created": """
if not check_docker_service(): async def hook(page, context, **kwargs):
print("❌ Docker service is not running!") print("[HOOK] Optimizing page for performance")
print("\n📋 To start the Docker service:")
print(" docker run -p 11235:11235 unclecode/crawl4ai:latest")
print("\nPlease start the service and run this example again.")
return
print("✅ Docker service is running!\n") # Block resource-heavy content
await context.route("**/*.{png,jpg,jpeg,gif,webp,svg,ico}", lambda route: route.abort())
await context.route("**/*.{woff,woff2,ttf,otf}", lambda route: route.abort())
await context.route("**/*.{mp4,webm,ogg,mp3,wav}", lambda route: route.abort())
await context.route("**/googletagmanager.com/*", lambda route: route.abort())
await context.route("**/google-analytics.com/*", lambda route: route.abort())
await context.route("**/doubleclick.net/*", lambda route: route.abort())
await context.route("**/facebook.com/*", lambda route: route.abort())
# Run all examples # Disable animations and transitions
examples = [ await page.add_style_tag(content='''
("String-Based Hooks (REST API)", example_1_string_based_hooks, False), *, *::before, *::after {
("hooks_to_string() Utility", example_2_hooks_to_string_utility, False), animation-duration: 0s !important;
("Docker Client Auto-Conversion (Recommended)", example_3_docker_client_auto_conversion, True), animation-delay: 0s !important;
("Authentication Flow", example_4_authentication_flow, False), transition-duration: 0s !important;
transition-delay: 0s !important;
}
''')
print("[HOOK] Performance optimizations applied")
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
print("[HOOK] Removing unnecessary elements before extraction")
# Remove ads, popups, and other unnecessary elements
await page.evaluate('''() => {
// Remove common ad containers
const adSelectors = [
'.ad', '.ads', '.advertisement', '[id*="ad-"]', '[class*="ad-"]',
'.popup', '.modal', '.overlay', '.cookie-banner', '.newsletter-signup'
];
adSelectors.forEach(selector => {
document.querySelectorAll(selector).forEach(el => el.remove());
});
// Remove script tags to clean up HTML
document.querySelectorAll('script').forEach(el => el.remove());
// Remove style tags we don't need
document.querySelectorAll('style').forEach(el => el.remove());
}''')
return page
"""
}
payload = {
"urls": ["https://httpbin.org/html"],
"hooks": {
"code": hooks_code,
"timeout": 10
}
}
print("\nTesting performance optimization hooks...")
start_time = time.time()
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
elapsed_time = time.time() - start_time
print(f"Request completed in {elapsed_time:.2f} seconds")
if response.status_code == 200:
data = response.json()
print("✅ Performance optimization test completed")
if 'results' in data and len(data['results']) > 0:
result = data['results'][0]
if result.get('html'):
print(f" HTML size: {len(result['html'])} characters")
print(" Resources blocked, ads removed, animations disabled")
else:
print(f"❌ Error: {response.status_code}")
def test_content_extraction_hooks():
"""Test hooks for intelligent content extraction"""
print("\n" + "=" * 70)
print("Testing: Content Extraction Hooks")
print("=" * 70)
hooks_code = {
"after_goto": """
async def hook(page, context, url, response, **kwargs):
print(f"[HOOK] Waiting for dynamic content on {url}")
# Wait for any lazy-loaded content
await page.wait_for_timeout(2000)
# Trigger any "Load More" buttons
try:
load_more = await page.query_selector('[class*="load-more"], [class*="show-more"], button:has-text("Load More")')
if load_more:
await load_more.click()
await page.wait_for_timeout(1000)
print("[HOOK] Clicked 'Load More' button")
except:
pass
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
print("[HOOK] Extracting structured data")
# Extract metadata
metadata = await page.evaluate('''() => {
const getMeta = (name) => {
const element = document.querySelector(`meta[name="${name}"], meta[property="${name}"]`);
return element ? element.getAttribute('content') : null;
};
return {
title: document.title,
description: getMeta('description') || getMeta('og:description'),
author: getMeta('author'),
keywords: getMeta('keywords'),
ogTitle: getMeta('og:title'),
ogImage: getMeta('og:image'),
canonical: document.querySelector('link[rel="canonical"]')?.href,
jsonLd: Array.from(document.querySelectorAll('script[type="application/ld+json"]'))
.map(el => el.textContent).filter(Boolean)
};
}''')
print(f"[HOOK] Extracted metadata: {json.dumps(metadata, indent=2)}")
# Infinite scroll handling
for i in range(3):
await page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
await page.wait_for_timeout(1000)
print(f"[HOOK] Scroll iteration {i+1}/3")
return page
"""
}
payload = {
"urls": ["https://httpbin.org/html", "https://httpbin.org/json"],
"hooks": {
"code": hooks_code,
"timeout": 20
}
}
print("\nTesting content extraction hooks...")
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
if response.status_code == 200:
data = response.json()
print("✅ Content extraction test completed")
if 'hooks' in data and 'summary' in data['hooks']:
summary = data['hooks']['summary']
print(f" Hooks executed: {summary['successful']}/{summary['total_executions']}")
if 'results' in data:
for result in data['results']:
print(f"\n URL: {result['url']}")
print(f" Success: {result.get('success', False)}")
else:
print(f"❌ Error: {response.status_code}")
def main():
"""Run comprehensive hook tests"""
print("🔧 Crawl4AI Docker API - Comprehensive Hooks Testing")
print("Based on docs/examples/hooks_example.py")
print("=" * 70)
tests = [
("All Hooks Demo", test_all_hooks_demo),
("Authentication Flow", test_authentication_flow),
("Performance Optimization", test_performance_optimization_hooks),
("Content Extraction", test_content_extraction_hooks),
] ]
for i, (name, example_func, is_async) in enumerate(examples, 1): for i, (name, test_func) in enumerate(tests, 1):
print(f"\n{'🔷' * 35}") print(f"\n📌 Test {i}/{len(tests)}: {name}")
print(f"Example {i}/{len(examples)}: {name}")
print(f"{'🔷' * 35}\n")
try: try:
if is_async: test_func()
await example_func() print(f"{name} completed")
else:
example_func()
print(f"✅ Example {i} completed successfully!")
# Pause between examples (except the last one)
if i < len(examples):
print("\n⏸️ Press Enter to continue to next example...")
input()
except KeyboardInterrupt:
print(f"\n⏹️ Examples interrupted by user")
break
except Exception as e: except Exception as e:
print(f"\n❌ Example {i} failed: {str(e)}") print(f" {name} failed: {e}")
import traceback import traceback
traceback.print_exc() traceback.print_exc()
print("\nContinuing to next example...\n")
continue
# Final summary
print("\n" + "=" * 70) print("\n" + "=" * 70)
print(" 🎉 All Examples Complete!") print("🎉 All comprehensive hook tests completed!")
print("=" * 70) print("=" * 70)
print("\n📊 Summary - Three Approaches to Docker Hooks:")
print("\n✨ 1. String-Based Hooks:")
print(" • Write hooks as strings directly in JSON")
print(" • Best for: REST API, non-Python clients, simple use cases")
print(" • Cons: No IDE support, harder to debug")
print("\n✨ 2. hooks_to_string() Utility:")
print(" • Write hooks as Python functions, convert to strings")
print(" • Best for: Python with REST API, reusable hook libraries")
print(" • Pros: IDE support, type checking, easy debugging")
print("\n✨ 3. Docker Client (RECOMMENDED):")
print(" • Pass function objects directly, automatic conversion")
print(" • Best for: Python applications, best developer experience")
print(" • Pros: All benefits of #2 + cleaner code, no manual conversion")
print("\n💡 Recommendation:")
print(" Use Docker Client (#3) for Python applications")
print(" Use hooks_to_string() (#2) when you need REST API flexibility")
print(" Use string-based (#1) for non-Python clients or simple scripts")
print("\n🎯 8 Hook Points Available:")
print(" • on_browser_created, on_page_context_created")
print(" • on_user_agent_updated, before_goto, after_goto")
print(" • on_execution_started, before_retrieve_html, before_return_html")
print("\n📚 Resources:")
print(" • Docs: https://docs.crawl4ai.com/core/docker-deployment")
print(" • GitHub: https://github.com/unclecode/crawl4ai")
print(" • Discord: https://discord.gg/jP8KfhDhyN")
print("\n" + "=" * 70)
print(" Happy Crawling! 🕷️")
print("=" * 70 + "\n")
if __name__ == "__main__": if __name__ == "__main__":
print("\n🎬 Starting Crawl4AI Docker Hooks Examples...") main()
print("Press Ctrl+C anytime to exit\n")
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\n👋 Examples stopped by user. Thanks for exploring Crawl4AI!")
except Exception as e:
print(f"\n\n❌ Error: {str(e)}")
import traceback
traceback.print_exc()

View File

@@ -82,42 +82,6 @@ If you installed Crawl4AI (which installs Playwright under the hood), you alread
--- ---
### Creating a Profile Using the Crawl4AI CLI (Easiest)
If you prefer a guided, interactive setup, use the built-in CLI to create and manage persistent browser profiles.
1.Launch the profile manager:
```bash
crwl profiles
```
2.Choose "Create new profile" and enter a profile name. A Chromium window opens so you can log in to sites and configure settings. When finished, return to the terminal and press `q` to save the profile.
3.Profiles are saved under `~/.crawl4ai/profiles/<profile_name>` (for example: `/home/<you>/.crawl4ai/profiles/test_profile_1`) along with a `storage_state.json` for cookies and session data.
4.Optionally, choose "List profiles" in the CLI to view available profiles and their paths.
5.Use the saved path with `BrowserConfig.user_data_dir`:
```python
from crawl4ai import AsyncWebCrawler, BrowserConfig
profile_path = "/home/<you>/.crawl4ai/profiles/test_profile_1"
browser_config = BrowserConfig(
headless=True,
use_managed_browser=True,
user_data_dir=profile_path,
browser_type="chromium",
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com/private")
```
The CLI also supports listing and deleting profiles, and even testing a crawl directly from the menu.
---
## 3. Using Managed Browsers in Crawl4AI ## 3. Using Managed Browsers in Crawl4AI
Once you have a data directory with your session data, pass it to **`BrowserConfig`**: Once you have a data directory with your session data, pass it to **`BrowserConfig`**:

View File

@@ -1,304 +1,98 @@
# Proxy & Security # Proxy
This guide covers proxy configuration and security features in Crawl4AI, including SSL certificate analysis and proxy rotation strategies.
## Understanding Proxy Configuration
Crawl4AI recommends configuring proxies per request through `CrawlerRunConfig.proxy_config`. This gives you precise control, enables rotation strategies, and keeps examples simple enough to copy, paste, and run.
## Basic Proxy Setup ## Basic Proxy Setup
Configure proxies that apply to each crawl operation: Simple proxy configuration with `BrowserConfig`:
```python ```python
import asyncio from crawl4ai.async_configs import BrowserConfig
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, ProxyConfig
run_config = CrawlerRunConfig(proxy_config=ProxyConfig(server="http://proxy.example.com:8080")) # Using HTTP proxy
# run_config = CrawlerRunConfig(proxy_config={"server": "http://proxy.example.com:8080"}) browser_config = BrowserConfig(proxy_config={"server": "http://proxy.example.com:8080"})
# run_config = CrawlerRunConfig(proxy_config="http://proxy.example.com:8080") async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com")
# Using SOCKS proxy
async def main(): browser_config = BrowserConfig(proxy_config={"server": "socks5://proxy.example.com:1080"})
browser_config = BrowserConfig() async with AsyncWebCrawler(config=browser_config) as crawler:
async with AsyncWebCrawler(config=browser_config) as crawler: result = await crawler.arun(url="https://example.com")
result = await crawler.arun(url="https://example.com", config=run_config)
print(f"Success: {result.success} -> {result.url}")
if __name__ == "__main__":
asyncio.run(main())
``` ```
!!! note "Why request-level?" ## Authenticated Proxy
`CrawlerRunConfig.proxy_config` keeps each request self-contained, so swapping proxies or rotation strategies is just a matter of building a new run configuration.
## Supported Proxy Formats Use an authenticated proxy with `BrowserConfig`:
The `ProxyConfig.from_string()` method supports multiple formats:
```python ```python
from crawl4ai import ProxyConfig from crawl4ai.async_configs import BrowserConfig
# HTTP proxy with authentication browser_config = BrowserConfig(proxy_config={
proxy1 = ProxyConfig.from_string("http://user:pass@192.168.1.1:8080") "server": "http://[host]:[port]",
"username": "[username]",
# HTTPS proxy "password": "[password]",
proxy2 = ProxyConfig.from_string("https://proxy.example.com:8080") })
async with AsyncWebCrawler(config=browser_config) as crawler:
# SOCKS5 proxy result = await crawler.arun(url="https://example.com")
proxy3 = ProxyConfig.from_string("socks5://proxy.example.com:1080")
# Simple IP:port format
proxy4 = ProxyConfig.from_string("192.168.1.1:8080")
# IP:port:user:pass format
proxy5 = ProxyConfig.from_string("192.168.1.1:8080:user:pass")
``` ```
## Authenticated Proxies
For proxies requiring authentication:
```python
import asyncio
from crawl4ai import AsyncWebCrawler,BrowserConfig, CrawlerRunConfig, ProxyConfig
run_config = CrawlerRunConfig(
proxy_config=ProxyConfig(
server="http://proxy.example.com:8080",
username="your_username",
password="your_password",
)
)
# Or dictionary style:
# run_config = CrawlerRunConfig(proxy_config={
# "server": "http://proxy.example.com:8080",
# "username": "your_username",
# "password": "your_password",
# })
async def main():
browser_config = BrowserConfig()
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com", config=run_config)
print(f"Success: {result.success} -> {result.url}")
if __name__ == "__main__":
asyncio.run(main())
```
## Environment Variable Configuration
Load proxies from environment variables for easy configuration:
```python
import os
from crawl4ai import ProxyConfig, CrawlerRunConfig
# Set environment variable
os.environ["PROXIES"] = "ip1:port1:user1:pass1,ip2:port2:user2:pass2,ip3:port3"
# Load all proxies
proxies = ProxyConfig.from_env()
print(f"Loaded {len(proxies)} proxies")
# Use first proxy
if proxies:
run_config = CrawlerRunConfig(proxy_config=proxies[0])
```
## Rotating Proxies ## Rotating Proxies
Crawl4AI supports automatic proxy rotation to distribute requests across multiple proxy servers. Rotation is applied per request using a rotation strategy on `CrawlerRunConfig`. Example using a proxy rotation service dynamically:
### Proxy Rotation (recommended)
```python ```python
import asyncio
import re import re
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, ProxyConfig from crawl4ai import (
from crawl4ai.proxy_strategy import RoundRobinProxyStrategy AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
CacheMode,
RoundRobinProxyStrategy,
)
import asyncio
from crawl4ai import ProxyConfig
async def main(): async def main():
# Load proxies from environment # Load proxies and create rotation strategy
proxies = ProxyConfig.from_env() proxies = ProxyConfig.from_env()
#eg: export PROXIES="ip1:port1:username1:password1,ip2:port2:username2:password2"
if not proxies: if not proxies:
print("No proxies found! Set PROXIES environment variable.") print("No proxies found in environment. Set PROXIES env variable!")
return return
# Create rotation strategy
proxy_strategy = RoundRobinProxyStrategy(proxies) proxy_strategy = RoundRobinProxyStrategy(proxies)
# Configure per-request with proxy rotation # Create configs
browser_config = BrowserConfig(headless=True, verbose=False) browser_config = BrowserConfig(headless=True, verbose=False)
run_config = CrawlerRunConfig( run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS, cache_mode=CacheMode.BYPASS,
proxy_rotation_strategy=proxy_strategy, proxy_rotation_strategy=proxy_strategy
) )
async with AsyncWebCrawler(config=browser_config) as crawler: async with AsyncWebCrawler(config=browser_config) as crawler:
urls = ["https://httpbin.org/ip"] * (len(proxies) * 2) # Test each proxy twice urls = ["https://httpbin.org/ip"] * (len(proxies) * 2) # Test each proxy twice
print(f"🚀 Testing {len(proxies)} proxies with rotation...") print("\n📈 Initializing crawler with proxy rotation...")
results = await crawler.arun_many(urls=urls, config=run_config) async with AsyncWebCrawler(config=browser_config) as crawler:
print("\n🚀 Starting batch crawl with proxy rotation...")
results = await crawler.arun_many(
urls=urls,
config=run_config
)
for result in results:
if result.success:
ip_match = re.search(r'(?:[0-9]{1,3}\.){3}[0-9]{1,3}', result.html)
current_proxy = run_config.proxy_config if run_config.proxy_config else None
for i, result in enumerate(results): if current_proxy and ip_match:
if result.success: print(f"URL {result.url}")
# Extract IP from response print(f"Proxy {current_proxy.server} -> Response IP: {ip_match.group(0)}")
ip_match = re.search(r'(?:[0-9]{1,3}\.){3}[0-9]{1,3}', result.html) verified = ip_match.group(0) == current_proxy.ip
if ip_match: if verified:
detected_ip = ip_match.group(0) print(f"✅ Proxy working! IP matches: {current_proxy.ip}")
proxy_index = i % len(proxies) else:
expected_ip = proxies[proxy_index].ip print("❌ Proxy failed or IP mismatch!")
print("---")
print(f"✅ Request {i+1}: Proxy {proxy_index+1} -> IP {detected_ip}") asyncio.run(main())
if detected_ip == expected_ip:
print(" 🎯 IP matches proxy configuration")
else:
print(f" ⚠️ IP mismatch (expected {expected_ip})")
else:
print(f"❌ Request {i+1}: Could not extract IP from response")
else:
print(f"❌ Request {i+1}: Failed - {result.error_message}")
if __name__ == "__main__":
asyncio.run(main())
``` ```
## SSL Certificate Analysis
Combine proxy usage with SSL certificate inspection for enhanced security analysis. SSL certificate fetching is configured per request via `CrawlerRunConfig`.
### Per-Request SSL Certificate Analysis
```python
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
run_config = CrawlerRunConfig(
proxy_config={
"server": "http://proxy.example.com:8080",
"username": "user",
"password": "pass",
},
fetch_ssl_certificate=True, # Enable SSL certificate analysis for this request
)
async def main():
browser_config = BrowserConfig()
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com", config=run_config)
if result.success:
print(f"✅ Crawled via proxy: {result.url}")
# Analyze SSL certificate
if result.ssl_certificate:
cert = result.ssl_certificate
print("🔒 SSL Certificate Info:")
print(f" Issuer: {cert.issuer}")
print(f" Subject: {cert.subject}")
print(f" Valid until: {cert.valid_until}")
print(f" Fingerprint: {cert.fingerprint}")
# Export certificate
cert.to_json("certificate.json")
print("💾 Certificate exported to certificate.json")
else:
print("⚠️ No SSL certificate information available")
if __name__ == "__main__":
asyncio.run(main())
```
## Security Best Practices
### 1. Proxy Rotation for Anonymity
```python
from crawl4ai import CrawlerRunConfig, ProxyConfig
from crawl4ai.proxy_strategy import RoundRobinProxyStrategy
# Use multiple proxies to avoid IP blocking
proxies = ProxyConfig.from_env("PROXIES")
strategy = RoundRobinProxyStrategy(proxies)
# Configure rotation per request (recommended)
run_config = CrawlerRunConfig(proxy_rotation_strategy=strategy)
# For a fixed proxy across all requests, just reuse the same run_config instance
static_run_config = run_config
```
### 2. SSL Certificate Verification
```python
from crawl4ai import CrawlerRunConfig
# Always verify SSL certificates when possible
# Per-request (affects specific requests)
run_config = CrawlerRunConfig(fetch_ssl_certificate=True)
```
### 3. Environment Variable Security
```bash
# Use environment variables for sensitive proxy credentials
# Avoid hardcoding usernames/passwords in code
export PROXIES="ip1:port1:user1:pass1,ip2:port2:user2:pass2"
```
### 4. SOCKS5 for Enhanced Security
```python
from crawl4ai import CrawlerRunConfig
# Prefer SOCKS5 proxies for better protocol support
run_config = CrawlerRunConfig(proxy_config="socks5://proxy.example.com:1080")
```
## Migration from Deprecated `proxy` Parameter
!!! warning "Deprecation Notice"
The legacy `proxy` argument on `BrowserConfig` is deprecated. Configure proxies through `CrawlerRunConfig.proxy_config` so each request fully describes its network settings.
```python
# Old (deprecated) approach
# from crawl4ai import BrowserConfig
# browser_config = BrowserConfig(proxy="http://proxy.example.com:8080")
# New (preferred) approach
from crawl4ai import CrawlerRunConfig
run_config = CrawlerRunConfig(proxy_config="http://proxy.example.com:8080")
```
### Safe Logging of Proxies
```python
from crawl4ai import ProxyConfig
def safe_proxy_repr(proxy: ProxyConfig):
if getattr(proxy, "username", None):
return f"{proxy.server} (auth: ****)"
return proxy.server
```
## Troubleshooting
### Common Issues
???+ question "Proxy connection failed"
- Verify the proxy server is reachable from your network.
- Double-check authentication credentials.
- Ensure the protocol matches (`http`, `https`, or `socks5`).
???+ question "SSL certificate errors"
- Some proxies break SSL inspection; switch proxies if you see repeated failures.
- Consider temporarily disabling certificate fetching to isolate the issue.
???+ question "Environment variables not loading"
- Confirm `PROXIES` (or your custom env var) is set before running the script.
- Check formatting: `ip:port:user:pass,ip:port:user:pass`.
???+ question "Proxy rotation not working"
- Ensure `ProxyConfig.from_env()` actually loaded entries (`len(proxies) > 0`).
- Attach `proxy_rotation_strategy` to `CrawlerRunConfig`.
- Validate the proxy definitions you pass into the strategy.

View File

@@ -18,7 +18,7 @@ A comprehensive web-based tutorial for learning and experimenting with C4A-Scrip
2. **Install Dependencies** 2. **Install Dependencies**
```bash ```bash
pip install -r requirements.txt pip install flask
``` ```
3. **Launch the Server** 3. **Launch the Server**
@@ -28,7 +28,7 @@ A comprehensive web-based tutorial for learning and experimenting with C4A-Scrip
4. **Open in Browser** 4. **Open in Browser**
``` ```
http://localhost:8000 http://localhost:8080
``` ```
**🌐 Try Online**: [Live Demo](https://docs.crawl4ai.com/c4a-script/demo) **🌐 Try Online**: [Live Demo](https://docs.crawl4ai.com/c4a-script/demo)
@@ -325,7 +325,7 @@ Powers the recording functionality:
### Configuration ### Configuration
```python ```python
# server.py configuration # server.py configuration
PORT = 8000 PORT = 8080
DEBUG = True DEBUG = True
THREADED = True THREADED = True
``` ```
@@ -343,9 +343,9 @@ THREADED = True
**Port Already in Use** **Port Already in Use**
```bash ```bash
# Kill existing process # Kill existing process
lsof -ti:8000 | xargs kill -9 lsof -ti:8080 | xargs kill -9
# Or use different port # Or use different port
python server.py --port 8001 python server.py --port 8081
``` ```
**Blockly Not Loading** **Blockly Not Loading**

View File

@@ -216,7 +216,7 @@ def get_examples():
'name': 'Handle Cookie Banner', 'name': 'Handle Cookie Banner',
'description': 'Accept cookies and close newsletter popup', 'description': 'Accept cookies and close newsletter popup',
'script': '''# Handle cookie banner and newsletter 'script': '''# Handle cookie banner and newsletter
GO http://127.0.0.1:8000/playground/ GO http://127.0.0.1:8080/playground/
WAIT `body` 2 WAIT `body` 2
IF (EXISTS `.cookie-banner`) THEN CLICK `.accept` IF (EXISTS `.cookie-banner`) THEN CLICK `.accept`
IF (EXISTS `.newsletter-popup`) THEN CLICK `.close`''' IF (EXISTS `.newsletter-popup`) THEN CLICK `.close`'''
@@ -283,7 +283,7 @@ WAIT `.success-message` 5'''
return jsonify(examples) return jsonify(examples)
if __name__ == '__main__': if __name__ == '__main__':
port = int(os.environ.get('PORT', 8000)) port = int(os.environ.get('PORT', 8080))
print(f""" print(f"""
╔══════════════════════════════════════════════════════════╗ ╔══════════════════════════════════════════════════════════╗
║ C4A-Script Interactive Tutorial Server ║ ║ C4A-Script Interactive Tutorial Server ║

View File

@@ -69,12 +69,12 @@ The tutorial includes a Flask-based web interface with:
cd docs/examples/c4a_script/tutorial/ cd docs/examples/c4a_script/tutorial/
# Install dependencies # Install dependencies
pip install -r requirements.txt pip install flask
# Launch the tutorial server # Launch the tutorial server
python server.py python app.py
# Open http://localhost:8000 in your browser # Open http://localhost:5000 in your browser
``` ```
## Core Concepts ## Core Concepts
@@ -111,8 +111,8 @@ CLICK `.submit-btn`
# By attribute # By attribute
CLICK `button[type="submit"]` CLICK `button[type="submit"]`
# By accessible attributes # By text content
CLICK `button[aria-label="Search"][title="Search"]` CLICK `button:contains("Sign In")`
# Complex selectors # Complex selectors
CLICK `.form-container input[name="email"]` CLICK `.form-container input[name="email"]`

View File

@@ -27,14 +27,6 @@
- [Hook Response Information](#hook-response-information) - [Hook Response Information](#hook-response-information)
- [Error Handling](#error-handling) - [Error Handling](#error-handling)
- [Hooks Utility: Function-Based Approach (Python)](#hooks-utility-function-based-approach-python) - [Hooks Utility: Function-Based Approach (Python)](#hooks-utility-function-based-approach-python)
- [Job Queue & Webhook API](#job-queue-webhook-api)
- [Why Use the Job Queue API?](#why-use-the-job-queue-api)
- [Available Endpoints](#available-endpoints)
- [Webhook Configuration](#webhook-configuration)
- [Usage Examples](#usage-examples)
- [Webhook Best Practices](#webhook-best-practices)
- [Use Cases](#use-cases)
- [Troubleshooting](#troubleshooting)
- [Dockerfile Parameters](#dockerfile-parameters) - [Dockerfile Parameters](#dockerfile-parameters)
- [Using the API](#using-the-api) - [Using the API](#using-the-api)
- [Playground Interface](#playground-interface) - [Playground Interface](#playground-interface)
@@ -1118,464 +1110,6 @@ if __name__ == "__main__":
--- ---
## Job Queue & Webhook API
The Docker deployment includes a powerful asynchronous job queue system with webhook support for both crawling and LLM extraction tasks. Instead of waiting for long-running operations to complete, submit jobs and receive real-time notifications via webhooks when they finish.
### Why Use the Job Queue API?
**Traditional Synchronous API (`/crawl`):**
- Client waits for entire crawl to complete
- Timeout issues with long-running crawls
- Resource blocking during execution
- Constant polling required for status updates
**Asynchronous Job Queue API (`/crawl/job`, `/llm/job`):**
- ✅ Submit job and continue immediately
- ✅ No timeout concerns for long operations
- ✅ Real-time webhook notifications on completion
- ✅ Better resource utilization
- ✅ Perfect for batch processing
- ✅ Ideal for microservice architectures
### Available Endpoints
#### 1. Crawl Job Endpoint
```
POST /crawl/job
```
Submit an asynchronous crawl job with optional webhook notification.
**Request Body:**
```json
{
"urls": ["https://example.com"],
"cache_mode": "bypass",
"extraction_strategy": {
"type": "JsonCssExtractionStrategy",
"schema": {
"title": "h1",
"content": ".article-body"
}
},
"webhook_config": {
"webhook_url": "https://your-app.com/webhook/crawl-complete",
"webhook_data_in_payload": true,
"webhook_headers": {
"X-Webhook-Secret": "your-secret-token",
"X-Custom-Header": "value"
}
}
}
```
**Response:**
```json
{
"task_id": "crawl_1698765432",
"message": "Crawl job submitted"
}
```
#### 2. LLM Extraction Job Endpoint
```
POST /llm/job
```
Submit an asynchronous LLM extraction job with optional webhook notification.
**Request Body:**
```json
{
"url": "https://example.com/article",
"q": "Extract the article title, author, publication date, and main points",
"provider": "openai/gpt-4o-mini",
"schema": "{\"title\": \"string\", \"author\": \"string\", \"date\": \"string\", \"points\": [\"string\"]}",
"cache": false,
"webhook_config": {
"webhook_url": "https://your-app.com/webhook/llm-complete",
"webhook_data_in_payload": true,
"webhook_headers": {
"X-Webhook-Secret": "your-secret-token"
}
}
}
```
**Response:**
```json
{
"task_id": "llm_1698765432",
"message": "LLM job submitted"
}
```
#### 3. Job Status Endpoint
```
GET /job/{task_id}
```
Check the status and retrieve results of a submitted job.
**Response (In Progress):**
```json
{
"task_id": "crawl_1698765432",
"status": "processing",
"message": "Job is being processed"
}
```
**Response (Completed):**
```json
{
"task_id": "crawl_1698765432",
"status": "completed",
"result": {
"markdown": "# Page Title\n\nContent...",
"extracted_content": {...},
"links": {...}
}
}
```
### Webhook Configuration
Webhooks provide real-time notifications when your jobs complete, eliminating the need for constant polling.
#### Webhook Config Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `webhook_url` | string | Yes | Your HTTP(S) endpoint to receive notifications |
| `webhook_data_in_payload` | boolean | No | Include full result data in webhook payload (default: false) |
| `webhook_headers` | object | No | Custom headers for authentication/identification |
#### Webhook Payload Format
**Success Notification (Crawl Job):**
```json
{
"task_id": "crawl_1698765432",
"task_type": "crawl",
"status": "completed",
"timestamp": "2025-10-22T12:30:00.000000+00:00",
"urls": ["https://example.com"],
"data": {
"markdown": "# Page content...",
"extracted_content": {...},
"links": {...}
}
}
```
**Success Notification (LLM Job):**
```json
{
"task_id": "llm_1698765432",
"task_type": "llm_extraction",
"status": "completed",
"timestamp": "2025-10-22T12:30:00.000000+00:00",
"urls": ["https://example.com/article"],
"data": {
"extracted_content": {
"title": "Understanding Web Scraping",
"author": "John Doe",
"date": "2025-10-22",
"points": ["Point 1", "Point 2"]
}
}
}
```
**Failure Notification:**
```json
{
"task_id": "crawl_1698765432",
"task_type": "crawl",
"status": "failed",
"timestamp": "2025-10-22T12:30:00.000000+00:00",
"urls": ["https://example.com"],
"error": "Connection timeout after 30 seconds"
}
```
#### Webhook Delivery & Retry
- **Delivery Method:** HTTP POST to your `webhook_url`
- **Content-Type:** `application/json`
- **Retry Policy:** Exponential backoff with 5 attempts
- Attempt 1: Immediate
- Attempt 2: 1 second delay
- Attempt 3: 2 seconds delay
- Attempt 4: 4 seconds delay
- Attempt 5: 8 seconds delay
- **Success Status Codes:** 200-299
- **Custom Headers:** Your `webhook_headers` are included in every request
### Usage Examples
#### Example 1: Python with Webhook Handler (Flask)
```python
from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
# Webhook handler
@app.route('/webhook/crawl-complete', methods=['POST'])
def handle_crawl_webhook():
payload = request.json
if payload['status'] == 'completed':
print(f"✅ Job {payload['task_id']} completed!")
print(f"Task type: {payload['task_type']}")
# Access the crawl results
if 'data' in payload:
markdown = payload['data'].get('markdown', '')
extracted = payload['data'].get('extracted_content', {})
print(f"Extracted {len(markdown)} characters")
print(f"Structured data: {extracted}")
else:
print(f"❌ Job {payload['task_id']} failed: {payload.get('error')}")
return jsonify({"status": "received"}), 200
# Submit a crawl job with webhook
def submit_crawl_job():
response = requests.post(
"http://localhost:11235/crawl/job",
json={
"urls": ["https://example.com"],
"extraction_strategy": {
"type": "JsonCssExtractionStrategy",
"schema": {
"name": "Example Schema",
"baseSelector": "body",
"fields": [
{"name": "title", "selector": "h1", "type": "text"},
{"name": "description", "selector": "meta[name='description']", "type": "attribute", "attribute": "content"}
]
}
},
"webhook_config": {
"webhook_url": "https://your-app.com/webhook/crawl-complete",
"webhook_data_in_payload": True,
"webhook_headers": {
"X-Webhook-Secret": "your-secret-token"
}
}
}
)
task_id = response.json()['task_id']
print(f"Job submitted: {task_id}")
return task_id
if __name__ == '__main__':
app.run(port=5000)
```
#### Example 2: LLM Extraction with Webhooks
```python
import requests
def submit_llm_job_with_webhook():
response = requests.post(
"http://localhost:11235/llm/job",
json={
"url": "https://example.com/article",
"q": "Extract the article title, author, and main points",
"provider": "openai/gpt-4o-mini",
"webhook_config": {
"webhook_url": "https://your-app.com/webhook/llm-complete",
"webhook_data_in_payload": True,
"webhook_headers": {
"X-Webhook-Secret": "your-secret-token"
}
}
}
)
task_id = response.json()['task_id']
print(f"LLM job submitted: {task_id}")
return task_id
# Webhook handler for LLM jobs
@app.route('/webhook/llm-complete', methods=['POST'])
def handle_llm_webhook():
payload = request.json
if payload['status'] == 'completed':
extracted = payload['data']['extracted_content']
print(f"✅ LLM extraction completed!")
print(f"Results: {extracted}")
else:
print(f"❌ LLM extraction failed: {payload.get('error')}")
return jsonify({"status": "received"}), 200
```
#### Example 3: Without Webhooks (Polling)
If you don't use webhooks, you can poll for results:
```python
import requests
import time
# Submit job
response = requests.post(
"http://localhost:11235/crawl/job",
json={"urls": ["https://example.com"]}
)
task_id = response.json()['task_id']
# Poll for results
while True:
result = requests.get(f"http://localhost:11235/job/{task_id}")
data = result.json()
if data['status'] == 'completed':
print("Job completed!")
print(data['result'])
break
elif data['status'] == 'failed':
print(f"Job failed: {data.get('error')}")
break
print("Still processing...")
time.sleep(2)
```
#### Example 4: Global Webhook Configuration
Set a default webhook URL in your `config.yml` to avoid repeating it in every request:
```yaml
# config.yml
api:
crawler:
# ... other settings ...
webhook:
default_url: "https://your-app.com/webhook/default"
default_headers:
X-Webhook-Secret: "your-secret-token"
```
Then submit jobs without webhook config:
```python
# Uses the global webhook configuration
response = requests.post(
"http://localhost:11235/crawl/job",
json={"urls": ["https://example.com"]}
)
```
### Webhook Best Practices
1. **Authentication:** Always use custom headers for webhook authentication
```json
"webhook_headers": {
"X-Webhook-Secret": "your-secret-token"
}
```
2. **Idempotency:** Design your webhook handler to be idempotent (safe to receive duplicate notifications)
3. **Fast Response:** Return HTTP 200 quickly; process data asynchronously if needed
```python
@app.route('/webhook', methods=['POST'])
def webhook():
payload = request.json
# Queue for background processing
queue.enqueue(process_webhook, payload)
return jsonify({"status": "received"}), 200
```
4. **Error Handling:** Handle both success and failure notifications
```python
if payload['status'] == 'completed':
# Process success
elif payload['status'] == 'failed':
# Log error, retry, or alert
```
5. **Validation:** Verify webhook authenticity using custom headers
```python
secret = request.headers.get('X-Webhook-Secret')
if secret != os.environ['EXPECTED_SECRET']:
return jsonify({"error": "Unauthorized"}), 401
```
6. **Logging:** Log webhook deliveries for debugging
```python
logger.info(f"Webhook received: {payload['task_id']} - {payload['status']}")
```
### Use Cases
**1. Batch Processing**
Submit hundreds of URLs and get notified as each completes:
```python
urls = ["https://site1.com", "https://site2.com", ...]
for url in urls:
submit_crawl_job(url, webhook_url="https://app.com/webhook")
```
**2. Microservice Integration**
Integrate with event-driven architectures:
```python
# Service A submits job
task_id = submit_crawl_job(url)
# Service B receives webhook and triggers next step
@app.route('/webhook')
def webhook():
process_result(request.json)
trigger_next_service()
return "OK", 200
```
**3. Long-Running Extractions**
Handle complex LLM extractions without timeouts:
```python
submit_llm_job(
url="https://long-article.com",
q="Comprehensive summary with key points and analysis",
webhook_url="https://app.com/webhook/llm"
)
```
### Troubleshooting
**Webhook not receiving notifications?**
- Check your webhook URL is publicly accessible
- Verify firewall/security group settings
- Use webhook testing tools like webhook.site for debugging
- Check server logs for delivery attempts
- Ensure your handler returns 200-299 status code
**Job stuck in processing?**
- Check Redis connection: `docker logs <container_name> | grep redis`
- Verify worker processes: `docker exec <container_name> ps aux | grep worker`
- Check server logs: `docker logs <container_name>`
**Need to cancel a job?**
Jobs are processed asynchronously. If you need to cancel:
- Delete the task from Redis (requires Redis CLI access)
- Or implement a cancellation endpoint in your webhook handler
---
## Dockerfile Parameters ## Dockerfile Parameters
You can customize the image build process using build arguments (`--build-arg`). These are typically used via `docker buildx build` or within the `docker-compose.yml` file. You can customize the image build process using build arguments (`--build-arg`). These are typically used via `docker buildx build` or within the `docker-compose.yml` file.

View File

@@ -57,7 +57,7 @@
Crawl4AI is the #1 trending GitHub repository, actively maintained by a vibrant community. It delivers blazing-fast, AI-ready web crawling tailored for large language models, AI agents, and data pipelines. Fully open source, flexible, and built for real-time performance, **Crawl4AI** empowers developers with unmatched speed, precision, and deployment ease. Crawl4AI is the #1 trending GitHub repository, actively maintained by a vibrant community. It delivers blazing-fast, AI-ready web crawling tailored for large language models, AI agents, and data pipelines. Fully open source, flexible, and built for real-time performance, **Crawl4AI** empowers developers with unmatched speed, precision, and deployment ease.
> Enjoy using Crawl4AI? Consider **[becoming a sponsor](https://github.com/sponsors/unclecode)** to support ongoing development and community growth! > **Note**: If you're looking for the old documentation, you can access it [here](https://old.docs.crawl4ai.com).
## 🆕 AI Assistant Skill Now Available! ## 🆕 AI Assistant Skill Now Available!

View File

@@ -529,19 +529,8 @@ class AdminDashboard {
</label> </label>
</div> </div>
<div class="form-group full-width"> <div class="form-group full-width">
<label>Long Description (Markdown - Overview tab)</label> <label>Integration Guide</label>
<textarea id="form-long-description" rows="10" placeholder="Enter detailed description with markdown formatting...">${app?.long_description || ''}</textarea> <textarea id="form-integration" rows="10">${app?.integration_guide || ''}</textarea>
<small>Markdown support: **bold**, *italic*, [links](url), # headers, code blocks, lists</small>
</div>
<div class="form-group full-width">
<label>Integration Guide (Markdown - Integration tab)</label>
<textarea id="form-integration" rows="20" placeholder="Enter integration guide with installation, examples, and code snippets using markdown...">${app?.integration_guide || ''}</textarea>
<small>Single markdown field with installation, examples, and complete guide. Code blocks get auto copy buttons.</small>
</div>
<div class="form-group full-width">
<label>Documentation (Markdown - Documentation tab)</label>
<textarea id="form-documentation" rows="20" placeholder="Enter documentation with API reference, examples, and best practices using markdown...">${app?.documentation || ''}</textarea>
<small>Full documentation with API reference, examples, best practices, etc.</small>
</div> </div>
</div> </div>
`; `;
@@ -723,9 +712,7 @@ class AdminDashboard {
data.contact_email = document.getElementById('form-email').value; data.contact_email = document.getElementById('form-email').value;
data.featured = document.getElementById('form-featured').checked ? 1 : 0; data.featured = document.getElementById('form-featured').checked ? 1 : 0;
data.sponsored = document.getElementById('form-sponsored').checked ? 1 : 0; data.sponsored = document.getElementById('form-sponsored').checked ? 1 : 0;
data.long_description = document.getElementById('form-long-description').value;
data.integration_guide = document.getElementById('form-integration').value; data.integration_guide = document.getElementById('form-integration').value;
data.documentation = document.getElementById('form-documentation').value;
} else if (type === 'articles') { } else if (type === 'articles') {
data.title = document.getElementById('form-title').value; data.title = document.getElementById('form-title').value;
data.slug = this.generateSlug(data.title); data.slug = this.generateSlug(data.title);

View File

@@ -278,12 +278,12 @@
} }
.tab-content { .tab-content {
display: none !important; display: none;
padding: 2rem; padding: 2rem;
} }
.tab-content.active { .tab-content.active {
display: block !important; display: block;
} }
/* Overview Layout */ /* Overview Layout */
@@ -510,31 +510,6 @@
line-height: 1.5; line-height: 1.5;
} }
/* Markdown rendered code blocks */
.integration-content pre,
.docs-content pre {
background: var(--bg-dark);
border: 1px solid var(--border-color);
margin: 1rem 0;
padding: 1rem;
padding-top: 2.5rem; /* Space for copy button */
overflow-x: auto;
position: relative;
max-height: none; /* Remove any height restrictions */
height: auto; /* Allow content to expand */
}
.integration-content pre code,
.docs-content pre code {
background: transparent;
padding: 0;
color: var(--text-secondary);
font-size: 0.875rem;
line-height: 1.5;
white-space: pre; /* Preserve whitespace and line breaks */
display: block;
}
/* Feature Grid */ /* Feature Grid */
.feature-grid { .feature-grid {
display: grid; display: grid;

View File

@@ -73,14 +73,27 @@
<div class="tabs"> <div class="tabs">
<button class="tab-btn active" data-tab="overview">Overview</button> <button class="tab-btn active" data-tab="overview">Overview</button>
<button class="tab-btn" data-tab="integration">Integration</button> <button class="tab-btn" data-tab="integration">Integration</button>
<!-- <button class="tab-btn" data-tab="docs">Documentation</button> <button class="tab-btn" data-tab="docs">Documentation</button>
<button class="tab-btn" data-tab="support">Support</button> --> <button class="tab-btn" data-tab="support">Support</button>
</div> </div>
<section id="overview-tab" class="tab-content active"> <section id="overview-tab" class="tab-content active">
<div class="overview-columns"> <div class="overview-columns">
<div class="overview-main"> <div class="overview-main">
<h2>Overview</h2>
<div id="app-overview">Overview content goes here.</div> <div id="app-overview">Overview content goes here.</div>
<h3>Key Features</h3>
<ul id="app-features" class="features-list">
<li>Feature 1</li>
<li>Feature 2</li>
<li>Feature 3</li>
</ul>
<h3>Use Cases</h3>
<div id="app-use-cases" class="use-cases">
<p>Describe how this app can help your workflow.</p>
</div>
</div> </div>
<aside class="sidebar"> <aside class="sidebar">
@@ -129,16 +142,37 @@
</section> </section>
<section id="integration-tab" class="tab-content"> <section id="integration-tab" class="tab-content">
<div class="integration-content" id="app-integration"> <div class="integration-content">
<h2>Integration Guide</h2>
<h3>Installation</h3>
<div class="code-block">
<pre><code id="install-code"># Installation instructions will appear here</code></pre>
</div>
<h3>Basic Usage</h3>
<div class="code-block">
<pre><code id="usage-code"># Usage example will appear here</code></pre>
</div>
<h3>Complete Integration Example</h3>
<div class="code-block">
<button class="copy-btn" id="copy-integration">Copy</button>
<pre><code id="integration-code"># Complete integration guide will appear here</code></pre>
</div>
</div> </div>
</section> </section>
<!-- <section id="docs-tab" class="tab-content"> <section id="docs-tab" class="tab-content">
<div class="docs-content" id="app-docs"> <div class="docs-content">
<h2>Documentation</h2>
<div id="app-docs" class="doc-sections">
<p>Documentation coming soon.</p>
</div>
</div> </div>
</section> --> </section>
<!-- <section id="support-tab" class="tab-content"> <section id="support-tab" class="tab-content">
<div class="docs-content"> <div class="docs-content">
<h2>Support</h2> <h2>Support</h2>
<div class="support-grid"> <div class="support-grid">
@@ -156,7 +190,7 @@
</div> </div>
</div> </div>
</div> </div>
</section> --> </section>
</div> </div>
</main> </main>

View File

@@ -112,7 +112,7 @@ class AppDetailPage {
} }
// Contact // Contact
document.getElementById('app-contact') && (document.getElementById('app-contact').textContent = this.appData.contact_email || 'Not available'); document.getElementById('app-contact').textContent = this.appData.contact_email || 'Not available';
// Sidebar info // Sidebar info
document.getElementById('sidebar-downloads').textContent = this.formatNumber(this.appData.downloads || 0); document.getElementById('sidebar-downloads').textContent = this.formatNumber(this.appData.downloads || 0);
@@ -123,134 +123,146 @@ class AppDetailPage {
document.getElementById('sidebar-pricing').textContent = this.appData.pricing || 'Free'; document.getElementById('sidebar-pricing').textContent = this.appData.pricing || 'Free';
document.getElementById('sidebar-contact').textContent = this.appData.contact_email || 'contact@example.com'; document.getElementById('sidebar-contact').textContent = this.appData.contact_email || 'contact@example.com';
// Render tab contents from database fields // Integration guide
this.renderTabContents(); this.renderIntegrationGuide();
} }
renderTabContents() { renderIntegrationGuide() {
// Overview tab - use long_description from database // Installation code
const overviewDiv = document.getElementById('app-overview'); const installCode = document.getElementById('install-code');
if (overviewDiv) { if (installCode) {
if (this.appData.long_description) { if (this.appData.type === 'Open Source' && this.appData.github_url) {
overviewDiv.innerHTML = this.renderMarkdown(this.appData.long_description); installCode.textContent = `# Clone from GitHub
} else { git clone ${this.appData.github_url}
overviewDiv.innerHTML = `<p>${this.appData.description || 'No overview available.'}</p>`;
# Install dependencies
pip install -r requirements.txt`;
} else if (this.appData.name.toLowerCase().includes('api')) {
installCode.textContent = `# Install via pip
pip install ${this.appData.slug}
# Or install from source
pip install git+${this.appData.github_url || 'https://github.com/example/repo'}`;
} }
} }
// Integration tab - use integration_guide field from database // Usage code - customize based on category
const integrationDiv = document.getElementById('app-integration'); const usageCode = document.getElementById('usage-code');
if (integrationDiv) { if (usageCode) {
if (this.appData.integration_guide) { if (this.appData.category === 'Browser Automation') {
integrationDiv.innerHTML = this.renderMarkdown(this.appData.integration_guide); usageCode.textContent = `from crawl4ai import AsyncWebCrawler
// Add copy buttons to all code blocks from ${this.appData.slug.replace(/-/g, '_')} import ${this.appData.name.replace(/\s+/g, '')}
this.addCopyButtonsToCodeBlocks(integrationDiv);
} else { async def main():
integrationDiv.innerHTML = '<p>Integration guide not yet available. Please check the official website for details.</p>'; # Initialize ${this.appData.name}
automation = ${this.appData.name.replace(/\s+/g, '')}()
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url="https://example.com",
browser_config=automation.config,
wait_for="css:body"
)
print(result.markdown)`;
} else if (this.appData.category === 'Proxy Services') {
usageCode.textContent = `from crawl4ai import AsyncWebCrawler
import ${this.appData.slug.replace(/-/g, '_')}
# Configure proxy
proxy_config = {
"server": "${this.appData.website_url || 'https://proxy.example.com'}",
"username": "your_username",
"password": "your_password"
}
async with AsyncWebCrawler(proxy=proxy_config) as crawler:
result = await crawler.arun(
url="https://example.com",
bypass_cache=True
)
print(result.status_code)`;
} else if (this.appData.category === 'LLM Integration') {
usageCode.textContent = `from crawl4ai import AsyncWebCrawler
from crawl4ai.extraction_strategy import LLMExtractionStrategy
# Configure LLM extraction
strategy = LLMExtractionStrategy(
provider="${this.appData.name.toLowerCase().includes('gpt') ? 'openai' : 'anthropic'}",
api_key="your-api-key",
model="${this.appData.name.toLowerCase().includes('gpt') ? 'gpt-4' : 'claude-3'}",
instruction="Extract structured data"
)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url="https://example.com",
extraction_strategy=strategy
)
print(result.extracted_content)`;
} }
} }
// Documentation tab - use documentation field from database // Integration example
const docsDiv = document.getElementById('app-docs'); const integrationCode = document.getElementById('integration-code');
if (docsDiv) { if (integrationCode) {
if (this.appData.documentation) { integrationCode.textContent = this.appData.integration_guide ||
docsDiv.innerHTML = this.renderMarkdown(this.appData.documentation); `# Complete ${this.appData.name} Integration Example
// Add copy buttons to all code blocks
this.addCopyButtonsToCodeBlocks(docsDiv); from crawl4ai import AsyncWebCrawler
} else { from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
docsDiv.innerHTML = '<p>Documentation coming soon.</p>'; import json
}
async def crawl_with_${this.appData.slug.replace(/-/g, '_')}():
"""
Complete example showing how to use ${this.appData.name}
with Crawl4AI for production web scraping
"""
# Define extraction schema
schema = {
"name": "ProductList",
"baseSelector": "div.product",
"fields": [
{"name": "title", "selector": "h2", "type": "text"},
{"name": "price", "selector": ".price", "type": "text"},
{"name": "image", "selector": "img", "type": "attribute", "attribute": "src"},
{"name": "link", "selector": "a", "type": "attribute", "attribute": "href"}
]
}
# Initialize crawler with ${this.appData.name}
async with AsyncWebCrawler(
browser_type="chromium",
headless=True,
verbose=True
) as crawler:
# Crawl with extraction
result = await crawler.arun(
url="https://example.com/products",
extraction_strategy=JsonCssExtractionStrategy(schema),
cache_mode="bypass",
wait_for="css:.product",
screenshot=True
)
# Process results
if result.success:
products = json.loads(result.extracted_content)
print(f"Found {len(products)} products")
for product in products[:5]:
print(f"- {product['title']}: {product['price']}")
return products
# Run the crawler
if __name__ == "__main__":
import asyncio
asyncio.run(crawl_with_${this.appData.slug.replace(/-/g, '_')}())`;
} }
} }
addCopyButtonsToCodeBlocks(container) {
// Find all code blocks and add copy buttons
const codeBlocks = container.querySelectorAll('pre code');
codeBlocks.forEach(codeBlock => {
const pre = codeBlock.parentElement;
// Skip if already has a copy button
if (pre.querySelector('.copy-btn')) return;
// Create copy button
const copyBtn = document.createElement('button');
copyBtn.className = 'copy-btn';
copyBtn.textContent = 'Copy';
copyBtn.onclick = () => {
navigator.clipboard.writeText(codeBlock.textContent).then(() => {
copyBtn.textContent = '✓ Copied!';
setTimeout(() => {
copyBtn.textContent = 'Copy';
}, 2000);
});
};
// Add button to pre element
pre.style.position = 'relative';
pre.insertBefore(copyBtn, codeBlock);
});
}
renderMarkdown(text) {
if (!text) return '';
// Store code blocks temporarily to protect them from processing
const codeBlocks = [];
let processed = text.replace(/```(\w+)?\n([\s\S]*?)```/g, (match, lang, code) => {
const placeholder = `___CODE_BLOCK_${codeBlocks.length}___`;
codeBlocks.push(`<pre><code class="language-${lang || ''}">${this.escapeHtml(code)}</code></pre>`);
return placeholder;
});
// Store inline code temporarily
const inlineCodes = [];
processed = processed.replace(/`([^`]+)`/g, (match, code) => {
const placeholder = `___INLINE_CODE_${inlineCodes.length}___`;
inlineCodes.push(`<code>${this.escapeHtml(code)}</code>`);
return placeholder;
});
// Now process the rest of the markdown
processed = processed
// Headers
.replace(/^### (.*$)/gim, '<h3>$1</h3>')
.replace(/^## (.*$)/gim, '<h2>$1</h2>')
.replace(/^# (.*$)/gim, '<h1>$1</h1>')
// Bold
.replace(/\*\*(.*?)\*\*/g, '<strong>$1</strong>')
// Italic
.replace(/\*(.*?)\*/g, '<em>$1</em>')
// Links
.replace(/\[([^\]]+)\]\(([^)]+)\)/g, '<a href="$2" target="_blank">$1</a>')
// Line breaks
.replace(/\n\n/g, '</p><p>')
.replace(/\n/g, '<br>')
// Lists
.replace(/^\* (.*)$/gim, '<li>$1</li>')
.replace(/^- (.*)$/gim, '<li>$1</li>')
// Wrap in paragraphs
.replace(/^(?!<[h|p|pre|ul|ol|li])/gim, '<p>')
.replace(/(?<![>])$/gim, '</p>');
// Restore inline code
inlineCodes.forEach((code, i) => {
processed = processed.replace(`___INLINE_CODE_${i}___`, code);
});
// Restore code blocks
codeBlocks.forEach((block, i) => {
processed = processed.replace(`___CODE_BLOCK_${i}___`, block);
});
return processed;
}
escapeHtml(text) {
const div = document.createElement('div');
div.textContent = text;
return div.innerHTML;
}
formatNumber(num) { formatNumber(num) {
if (num >= 1000000) { if (num >= 1000000) {
return (num / 1000000).toFixed(1) + 'M'; return (num / 1000000).toFixed(1) + 'M';
@@ -263,27 +275,45 @@ class AppDetailPage {
setupEventListeners() { setupEventListeners() {
// Tab switching // Tab switching
const tabs = document.querySelectorAll('.tab-btn'); const tabs = document.querySelectorAll('.tab-btn');
tabs.forEach(tab => { tabs.forEach(tab => {
tab.addEventListener('click', () => { tab.addEventListener('click', () => {
// Update active tab button // Update active tab
tabs.forEach(t => t.classList.remove('active')); tabs.forEach(t => t.classList.remove('active'));
tab.classList.add('active'); tab.classList.add('active');
// Show corresponding content // Show corresponding content
const tabName = tab.dataset.tab; const tabName = tab.dataset.tab;
document.querySelectorAll('.tab-content').forEach(content => {
// Hide all tab contents
const allTabContents = document.querySelectorAll('.tab-content');
allTabContents.forEach(content => {
content.classList.remove('active'); content.classList.remove('active');
}); });
document.getElementById(`${tabName}-tab`).classList.add('active');
});
});
// Show the selected tab content // Copy integration code
const targetTab = document.getElementById(`${tabName}-tab`); document.getElementById('copy-integration').addEventListener('click', () => {
if (targetTab) { const code = document.getElementById('integration-code').textContent;
targetTab.classList.add('active'); navigator.clipboard.writeText(code).then(() => {
} const btn = document.getElementById('copy-integration');
const originalText = btn.innerHTML;
btn.innerHTML = '<span>✓</span> Copied!';
setTimeout(() => {
btn.innerHTML = originalText;
}, 2000);
});
});
// Copy code buttons
document.querySelectorAll('.copy-btn').forEach(btn => {
btn.addEventListener('click', (e) => {
const codeBlock = e.target.closest('.code-block');
const code = codeBlock.querySelector('code').textContent;
navigator.clipboard.writeText(code).then(() => {
btn.textContent = 'Copied!';
setTimeout(() => {
btn.textContent = 'Copy';
}, 2000);
});
}); });
}); });
} }

View File

@@ -471,17 +471,13 @@ async def delete_sponsor(sponsor_id: int):
app.include_router(router) app.include_router(router)
# Version info
VERSION = "1.1.0"
BUILD_DATE = "2025-10-26"
@app.get("/") @app.get("/")
async def root(): async def root():
"""API info""" """API info"""
return { return {
"name": "Crawl4AI Marketplace API", "name": "Crawl4AI Marketplace API",
"version": VERSION, "version": "1.0.0",
"build_date": BUILD_DATE,
"endpoints": [ "endpoints": [
"/marketplace/api/apps", "/marketplace/api/apps",
"/marketplace/api/articles", "/marketplace/api/articles",

View File

@@ -31,7 +31,7 @@ dependencies = [
"rank-bm25~=0.2", "rank-bm25~=0.2",
"snowballstemmer~=2.2", "snowballstemmer~=2.2",
"pydantic>=2.10", "pydantic>=2.10",
"pyOpenSSL>=25.3.0", "pyOpenSSL>=24.3.0",
"psutil>=6.1.1", "psutil>=6.1.1",
"PyYAML>=6.0", "PyYAML>=6.0",
"nltk>=3.9.1", "nltk>=3.9.1",

View File

@@ -19,7 +19,7 @@ rank-bm25~=0.2
colorama~=0.4 colorama~=0.4
snowballstemmer~=2.2 snowballstemmer~=2.2
pydantic>=2.10 pydantic>=2.10
pyOpenSSL>=25.3.0 pyOpenSSL>=24.3.0
psutil>=6.1.1 psutil>=6.1.1
PyYAML>=6.0 PyYAML>=6.0
nltk>=3.9.1 nltk>=3.9.1

View File

@@ -364,19 +364,5 @@ async def test_network_error_handling():
async with AsyncPlaywrightCrawlerStrategy() as strategy: async with AsyncPlaywrightCrawlerStrategy() as strategy:
await strategy.crawl("https://invalid.example.com", config) await strategy.crawl("https://invalid.example.com", config)
@pytest.mark.asyncio
async def test_remove_overlay_elements(crawler_strategy):
config = CrawlerRunConfig(
remove_overlay_elements=True,
delay_before_return_html=5,
)
response = await crawler_strategy.crawl(
"https://www2.hm.com/en_us/index.html",
config
)
assert response.status_code == 200
assert "Accept all cookies" not in response.html
if __name__ == "__main__": if __name__ == "__main__":
pytest.main([__file__, "-v"]) pytest.main([__file__, "-v"])

View File

@@ -1,220 +0,0 @@
"""
Final verification test for Issue #1055 fix
This test demonstrates that LLM extraction now runs in parallel
when using arun_many with multiple URLs.
"""
import os
import sys
import time
import asyncio
grandparent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(grandparent_dir)
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
CacheMode,
LLMExtractionStrategy,
LLMConfig,
)
from pydantic import BaseModel
class SimpleData(BaseModel):
title: str
summary: str
def print_section(title):
print("\n" + "=" * 80)
print(title)
print("=" * 80 + "\n")
async def test_without_llm():
"""Baseline: Test crawling without LLM extraction"""
print_section("TEST 1: Crawling WITHOUT LLM Extraction")
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
)
browser_config = BrowserConfig(headless=True, verbose=False)
urls = [
"https://www.example.com",
"https://www.iana.org",
"https://www.wikipedia.org",
]
print(f"Crawling {len(urls)} URLs without LLM extraction...")
print("Expected: Fast and parallel\n")
start_time = time.time()
async with AsyncWebCrawler(config=browser_config) as crawler:
results = await crawler.arun_many(urls=urls, config=config)
duration = time.time() - start_time
print(f"\n✅ Completed in {duration:.2f}s")
print(f" Successful: {sum(1 for r in results if r.success)}/{len(urls)}")
print(f" Average: {duration/len(urls):.2f}s per URL")
return duration
async def test_with_llm_before_fix():
"""Demonstrate the problem: Sequential execution with LLM"""
print_section("TEST 2: What Issue #1055 Reported (LLM Sequential Behavior)")
print("The issue reported that with LLM extraction, URLs would crawl")
print("one after another instead of in parallel.")
print("\nWithout our fix, this would show:")
print(" - URL 1 fetches → extracts → completes")
print(" - URL 2 fetches → extracts → completes")
print(" - URL 3 fetches → extracts → completes")
print("\nTotal time would be approximately sum of all individual times.")
async def test_with_llm_after_fix():
"""Demonstrate the fix: Parallel execution with LLM"""
print_section("TEST 3: After Fix - LLM Extraction in Parallel")
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
extraction_strategy=LLMExtractionStrategy(
llm_config=LLMConfig(provider="openai/gpt-4o-mini"),
schema=SimpleData.model_json_schema(),
extraction_type="schema",
instruction="Extract title and summary",
)
)
browser_config = BrowserConfig(headless=True, verbose=False)
urls = [
"https://www.example.com",
"https://www.iana.org",
"https://www.wikipedia.org",
]
print(f"Crawling {len(urls)} URLs WITH LLM extraction...")
print("Expected: Parallel execution with our fix\n")
completion_times = {}
start_time = time.time()
async with AsyncWebCrawler(config=browser_config) as crawler:
results = await crawler.arun_many(urls=urls, config=config)
for result in results:
elapsed = time.time() - start_time
completion_times[result.url] = elapsed
print(f" [{elapsed:5.2f}s] ✓ {result.url[:50]}")
duration = time.time() - start_time
print(f"\n✅ Total time: {duration:.2f}s")
print(f" Successful: {sum(1 for url in urls if url in completion_times)}/{len(urls)}")
# Analyze parallelism
times = list(completion_times.values())
if len(times) >= 2:
# If parallel, completion times should be staggered, not evenly spaced
time_diffs = [times[i+1] - times[i] for i in range(len(times)-1)]
avg_diff = sum(time_diffs) / len(time_diffs)
print(f"\nParallelism Analysis:")
print(f" Completion time differences: {[f'{d:.2f}s' for d in time_diffs]}")
print(f" Average difference: {avg_diff:.2f}s")
# In parallel mode, some tasks complete close together
# In sequential mode, they're evenly spaced (avg ~2-3s apart)
if avg_diff < duration / len(urls):
print(f" ✅ PARALLEL: Tasks completed with overlapping execution")
else:
print(f" ⚠️ SEQUENTIAL: Tasks completed one after another")
return duration
async def test_multiple_arun_calls():
"""Test multiple individual arun() calls in parallel"""
print_section("TEST 4: Multiple arun() Calls with asyncio.gather")
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
extraction_strategy=LLMExtractionStrategy(
llm_config=LLMConfig(provider="openai/gpt-4o-mini"),
schema=SimpleData.model_json_schema(),
extraction_type="schema",
instruction="Extract title and summary",
)
)
browser_config = BrowserConfig(headless=True, verbose=False)
urls = [
"https://www.example.com",
"https://www.iana.org",
"https://www.wikipedia.org",
]
print(f"Running {len(urls)} arun() calls with asyncio.gather()...")
print("Expected: True parallel execution\n")
start_time = time.time()
async with AsyncWebCrawler(config=browser_config) as crawler:
tasks = [crawler.arun(url, config=config) for url in urls]
results = await asyncio.gather(*tasks)
duration = time.time() - start_time
print(f"\n✅ Completed in {duration:.2f}s")
print(f" Successful: {sum(1 for r in results if r.success)}/{len(urls)}")
print(f" This proves the async LLM extraction works correctly")
return duration
async def main():
print("\n" + "🚀" * 40)
print("ISSUE #1055 FIX VERIFICATION")
print("Testing: Sequential → Parallel LLM Extraction")
print("🚀" * 40)
# Run tests
await test_without_llm()
await test_with_llm_before_fix()
time_with_llm = await test_with_llm_after_fix()
time_gather = await test_multiple_arun_calls()
# Final summary
print_section("FINAL VERDICT")
print("✅ Fix Verified!")
print("\nWhat changed:")
print(" • Created aperform_completion_with_backoff() using litellm.acompletion")
print(" • Added arun() method to ExtractionStrategy base class")
print(" • Implemented parallel arun() in LLMExtractionStrategy")
print(" • Updated AsyncWebCrawler to use arun() when available")
print("\nResult:")
print(" • LLM extraction now runs in parallel across multiple URLs")
print(" • Backward compatible - existing strategies still work")
print(" • No breaking changes to the API")
print("\n✨ Issue #1055 is RESOLVED!")
print("\n" + "=" * 80 + "\n")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,168 +0,0 @@
"""
Lightweight test to verify pyOpenSSL security fix (Issue #1545).
This test verifies the security requirements are met:
1. pyOpenSSL >= 25.3.0 is installed
2. cryptography >= 45.0.7 is installed (above vulnerable range)
3. SSL/TLS functionality works correctly
This test can run without full crawl4ai dependencies installed.
"""
import sys
from packaging import version
def test_package_versions():
"""Test that package versions meet security requirements."""
print("=" * 70)
print("TEST: Package Version Security Requirements (Issue #1545)")
print("=" * 70)
all_passed = True
# Test pyOpenSSL version
try:
import OpenSSL
pyopenssl_version = OpenSSL.__version__
print(f"\n✓ pyOpenSSL is installed: {pyopenssl_version}")
if version.parse(pyopenssl_version) >= version.parse("25.3.0"):
print(f" ✓ PASS: pyOpenSSL {pyopenssl_version} >= 25.3.0 (required)")
else:
print(f" ✗ FAIL: pyOpenSSL {pyopenssl_version} < 25.3.0 (required)")
all_passed = False
except ImportError as e:
print(f"\n✗ FAIL: pyOpenSSL not installed - {e}")
all_passed = False
# Test cryptography version
try:
import cryptography
crypto_version = cryptography.__version__
print(f"\n✓ cryptography is installed: {crypto_version}")
# The vulnerable range is >=37.0.0 & <43.0.1
# We need >= 45.0.7 to be safe
if version.parse(crypto_version) >= version.parse("45.0.7"):
print(f" ✓ PASS: cryptography {crypto_version} >= 45.0.7 (secure)")
print(f" ✓ NOT in vulnerable range (37.0.0 to 43.0.0)")
elif version.parse(crypto_version) >= version.parse("37.0.0") and version.parse(crypto_version) < version.parse("43.0.1"):
print(f" ✗ FAIL: cryptography {crypto_version} is VULNERABLE")
print(f" ✗ Version is in vulnerable range (>=37.0.0 & <43.0.1)")
all_passed = False
else:
print(f" ⚠ WARNING: cryptography {crypto_version} < 45.0.7")
print(f" ⚠ May not meet security requirements")
except ImportError as e:
print(f"\n✗ FAIL: cryptography not installed - {e}")
all_passed = False
return all_passed
def test_ssl_basic_functionality():
"""Test that SSL/TLS basic functionality works."""
print("\n" + "=" * 70)
print("TEST: SSL/TLS Basic Functionality")
print("=" * 70)
try:
import OpenSSL.SSL
# Create a basic SSL context to verify functionality
context = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_2_METHOD)
print("\n✓ SSL Context created successfully")
print(" ✓ PASS: SSL/TLS functionality is working")
return True
except Exception as e:
print(f"\n✗ FAIL: SSL functionality test failed - {e}")
return False
def test_pyopenssl_crypto_integration():
"""Test that pyOpenSSL and cryptography integration works."""
print("\n" + "=" * 70)
print("TEST: pyOpenSSL <-> cryptography Integration")
print("=" * 70)
try:
from OpenSSL import crypto
# Generate a simple key pair to test integration
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
print("\n✓ Generated RSA key pair successfully")
print(" ✓ PASS: pyOpenSSL and cryptography are properly integrated")
return True
except Exception as e:
print(f"\n✗ FAIL: Integration test failed - {e}")
import traceback
traceback.print_exc()
return False
def main():
"""Run all security tests."""
print("\n")
print("" + "=" * 68 + "")
print("║ pyOpenSSL Security Fix Verification - Issue #1545 ║")
print("" + "=" * 68 + "")
print("\nVerifying that the pyOpenSSL update resolves the security vulnerability")
print("in the cryptography package (CVE: versions >=37.0.0 & <43.0.1)\n")
results = []
# Test 1: Package versions
results.append(("Package Versions", test_package_versions()))
# Test 2: SSL functionality
results.append(("SSL Functionality", test_ssl_basic_functionality()))
# Test 3: Integration
results.append(("pyOpenSSL-crypto Integration", test_pyopenssl_crypto_integration()))
# Summary
print("\n" + "=" * 70)
print("TEST SUMMARY")
print("=" * 70)
all_passed = True
for test_name, passed in results:
status = "✓ PASS" if passed else "✗ FAIL"
print(f"{status}: {test_name}")
all_passed = all_passed and passed
print("=" * 70)
if all_passed:
print("\n✓✓✓ ALL TESTS PASSED ✓✓✓")
print("✓ Security vulnerability is resolved")
print("✓ pyOpenSSL >= 25.3.0 is working correctly")
print("✓ cryptography >= 45.0.7 (not vulnerable)")
print("\nThe dependency update is safe to merge.\n")
return True
else:
print("\n✗✗✗ SOME TESTS FAILED ✗✗✗")
print("✗ Security requirements not met")
print("\nDo NOT merge until all tests pass.\n")
return False
if __name__ == "__main__":
try:
success = main()
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n\nTest interrupted by user")
sys.exit(1)
except Exception as e:
print(f"\n✗ Unexpected error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

View File

@@ -1,184 +0,0 @@
"""
Test script to verify pyOpenSSL update doesn't break crawl4ai functionality.
This test verifies:
1. pyOpenSSL and cryptography versions are correct and secure
2. Basic crawling functionality still works
3. HTTPS/SSL connections work properly
4. Stealth mode integration works (uses playwright-stealth internally)
Issue: #1545 - Security vulnerability in cryptography package
Fix: Updated pyOpenSSL from >=24.3.0 to >=25.3.0
Expected: cryptography package should be >=45.0.7 (above vulnerable range)
"""
import asyncio
import sys
from packaging import version
def check_versions():
"""Verify pyOpenSSL and cryptography versions meet security requirements."""
print("=" * 60)
print("STEP 1: Checking Package Versions")
print("=" * 60)
try:
import OpenSSL
pyopenssl_version = OpenSSL.__version__
print(f"✓ pyOpenSSL version: {pyopenssl_version}")
# Check pyOpenSSL >= 25.3.0
if version.parse(pyopenssl_version) >= version.parse("25.3.0"):
print(f" ✓ Version check passed: {pyopenssl_version} >= 25.3.0")
else:
print(f" ✗ Version check FAILED: {pyopenssl_version} < 25.3.0")
return False
except ImportError as e:
print(f"✗ Failed to import pyOpenSSL: {e}")
return False
try:
import cryptography
crypto_version = cryptography.__version__
print(f"✓ cryptography version: {crypto_version}")
# Check cryptography >= 45.0.7 (above vulnerable range)
if version.parse(crypto_version) >= version.parse("45.0.7"):
print(f" ✓ Security check passed: {crypto_version} >= 45.0.7 (not vulnerable)")
else:
print(f" ✗ Security check FAILED: {crypto_version} < 45.0.7 (potentially vulnerable)")
return False
except ImportError as e:
print(f"✗ Failed to import cryptography: {e}")
return False
print("\n✓ All version checks passed!\n")
return True
async def test_basic_crawl():
"""Test basic crawling functionality with HTTPS site."""
print("=" * 60)
print("STEP 2: Testing Basic HTTPS Crawling")
print("=" * 60)
try:
from crawl4ai import AsyncWebCrawler
async with AsyncWebCrawler(verbose=True) as crawler:
# Test with a simple HTTPS site (requires SSL/TLS)
print("Crawling example.com (HTTPS)...")
result = await crawler.arun(
url="https://www.example.com",
bypass_cache=True
)
if result.success:
print(f"✓ Crawl successful!")
print(f" - Status code: {result.status_code}")
print(f" - Content length: {len(result.html)} bytes")
print(f" - SSL/TLS connection: ✓ Working")
return True
else:
print(f"✗ Crawl failed: {result.error_message}")
return False
except Exception as e:
print(f"✗ Test failed with error: {e}")
import traceback
traceback.print_exc()
return False
async def test_stealth_mode():
"""Test stealth mode functionality (depends on playwright-stealth)."""
print("\n" + "=" * 60)
print("STEP 3: Testing Stealth Mode Integration")
print("=" * 60)
try:
from crawl4ai import AsyncWebCrawler, BrowserConfig
# Create browser config with stealth mode
browser_config = BrowserConfig(
headless=True,
verbose=False
)
async with AsyncWebCrawler(config=browser_config, verbose=True) as crawler:
print("Crawling with stealth mode enabled...")
result = await crawler.arun(
url="https://www.example.com",
bypass_cache=True
)
if result.success:
print(f"✓ Stealth crawl successful!")
print(f" - Stealth mode: ✓ Working")
return True
else:
print(f"✗ Stealth crawl failed: {result.error_message}")
return False
except Exception as e:
print(f"✗ Stealth test failed with error: {e}")
import traceback
traceback.print_exc()
return False
async def main():
"""Run all tests."""
print("\n")
print("" + "=" * 58 + "")
print("║ pyOpenSSL Security Update Verification Test (Issue #1545) ║")
print("" + "=" * 58 + "")
print("\n")
# Step 1: Check versions
versions_ok = check_versions()
if not versions_ok:
print("\n✗ FAILED: Version requirements not met")
return False
# Step 2: Test basic crawling
crawl_ok = await test_basic_crawl()
if not crawl_ok:
print("\n✗ FAILED: Basic crawling test failed")
return False
# Step 3: Test stealth mode
stealth_ok = await test_stealth_mode()
if not stealth_ok:
print("\n✗ FAILED: Stealth mode test failed")
return False
# All tests passed
print("\n" + "=" * 60)
print("FINAL RESULT")
print("=" * 60)
print("✓ All tests passed successfully!")
print("✓ pyOpenSSL update is working correctly")
print("✓ No breaking changes detected")
print("✓ Security vulnerability resolved")
print("=" * 60)
print("\n")
return True
if __name__ == "__main__":
try:
success = asyncio.run(main())
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n\nTest interrupted by user")
sys.exit(1)
except Exception as e:
print(f"\n✗ Unexpected error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)