Add demo script for proxy rotation and quick test suite

- Implemented demo_proxy_rotation.py to showcase various proxy rotation strategies and their integration with the API.
- Included multiple demos demonstrating round robin, random, least used, failure-aware, and streaming strategies.
- Added error handling and real-world scenario examples for e-commerce price monitoring.
- Created quick_proxy_test.py to validate API integration without real proxies, testing parameter acceptance, invalid strategy rejection, and optional parameters.
- Ensured both scripts provide informative output and usage instructions.
This commit is contained in:
AHMET YILMAZ
2025-10-06 13:40:38 +08:00
parent 5dc34dd210
commit f00e8cbf35
7 changed files with 1706 additions and 5 deletions

View File

@@ -6,7 +6,7 @@ import time
from base64 import b64encode
from datetime import datetime, timezone
from functools import partial
from typing import AsyncGenerator, Dict, List, Optional, Tuple
from typing import AsyncGenerator, Dict, List, Optional, Tuple, Any
from urllib.parse import unquote
from fastapi import HTTPException, Request, status
from fastapi.background import BackgroundTasks
@@ -55,15 +55,56 @@ from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
from crawl4ai.utils import perform_completion_with_backoff
# Import missing utility functions and types
# Prefer the project's shared helpers; fall back to local stand-ins when the
# `utils` module is unavailable (e.g. isolated development or test runs).
try:
    from utils import (
        FilterType, TaskStatus, get_base_url, is_task_id,
        get_llm_api_key, get_llm_temperature, get_llm_base_url,
        validate_llm_provider
    )
except ImportError:
    from enum import Enum

    class FilterType(str, Enum):
        """Markdown filtering modes accepted by the API."""
        RAW = "raw"
        FIT = "fit"
        BM25 = "bm25"
        LLM = "llm"

    class TaskStatus(str, Enum):
        """Lifecycle states of a background extraction task."""
        PROCESSING = "processing"
        FAILED = "failed"
        COMPLETED = "completed"

    def get_base_url(request):
        """Reconstruct the externally visible base URL of *request*."""
        return f"{request.url.scheme}://{request.url.netloc}"

    def is_task_id(value: str):
        """Return True when *value* looks like an LLM task identifier."""
        return "_" in value and value.startswith("llm_")

    def get_llm_api_key(config, provider=None):
        """Stub: no API key is configured in fallback mode."""
        return None

    def get_llm_temperature(config, provider=None):
        """Stub: moderate default sampling temperature."""
        return 0.7

    def get_llm_base_url(config, provider=None):
        """Stub: no custom LLM endpoint in fallback mode."""
        return None

    def validate_llm_provider(config, provider):
        """Stub: accept every provider; returns (ok_flag, error_message)."""
        return True, None
logger = logging.getLogger(__name__)
# --- Helper to get memory ---
def _get_memory_mb():
try:
import psutil
return psutil.Process().memory_info().rss / (1024 * 1024)
except Exception as e:
logger.warning(f"Could not get memory info: {e}")
logger.warning("Could not get memory info: %s", e)
return None
@@ -91,6 +132,63 @@ def _apply_headless_setting(browser_config: BrowserConfig, headless: bool):
return browser_config
# --- Helper to create proxy rotation strategy ---
def _create_proxy_rotation_strategy(
strategy_name: Optional[str],
proxies: Optional[List[Dict[str, Any]]],
failure_threshold: int = 3,
recovery_time: int = 300
):
"""Create proxy rotation strategy from request parameters."""
if not strategy_name or not proxies:
return None
# Import proxy strategies
from crawl4ai.proxy_strategy import (
RoundRobinProxyStrategy, RandomProxyStrategy,
LeastUsedProxyStrategy, FailureAwareProxyStrategy
)
from crawl4ai.async_configs import ProxyConfig
# Convert proxy inputs to ProxyConfig objects
proxy_configs = []
try:
for proxy in proxies:
if isinstance(proxy, dict):
# Validate required fields
if "server" not in proxy:
raise ValueError(f"Proxy configuration missing 'server' field: {proxy}")
proxy_configs.append(ProxyConfig.from_dict(proxy))
else:
raise ValueError(f"Invalid proxy format: {type(proxy)}")
except Exception as e:
raise ValueError(f"Invalid proxy configuration: {str(e)}")
if not proxy_configs:
return None
# Strategy factory with optimized implementations
strategy_name = strategy_name.lower()
if strategy_name == "round_robin":
return RoundRobinProxyStrategy(proxy_configs)
elif strategy_name == "random":
return RandomProxyStrategy(proxy_configs)
elif strategy_name == "least_used":
return LeastUsedProxyStrategy(proxy_configs)
elif strategy_name == "failure_aware":
return FailureAwareProxyStrategy(
proxy_configs,
failure_threshold=failure_threshold,
recovery_time=recovery_time
)
else:
raise ValueError(
f"Unsupported proxy rotation strategy: {strategy_name}. "
f"Available: round_robin, random, least_used, failure_aware"
)
async def handle_llm_qa(url: str, query: str, config: dict) -> str:
"""Process QA using LLM with crawled content as context."""
try:
@@ -498,6 +596,10 @@ async def handle_crawl_request(
hooks_config: Optional[dict] = None,
anti_bot_strategy: str = "default",
headless: bool = True,
proxy_rotation_strategy: Optional[str] = None,
proxies: Optional[List[Dict[str, Any]]] = None,
proxy_failure_threshold: int = 3,
proxy_recovery_time: int = 300,
) -> dict:
"""Handle non-streaming crawl requests with optional hooks."""
start_mem_mb = _get_memory_mb() # <--- Get memory before
@@ -518,6 +620,19 @@ async def handle_crawl_request(
_apply_headless_setting(browser_config, headless)
crawler_config = CrawlerRunConfig.load(crawler_config)
# Configure proxy rotation strategy if specified
if proxy_rotation_strategy and proxies:
try:
proxy_strategy = _create_proxy_rotation_strategy(
proxy_rotation_strategy,
proxies,
proxy_failure_threshold,
proxy_recovery_time
)
crawler_config.proxy_rotation_strategy = proxy_strategy
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# Configure browser adapter based on anti_bot_strategy
browser_adapter = _get_browser_adapter(anti_bot_strategy, browser_config)
@@ -643,8 +758,6 @@ async def handle_crawl_request(
if isinstance(hook_manager, UserHookManager):
try:
# Ensure all hook data is JSON serializable
import json
hook_data = {
"status": hooks_status,
"execution_log": hook_manager.execution_log,
@@ -706,6 +819,10 @@ async def handle_stream_crawl_request(
hooks_config: Optional[dict] = None,
anti_bot_strategy: str = "default",
headless: bool = True,
proxy_rotation_strategy: Optional[str] = None,
proxies: Optional[List[Dict[str, Any]]] = None,
proxy_failure_threshold: int = 3,
proxy_recovery_time: int = 300,
) -> Tuple[AsyncWebCrawler, AsyncGenerator, Optional[Dict]]:
"""Handle streaming crawl requests with optional hooks."""
hooks_info = None
@@ -718,6 +835,19 @@ async def handle_stream_crawl_request(
crawler_config.scraping_strategy = LXMLWebScrapingStrategy()
crawler_config.stream = True
# Configure proxy rotation strategy if specified
if proxy_rotation_strategy and proxies:
try:
proxy_strategy = _create_proxy_rotation_strategy(
proxy_rotation_strategy,
proxies,
proxy_failure_threshold,
proxy_recovery_time
)
crawler_config.proxy_rotation_strategy = proxy_strategy
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# Configure browser adapter based on anti_bot_strategy
browser_adapter = _get_browser_adapter(anti_bot_strategy, browser_config)

View File

@@ -14,6 +14,20 @@ class CrawlRequest(BaseModel):
Field("default", description="The anti-bot strategy to use for the crawl.")
)
headless: bool = Field(True, description="Run the browser in headless mode.")
# Proxy rotation configuration
proxy_rotation_strategy: Optional[Literal["round_robin", "random", "least_used", "failure_aware"]] = Field(
None, description="Proxy rotation strategy to use for the crawl."
)
proxies: Optional[List[Dict[str, Any]]] = Field(
None, description="List of proxy configurations (dicts with server, username, password, etc.)"
)
proxy_failure_threshold: Optional[int] = Field(
3, ge=1, le=10, description="Failure threshold for failure_aware strategy"
)
proxy_recovery_time: Optional[int] = Field(
300, ge=60, le=3600, description="Recovery time in seconds for failure_aware strategy"
)
class HookConfig(BaseModel):

View File

@@ -607,6 +607,10 @@ async def crawl(
hooks_config=hooks_config,
anti_bot_strategy=crawl_request.anti_bot_strategy,
headless=crawl_request.headless,
proxy_rotation_strategy=crawl_request.proxy_rotation_strategy,
proxies=crawl_request.proxies,
proxy_failure_threshold=crawl_request.proxy_failure_threshold,
proxy_recovery_time=crawl_request.proxy_recovery_time,
)
# check if all of the results are not successful
if all(not result["success"] for result in results["results"]):
@@ -646,6 +650,10 @@ async def stream_process(crawl_request: CrawlRequestWithHooks):
hooks_config=hooks_config,
anti_bot_strategy=crawl_request.anti_bot_strategy,
headless=crawl_request.headless,
proxy_rotation_strategy=crawl_request.proxy_rotation_strategy,
proxies=crawl_request.proxies,
proxy_failure_threshold=crawl_request.proxy_failure_threshold,
proxy_recovery_time=crawl_request.proxy_recovery_time,
)
# Add hooks info to response headers if available