refactor(proxy): move ProxyConfig to async_configs and improve LLM token handling

Moved the ProxyConfig class from proxy_strategy.py to async_configs.py for better organization.
Improved LLM API token resolution with the new PROVIDER_MODELS_PREFIXES mapping.
Added test cases for deep crawling and proxy rotation.
Removed docker_config from BrowserConfig, as Docker configuration is handled separately.

BREAKING CHANGE: ProxyConfig import path changed from crawl4ai.proxy_strategy to crawl4ai
UncleCode
2025-04-15 22:27:18 +08:00
parent 793668a413
commit 230f22da86
12 changed files with 1232 additions and 162 deletions
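
For downstream users, the breaking change is only the import path; the class itself (and a copy kept in proxy_strategy.py) is unchanged. A minimal migration sketch, assuming the ProxyConfig API shown in the async_configs.py diff below:

# Old import (still works via the copy left in proxy_strategy.py):
#   from crawl4ai.proxy_strategy import ProxyConfig
# New import:
from crawl4ai import ProxyConfig

# Build a proxy directly, from the "ip:port[:username:password]" string format,
# or from a comma-separated PROXIES environment variable.
proxy = ProxyConfig(server="http://127.0.0.1:8080")
same_proxy = ProxyConfig.from_string("127.0.0.1:8080")
proxies = ProxyConfig.from_env("PROXIES")
print(proxy.to_dict())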

View File

@@ -2,7 +2,7 @@
 import warnings
 from .async_webcrawler import AsyncWebCrawler, CacheMode
-from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig
+from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig
 from .content_scraping_strategy import (
     ContentScrapingStrategy,
@@ -121,6 +121,7 @@ __all__ = [
     "Crawl4aiDockerClient",
     "ProxyRotationStrategy",
     "RoundRobinProxyStrategy",
+    "ProxyConfig"
 ]

View File

@@ -5,6 +5,7 @@ from .config import (
     MIN_WORD_THRESHOLD,
     IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD,
     PROVIDER_MODELS,
+    PROVIDER_MODELS_PREFIXES,
     SCREENSHOT_HEIGHT_TRESHOLD,
     PAGE_TIMEOUT,
     IMAGE_SCORE_THRESHOLD,
@@ -27,11 +28,8 @@ import inspect
 from typing import Any, Dict, Optional
 from enum import Enum
-from .proxy_strategy import ProxyConfig
-try:
-    from .browser.models import DockerConfig
-except ImportError:
-    DockerConfig = None
+# from .proxy_strategy import ProxyConfig
 
 def to_serializable_dict(obj: Any, ignore_default_value : bool = False) -> Dict:
@@ -161,6 +159,117 @@ def is_empty_value(value: Any) -> bool:
         return True
     return False
 
+
+class ProxyConfig:
+    def __init__(
+        self,
+        server: str,
+        username: Optional[str] = None,
+        password: Optional[str] = None,
+        ip: Optional[str] = None,
+    ):
+        """Configuration class for a single proxy.
+
+        Args:
+            server: Proxy server URL (e.g., "http://127.0.0.1:8080")
+            username: Optional username for proxy authentication
+            password: Optional password for proxy authentication
+            ip: Optional IP address for verification purposes
+        """
+        self.server = server
+        self.username = username
+        self.password = password
+
+        # Extract IP from server if not explicitly provided
+        self.ip = ip or self._extract_ip_from_server()
+
+    def _extract_ip_from_server(self) -> Optional[str]:
+        """Extract IP address from server URL."""
+        try:
+            # Simple extraction assuming http://ip:port format
+            if "://" in self.server:
+                parts = self.server.split("://")[1].split(":")
+                return parts[0]
+            else:
+                parts = self.server.split(":")
+                return parts[0]
+        except Exception:
+            return None
+
+    @staticmethod
+    def from_string(proxy_str: str) -> "ProxyConfig":
+        """Create a ProxyConfig from a string in the format 'ip:port:username:password'."""
+        parts = proxy_str.split(":")
+        if len(parts) == 4:  # ip:port:username:password
+            ip, port, username, password = parts
+            return ProxyConfig(
+                server=f"http://{ip}:{port}",
+                username=username,
+                password=password,
+                ip=ip
+            )
+        elif len(parts) == 2:  # ip:port only
+            ip, port = parts
+            return ProxyConfig(
+                server=f"http://{ip}:{port}",
+                ip=ip
+            )
+        else:
+            raise ValueError(f"Invalid proxy string format: {proxy_str}")
+
+    @staticmethod
+    def from_dict(proxy_dict: Dict) -> "ProxyConfig":
+        """Create a ProxyConfig from a dictionary."""
+        return ProxyConfig(
+            server=proxy_dict.get("server"),
+            username=proxy_dict.get("username"),
+            password=proxy_dict.get("password"),
+            ip=proxy_dict.get("ip")
+        )
+
+    @staticmethod
+    def from_env(env_var: str = "PROXIES") -> List["ProxyConfig"]:
+        """Load proxies from environment variable.
+
+        Args:
+            env_var: Name of environment variable containing comma-separated proxy strings
+
+        Returns:
+            List of ProxyConfig objects
+        """
+        proxies = []
+        try:
+            proxy_list = os.getenv(env_var, "").split(",")
+            for proxy in proxy_list:
+                if not proxy:
+                    continue
+                proxies.append(ProxyConfig.from_string(proxy))
+        except Exception as e:
+            print(f"Error loading proxies from environment: {e}")
+        return proxies
+
+    def to_dict(self) -> Dict:
+        """Convert to dictionary representation."""
+        return {
+            "server": self.server,
+            "username": self.username,
+            "password": self.password,
+            "ip": self.ip
+        }
+
+    def clone(self, **kwargs) -> "ProxyConfig":
+        """Create a copy of this configuration with updated values.
+
+        Args:
+            **kwargs: Key-value pairs of configuration options to update
+
+        Returns:
+            ProxyConfig: A new instance with the specified updates
+        """
+        config_dict = self.to_dict()
+        config_dict.update(kwargs)
+        return ProxyConfig.from_dict(config_dict)
+
+
 class BrowserConfig:
     """
@@ -197,8 +306,6 @@ class BrowserConfig:
             Default: None.
         proxy_config (ProxyConfig or dict or None): Detailed proxy configuration, e.g. {"server": "...", "username": "..."}.
             If None, no additional proxy config. Default: None.
-        docker_config (DockerConfig or dict or None): Configuration for Docker-based browser automation.
-            Contains settings for Docker container operation. Default: None.
         viewport_width (int): Default viewport width for pages. Default: 1080.
         viewport_height (int): Default viewport height for pages. Default: 600.
         viewport (dict): Default viewport dimensions for pages. If set, overrides viewport_width and viewport_height.
@@ -244,7 +351,6 @@ class BrowserConfig:
         channel: str = "chromium",
         proxy: str = None,
         proxy_config: Union[ProxyConfig, dict, None] = None,
-        docker_config: Union[DockerConfig, dict, None] = None,
         viewport_width: int = 1080,
         viewport_height: int = 600,
         viewport: dict = None,
@@ -285,15 +391,7 @@ class BrowserConfig:
             self.chrome_channel = ""
         self.proxy = proxy
         self.proxy_config = proxy_config
-
-        # Handle docker configuration
-        if isinstance(docker_config, dict) and DockerConfig is not None:
-            self.docker_config = DockerConfig.from_kwargs(docker_config)
-        else:
-            self.docker_config = docker_config
-        if self.docker_config:
-            self.user_data_dir = self.docker_config.user_data_dir
 
         self.viewport_width = viewport_width
         self.viewport_height = viewport_height
@@ -364,7 +462,6 @@ class BrowserConfig:
             channel=kwargs.get("channel", "chromium"),
             proxy=kwargs.get("proxy"),
             proxy_config=kwargs.get("proxy_config", None),
-            docker_config=kwargs.get("docker_config", None),
             viewport_width=kwargs.get("viewport_width", 1080),
             viewport_height=kwargs.get("viewport_height", 600),
             accept_downloads=kwargs.get("accept_downloads", False),
@@ -421,13 +518,7 @@ class BrowserConfig:
             "debugging_port": self.debugging_port,
             "host": self.host,
         }
-
-        # Include docker_config if it exists
-        if hasattr(self, "docker_config") and self.docker_config is not None:
-            if hasattr(self.docker_config, "to_dict"):
-                result["docker_config"] = self.docker_config.to_dict()
-            else:
-                result["docker_config"] = self.docker_config
 
         return result
@@ -1180,9 +1271,18 @@ class LLMConfig:
         elif api_token and api_token.startswith("env:"):
             self.api_token = os.getenv(api_token[4:])
         else:
-            self.api_token = PROVIDER_MODELS.get(provider, "no-token") or os.getenv(
-                DEFAULT_PROVIDER_API_KEY
-            )
+            # Check if given provider starts with any of key in PROVIDER_MODELS_PREFIXES
+            # If not, check if it is in PROVIDER_MODELS
+            prefixes = PROVIDER_MODELS_PREFIXES.keys()
+            if any(provider.startswith(prefix) for prefix in prefixes):
+                selected_prefix = next(
+                    (prefix for prefix in prefixes if provider.startswith(prefix)),
+                    None,
+                )
+                self.api_token = PROVIDER_MODELS_PREFIXES.get(selected_prefix)
+            else:
+                self.provider = DEFAULT_PROVIDER
+                self.api_token = os.getenv(DEFAULT_PROVIDER_API_KEY)
         self.base_url = base_url
         self.temprature = temprature
         self.max_tokens = max_tokens
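
A rough sketch of how the new fallback behaves (model names here are placeholders): any provider string whose prefix appears in PROVIDER_MODELS_PREFIXES resolves its token from that table, otherwise LLMConfig falls back to DEFAULT_PROVIDER and its API-key environment variable.

from crawl4ai import LLMConfig

ollama_cfg = LLMConfig(provider="ollama/llama3")       # prefix "ollama" -> "no-token-needed"
openai_cfg = LLMConfig(provider="openai/gpt-4o-mini")  # prefix "openai" -> os.getenv("OPENAI_API_KEY")
print(ollama_cfg.api_token, openai_cfg.api_token)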

View File

@@ -36,7 +36,7 @@ from .markdown_generation_strategy import (
 )
 from .deep_crawling import DeepCrawlDecorator
 from .async_logger import AsyncLogger, AsyncLoggerBase
-from .async_configs import BrowserConfig, CrawlerRunConfig
+from .async_configs import BrowserConfig, CrawlerRunConfig, ProxyConfig
 from .async_dispatcher import *  # noqa: F403
 from .async_dispatcher import BaseDispatcher, MemoryAdaptiveDispatcher, RateLimiter
@@ -291,12 +291,12 @@ class AsyncWebCrawler:
             # Update proxy configuration from rotation strategy if available
             if config and config.proxy_rotation_strategy:
-                next_proxy = await config.proxy_rotation_strategy.get_next_proxy()
+                next_proxy : ProxyConfig = await config.proxy_rotation_strategy.get_next_proxy()
                 if next_proxy:
                     self.logger.info(
                         message="Switch proxy: {proxy}",
                         tag="PROXY",
-                        params={"proxy": next_proxy.server},
+                        params={"proxy": next_proxy.server}
                     )
                     config.proxy_config = next_proxy
                     # config = config.clone(proxy_config=next_proxy)

View File

@@ -94,6 +94,7 @@ class ManagedBrowser:
         host: str = "localhost",
         debugging_port: int = 9222,
         cdp_url: Optional[str] = None,
+        browser_config: Optional[BrowserConfig] = None,
     ):
         """
         Initialize the ManagedBrowser instance.
@@ -109,17 +110,19 @@ class ManagedBrowser:
             host (str): Host for debugging the browser. Default: "localhost".
             debugging_port (int): Port for debugging the browser. Default: 9222.
             cdp_url (str or None): CDP URL to connect to the browser. Default: None.
+            browser_config (BrowserConfig): Configuration object containing all browser settings. Default: None.
         """
-        self.browser_type = browser_type
-        self.user_data_dir = user_data_dir
-        self.headless = headless
+        self.browser_type = browser_config.browser_type
+        self.user_data_dir = browser_config.user_data_dir
+        self.headless = browser_config.headless
         self.browser_process = None
         self.temp_dir = None
-        self.debugging_port = debugging_port
-        self.host = host
+        self.debugging_port = browser_config.debugging_port
+        self.host = browser_config.host
         self.logger = logger
         self.shutting_down = False
-        self.cdp_url = cdp_url
+        self.cdp_url = browser_config.cdp_url
+        self.browser_config = browser_config
 
     async def start(self) -> str:
         """
@@ -142,6 +145,9 @@ class ManagedBrowser:
         # Get browser path and args based on OS and browser type
         # browser_path = self._get_browser_path()
         args = await self._get_browser_args()
+        if self.browser_config.extra_args:
+            args.extend(self.browser_config.extra_args)
 
         # Start browser process
         try:
@@ -477,6 +483,7 @@ class BrowserManager:
             logger=self.logger,
             debugging_port=self.config.debugging_port,
             cdp_url=self.config.cdp_url,
+            browser_config=self.config,
         )
 
     async def start(self):
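
The user-facing effect is that extra_args set on BrowserConfig now reach the managed browser's launch command. A sketch, assuming the standard BrowserConfig/AsyncWebCrawler usage (the flags and URL below are illustrative):

import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig

browser_cfg = BrowserConfig(
    headless=True,
    extra_args=["--disable-gpu", "--no-sandbox"],  # appended via args.extend(...) above
)

async def main():
    async with AsyncWebCrawler(config=browser_cfg) as crawler:
        result = await crawler.arun("https://example.com")
        print(result.success)

asyncio.run(main())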

View File

@@ -29,6 +29,14 @@ PROVIDER_MODELS = {
     'gemini/gemini-2.0-flash-lite-preview-02-05': os.getenv("GEMINI_API_KEY"),
     "deepseek/deepseek-chat": os.getenv("DEEPSEEK_API_KEY"),
 }
+
+PROVIDER_MODELS_PREFIXES = {
+    "ollama": "no-token-needed",  # Any model from Ollama no need for API token
+    "groq": os.getenv("GROQ_API_KEY"),
+    "openai": os.getenv("OPENAI_API_KEY"),
+    "anthropic": os.getenv("ANTHROPIC_API_KEY"),
+    "gemini": os.getenv("GEMINI_API_KEY"),
+    "deepseek": os.getenv("DEEPSEEK_API_KEY"),
+}
 
 # Chunk token threshold
 CHUNK_TOKEN_THRESHOLD = 2**11  # 2048 tokens

View File

@@ -4,6 +4,9 @@ from itertools import cycle
 import os
 
+########### ATTENTION PEOPLE OF EARTH ###########
+# I have moved this config to async_configs.py, kept it here, in case someone still importing it, however
+# be a dear and follow `from crawl4ai import ProxyConfig` instead :)
 class ProxyConfig:
     def __init__(
         self,
@@ -119,12 +122,12 @@ class ProxyRotationStrategy(ABC):
     """Base abstract class for proxy rotation strategies"""
 
     @abstractmethod
-    async def get_next_proxy(self) -> Optional[Dict]:
+    async def get_next_proxy(self) -> Optional[ProxyConfig]:
        """Get next proxy configuration from the strategy"""
        pass
 
     @abstractmethod
-    def add_proxies(self, proxies: List[Dict]):
+    def add_proxies(self, proxies: List[ProxyConfig]):
        """Add proxy configurations to the strategy"""
        pass
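
With the typed interface, a rotation strategy hands back ProxyConfig objects that the crawler assigns to config.proxy_config on each request (see the async_webcrawler.py hunk above). A usage sketch, assuming RoundRobinProxyStrategy accepts the proxy list at construction time and CrawlerRunConfig exposes proxy_rotation_strategy (the target URL is a placeholder):

import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, ProxyConfig, RoundRobinProxyStrategy

async def main():
    # PROXIES="ip:port:user:pass,ip:port" -> list of ProxyConfig objects
    proxies = ProxyConfig.from_env()
    strategy = RoundRobinProxyStrategy(proxies)

    run_cfg = CrawlerRunConfig(proxy_rotation_strategy=strategy)
    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun("https://httpbin.org/ip", config=run_cfg)
        print(result.success)

asyncio.run(main())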

View File

@@ -9,83 +9,44 @@ from urllib.parse import urlparse
 import OpenSSL.crypto
 from pathlib import Path
 
-class SSLCertificate:
+# === Inherit from dict ===
+class SSLCertificate(dict):
     """
-    A class representing an SSL certificate with methods to export in various formats.
-
-    Attributes:
-        cert_info (Dict[str, Any]): The certificate information.
-
-    Methods:
-        from_url(url: str, timeout: int = 10) -> Optional['SSLCertificate']: Create SSLCertificate instance from a URL.
-        from_file(file_path: str) -> Optional['SSLCertificate']: Create SSLCertificate instance from a file.
-        from_binary(binary_data: bytes) -> Optional['SSLCertificate']: Create SSLCertificate instance from binary data.
-        export_as_pem() -> str: Export the certificate as PEM format.
-        export_as_der() -> bytes: Export the certificate as DER format.
-        export_as_json() -> Dict[str, Any]: Export the certificate as JSON format.
-        export_as_text() -> str: Export the certificate as text format.
+    A class representing an SSL certificate, behaving like a dictionary
+    for direct JSON serialization. It stores the certificate information internally
+    and provides methods for export and property access.
+
+    Inherits from dict, so instances are directly JSON serializable.
     """
+    # Use __slots__ for potential memory optimization if desired, though less common when inheriting dict
+    # __slots__ = ("_cert_info",)  # If using slots, be careful with dict inheritance interaction
 
     def __init__(self, cert_info: Dict[str, Any]):
-        self._cert_info = self._decode_cert_data(cert_info)
-
-    @staticmethod
-    def from_url(url: str, timeout: int = 10) -> Optional["SSLCertificate"]:
         """
-        Create SSLCertificate instance from a URL.
+        Initializes the SSLCertificate object.
 
         Args:
-            url (str): URL of the website.
-            timeout (int): Timeout for the connection (default: 10).
-
-        Returns:
-            Optional[SSLCertificate]: SSLCertificate instance if successful, None otherwise.
+            cert_info (Dict[str, Any]): The raw certificate dictionary.
         """
-        try:
-            hostname = urlparse(url).netloc
-            if ":" in hostname:
-                hostname = hostname.split(":")[0]
-
-            context = ssl.create_default_context()
-            with socket.create_connection((hostname, 443), timeout=timeout) as sock:
-                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
-                    cert_binary = ssock.getpeercert(binary_form=True)
-                    x509 = OpenSSL.crypto.load_certificate(
-                        OpenSSL.crypto.FILETYPE_ASN1, cert_binary
-                    )
-
-                    cert_info = {
-                        "subject": dict(x509.get_subject().get_components()),
-                        "issuer": dict(x509.get_issuer().get_components()),
-                        "version": x509.get_version(),
-                        "serial_number": hex(x509.get_serial_number()),
-                        "not_before": x509.get_notBefore(),
-                        "not_after": x509.get_notAfter(),
-                        "fingerprint": x509.digest("sha256").hex(),
-                        "signature_algorithm": x509.get_signature_algorithm(),
-                        "raw_cert": base64.b64encode(cert_binary),
-                    }
-
-                    # Add extensions
-                    extensions = []
-                    for i in range(x509.get_extension_count()):
-                        ext = x509.get_extension(i)
-                        extensions.append(
-                            {"name": ext.get_short_name(), "value": str(ext)}
-                        )
-                    cert_info["extensions"] = extensions
-
-                    return SSLCertificate(cert_info)
-
-        except Exception:
-            return None
+        # 1. Decode the data (handle bytes -> str)
+        decoded_info = self._decode_cert_data(cert_info)
+
+        # 2. Store the decoded info internally (optional but good practice)
+        # self._cert_info = decoded_info  # You can keep this if methods rely on it
+
+        # 3. Initialize the dictionary part of the object with the decoded data
+        super().__init__(decoded_info)
 
     @staticmethod
     def _decode_cert_data(data: Any) -> Any:
         """Helper method to decode bytes in certificate data."""
         if isinstance(data, bytes):
-            return data.decode("utf-8")
+            try:
+                # Try UTF-8 first, fallback to latin-1 for arbitrary bytes
+                return data.decode("utf-8")
+            except UnicodeDecodeError:
+                return data.decode("latin-1")  # Or handle as needed, maybe hex representation
         elif isinstance(data, dict):
             return {
                 (
@@ -97,36 +58,119 @@ class SSLCertificate:
             return [SSLCertificate._decode_cert_data(item) for item in data]
         return data
 
+    @staticmethod
+    def from_url(url: str, timeout: int = 10) -> Optional["SSLCertificate"]:
+        """
+        Create SSLCertificate instance from a URL. Fetches cert info and initializes.
+        (Fetching logic remains the same)
+        """
+        cert_info_raw = None  # Variable to hold the fetched dict
+        try:
+            hostname = urlparse(url).netloc
+            if ":" in hostname:
+                hostname = hostname.split(":")[0]
+
+            context = ssl.create_default_context()
+            # Set check_hostname to False and verify_mode to CERT_NONE temporarily
+            # for potentially problematic certificates during fetch, but parse the result regardless.
+            # context.check_hostname = False
+            # context.verify_mode = ssl.CERT_NONE
+
+            with socket.create_connection((hostname, 443), timeout=timeout) as sock:
+                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
+                    cert_binary = ssock.getpeercert(binary_form=True)
+                    if not cert_binary:
+                        print(f"Warning: No certificate returned for {hostname}")
+                        return None
+
+                    x509 = OpenSSL.crypto.load_certificate(
+                        OpenSSL.crypto.FILETYPE_ASN1, cert_binary
+                    )
+
+                    # Create the dictionary directly
+                    cert_info_raw = {
+                        "subject": dict(x509.get_subject().get_components()),
+                        "issuer": dict(x509.get_issuer().get_components()),
+                        "version": x509.get_version(),
+                        "serial_number": hex(x509.get_serial_number()),
+                        "not_before": x509.get_notBefore(),  # Keep as bytes initially, _decode handles it
+                        "not_after": x509.get_notAfter(),  # Keep as bytes initially
+                        "fingerprint": x509.digest("sha256").hex(),  # hex() is already string
+                        "signature_algorithm": x509.get_signature_algorithm(),  # Keep as bytes
+                        "raw_cert": base64.b64encode(cert_binary),  # Base64 is bytes, _decode handles it
+                    }
+
+                    # Add extensions
+                    extensions = []
+                    for i in range(x509.get_extension_count()):
+                        ext = x509.get_extension(i)
+                        # get_short_name() returns bytes, str(ext) handles value conversion
+                        extensions.append(
+                            {"name": ext.get_short_name(), "value": str(ext)}
+                        )
+                    cert_info_raw["extensions"] = extensions
+
+        except ssl.SSLCertVerificationError as e:
+            print(f"SSL Verification Error for {url}: {e}")
+            # Decide if you want to proceed or return None based on your needs
+            # You might try fetching without verification here if needed, but be cautious.
+            return None
+        except socket.gaierror:
+            print(f"Could not resolve hostname: {hostname}")
+            return None
+        except socket.timeout:
+            print(f"Connection timed out for {url}")
+            return None
+        except Exception as e:
+            print(f"Error fetching/processing certificate for {url}: {e}")
+            # Log the full error details if needed: logging.exception("Cert fetch error")
+            return None
+
+        # If successful, create the SSLCertificate instance from the dictionary
+        if cert_info_raw:
+            return SSLCertificate(cert_info_raw)
+        else:
+            return None
+
+    # --- Properties now access the dictionary items directly via self[] ---
+    @property
+    def issuer(self) -> Dict[str, str]:
+        return self.get("issuer", {})  # Use self.get for safety
+
+    @property
+    def subject(self) -> Dict[str, str]:
+        return self.get("subject", {})
+
+    @property
+    def valid_from(self) -> str:
+        return self.get("not_before", "")
+
+    @property
+    def valid_until(self) -> str:
+        return self.get("not_after", "")
+
+    @property
+    def fingerprint(self) -> str:
+        return self.get("fingerprint", "")
+
+    # --- Export methods can use `self` directly as it is the dict ---
     def to_json(self, filepath: Optional[str] = None) -> Optional[str]:
-        """
-        Export certificate as JSON.
-
-        Args:
-            filepath (Optional[str]): Path to save the JSON file (default: None).
-
-        Returns:
-            Optional[str]: JSON string if successful, None otherwise.
-        """
-        json_str = json.dumps(self._cert_info, indent=2, ensure_ascii=False)
+        """Export certificate as JSON."""
+        # `self` is already the dictionary we want to serialize
+        json_str = json.dumps(self, indent=2, ensure_ascii=False)
         if filepath:
             Path(filepath).write_text(json_str, encoding="utf-8")
             return None
         return json_str
 
     def to_pem(self, filepath: Optional[str] = None) -> Optional[str]:
-        """
-        Export certificate as PEM.
-
-        Args:
-            filepath (Optional[str]): Path to save the PEM file (default: None).
-
-        Returns:
-            Optional[str]: PEM string if successful, None otherwise.
-        """
+        """Export certificate as PEM."""
         try:
+            # Decode the raw_cert (which should be string due to _decode)
+            raw_cert_bytes = base64.b64decode(self.get("raw_cert", ""))
             x509 = OpenSSL.crypto.load_certificate(
-                OpenSSL.crypto.FILETYPE_ASN1,
-                base64.b64decode(self._cert_info["raw_cert"]),
+                OpenSSL.crypto.FILETYPE_ASN1, raw_cert_bytes
             )
             pem_data = OpenSSL.crypto.dump_certificate(
                 OpenSSL.crypto.FILETYPE_PEM, x509
@@ -136,49 +180,25 @@ class SSLCertificate:
             Path(filepath).write_text(pem_data, encoding="utf-8")
             return None
         return pem_data
-        except Exception:
-            return None
+        except Exception as e:
+            print(f"Error converting to PEM: {e}")
+            return None
 
     def to_der(self, filepath: Optional[str] = None) -> Optional[bytes]:
-        """
-        Export certificate as DER.
-
-        Args:
-            filepath (Optional[str]): Path to save the DER file (default: None).
-
-        Returns:
-            Optional[bytes]: DER bytes if successful, None otherwise.
-        """
+        """Export certificate as DER."""
         try:
-            der_data = base64.b64decode(self._cert_info["raw_cert"])
+            # Decode the raw_cert (which should be string due to _decode)
+            der_data = base64.b64decode(self.get("raw_cert", ""))
             if filepath:
                 Path(filepath).write_bytes(der_data)
                 return None
             return der_data
-        except Exception:
-            return None
-
-    @property
-    def issuer(self) -> Dict[str, str]:
-        """Get certificate issuer information."""
-        return self._cert_info.get("issuer", {})
-
-    @property
-    def subject(self) -> Dict[str, str]:
-        """Get certificate subject information."""
-        return self._cert_info.get("subject", {})
-
-    @property
-    def valid_from(self) -> str:
-        """Get certificate validity start date."""
-        return self._cert_info.get("not_before", "")
-
-    @property
-    def valid_until(self) -> str:
-        """Get certificate validity end date."""
-        return self._cert_info.get("not_after", "")
-
-    @property
-    def fingerprint(self) -> str:
-        """Get certificate fingerprint."""
-        return self._cert_info.get("fingerprint", "")
+        except Exception as e:
+            print(f"Error converting to DER: {e}")
+            return None
+
+    # Optional: Add __repr__ for better debugging
+    def __repr__(self) -> str:
+        subject_cn = self.subject.get('CN', 'N/A')
+        issuer_cn = self.issuer.get('CN', 'N/A')
+        return f"<SSLCertificate Subject='{subject_cn}' Issuer='{issuer_cn}'>"
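
Since SSLCertificate now inherits from dict, a fetched certificate can be passed straight to json.dumps while the old property accessors keep working. A small sketch, assuming the fetch_ssl_certificate flag and result.ssl_certificate field exercised by the new tests below (the target URL is a placeholder):

import asyncio, json
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode

async def main():
    run_cfg = CrawlerRunConfig(fetch_ssl_certificate=True, cache_mode=CacheMode.BYPASS)
    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun("https://example.com", config=run_cfg)
        cert = result.ssl_certificate
        if cert:
            print(json.dumps(cert, indent=2)[:200])   # plain dict -> serializes as-is
            print(cert["issuer"], cert.fingerprint)   # key access and properties both work

asyncio.run(main())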

View File

@@ -4,7 +4,7 @@ import json
 import base64
 from pathlib import Path
 from typing import List
-from crawl4ai.proxy_strategy import ProxyConfig
+from crawl4ai import ProxyConfig
 from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode, CrawlResult
 from crawl4ai import RoundRobinProxyStrategy

View File

@@ -13,7 +13,7 @@ from crawl4ai.deep_crawling import (
 )
 from crawl4ai.deep_crawling.scorers import KeywordRelevanceScorer
 from crawl4ai.async_crawler_strategy import AsyncHTTPCrawlerStrategy
-from crawl4ai.proxy_strategy import ProxyConfig
+from crawl4ai import ProxyConfig
 from crawl4ai import RoundRobinProxyStrategy
 from crawl4ai.content_filter_strategy import LLMContentFilter
 from crawl4ai import DefaultMarkdownGenerator

View File

@@ -251,7 +251,7 @@ from crawl4ai import (
     RoundRobinProxyStrategy,
 )
 import asyncio
-from crawl4ai.proxy_strategy import ProxyConfig
+from crawl4ai import ProxyConfig
 
 async def main():
     # Load proxies and create rotation strategy
     proxies = ProxyConfig.from_env()

View File

@@ -0,0 +1,596 @@
# ==== File: test_rest_api_deep_crawl.py ====
import pytest
import pytest_asyncio
import httpx
import json
import asyncio
import os
from typing import List, Dict, Any, AsyncGenerator
from dotenv import load_dotenv
load_dotenv() # Load environment variables from .env file if present
# --- Test Configuration ---
BASE_URL = os.getenv("CRAWL4AI_TEST_URL", "http://localhost:11235") # Ensure this points to your running server
BASE_URL = os.getenv("CRAWL4AI_TEST_URL", "http://localhost:8020") # Ensure this points to your running server
DEEP_CRAWL_BASE_URL = "https://docs.crawl4ai.com/samples/deepcrawl/"
DEEP_CRAWL_DOMAIN = "docs.crawl4ai.com" # Used for domain filter
# --- Helper Functions ---
def load_proxies_from_env() -> List[Dict]:
"""Load proxies from PROXIES environment variable"""
proxies = []
proxies_str = os.getenv("PROXIES", "")
if not proxies_str:
print("PROXIES environment variable not set or empty.")
return proxies
try:
proxy_list = proxies_str.split(",")
for proxy in proxy_list:
proxy = proxy.strip()
if not proxy:
continue
parts = proxy.split(":")
if len(parts) == 4:
ip, port, username, password = parts
proxies.append({
"server": f"http://{ip}:{port}", # Assuming http, adjust if needed
"username": username,
"password": password,
"ip": ip # Store original IP if available
})
elif len(parts) == 2: # ip:port only
ip, port = parts
proxies.append({
"server": f"http://{ip}:{port}",
"ip": ip
})
else:
print(f"Skipping invalid proxy string format: {proxy}")
except Exception as e:
print(f"Error loading proxies from environment: {e}")
return proxies
async def check_server_health(client: httpx.AsyncClient):
"""Check if the server is healthy before running tests."""
try:
response = await client.get("/health")
response.raise_for_status()
print(f"\nServer healthy: {response.json()}")
return True
except (httpx.RequestError, httpx.HTTPStatusError) as e:
pytest.fail(f"Server health check failed: {e}. Is the server running at {BASE_URL}?", pytrace=False)
async def assert_crawl_result_structure(result: Dict[str, Any], check_ssl=False):
"""Asserts the basic structure of a single crawl result."""
assert isinstance(result, dict)
assert "url" in result
assert "success" in result
assert "html" in result # Basic crawls should return HTML
assert "metadata" in result
assert isinstance(result["metadata"], dict)
assert "depth" in result["metadata"] # Deep crawls add depth
if check_ssl:
assert "ssl_certificate" in result # Check if SSL info is present
assert isinstance(result["ssl_certificate"], dict) or result["ssl_certificate"] is None
async def process_streaming_response(response: httpx.Response) -> List[Dict[str, Any]]:
"""Processes an NDJSON streaming response."""
results = []
completed = False
async for line in response.aiter_lines():
if line:
try:
data = json.loads(line)
if data.get("status") == "completed":
completed = True
break # Stop processing after completion marker
elif data.get("url"): # Ensure it looks like a result object
results.append(data)
else:
print(f"Received non-result JSON line: {data}") # Log other status messages if needed
except json.JSONDecodeError:
pytest.fail(f"Failed to decode JSON line: {line}")
assert completed, "Streaming response did not end with a completion marker."
return results
# --- Pytest Fixtures ---
@pytest_asyncio.fixture(scope="function")
async def async_client() -> AsyncGenerator[httpx.AsyncClient, None]:
"""Provides an async HTTP client"""
# Increased timeout for potentially longer deep crawls
async with httpx.AsyncClient(base_url=BASE_URL, timeout=300.0) as client:
yield client
# No explicit close needed with 'async with'
# --- Test Class ---
@pytest.mark.asyncio
class TestDeepCrawlEndpoints:
@pytest_asyncio.fixture(autouse=True)
async def check_health_before_tests(self, async_client: httpx.AsyncClient):
"""Fixture to ensure server is healthy before each test in the class."""
await check_server_health(async_client)
# 1. Basic Deep Crawl
# async def test_deep_crawl_basic_bfs(self, async_client: httpx.AsyncClient):
# """Test BFS deep crawl with limited depth and pages."""
# max_depth = 1
# max_pages = 3 # start_url + 2 more
# payload = {
# "urls": [DEEP_CRAWL_BASE_URL],
# "browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
# "crawler_config": {
# "type": "CrawlerRunConfig",
# "params": {
# "stream": False,
# "cache_mode": "BYPASS", # Use string value for CacheMode
# "deep_crawl_strategy": {
# "type": "BFSDeepCrawlStrategy",
# "params": {
# "max_depth": max_depth,
# "max_pages": max_pages,
# # Minimal filters for basic test
# "filter_chain": {
# "type": "FilterChain",
# "params": {
# "filters": [
# {
# "type": "DomainFilter",
# "params": {"allowed_domains": [DEEP_CRAWL_DOMAIN]}
# }
# ]
# }
# }
# }
# }
# }
# }
# }
# response = await async_client.post("/crawl", json=payload)
# response.raise_for_status()
# data = response.json()
# assert data["success"] is True
# assert isinstance(data["results"], list)
# assert len(data["results"]) > 1 # Should be more than just the start URL
# assert len(data["results"]) <= max_pages # Respect max_pages
# found_depth_0 = False
# found_depth_1 = False
# for result in data["results"]:
# await assert_crawl_result_structure(result)
# assert result["success"] is True
# assert DEEP_CRAWL_DOMAIN in result["url"]
# depth = result["metadata"]["depth"]
# assert depth <= max_depth
# if depth == 0: found_depth_0 = True
# if depth == 1: found_depth_1 = True
# assert found_depth_0
# assert found_depth_1
# # 2. Deep Crawl with Filtering
# async def test_deep_crawl_with_filters(self, async_client: httpx.AsyncClient):
# """Test BFS deep crawl with content type and domain filters."""
# max_depth = 1
# max_pages = 5
# payload = {
# "urls": [DEEP_CRAWL_BASE_URL],
# "browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
# "crawler_config": {
# "type": "CrawlerRunConfig",
# "params": {
# "stream": False,
# "cache_mode": "BYPASS",
# "deep_crawl_strategy": {
# "type": "BFSDeepCrawlStrategy",
# "params": {
# "max_depth": max_depth,
# "max_pages": max_pages,
# "filter_chain": {
# "type": "FilterChain",
# "params": {
# "filters": [
# {
# "type": "DomainFilter",
# "params": {"allowed_domains": [DEEP_CRAWL_DOMAIN]}
# },
# {
# "type": "ContentTypeFilter",
# "params": {"allowed_types": ["text/html"]}
# },
# # Example: Exclude specific paths using regex
# {
# "type": "URLPatternFilter",
# "params": {
# "patterns": ["*/category-3/*"], # Block category 3
# "reverse": True # Block if match
# }
# }
# ]
# }
# }
# }
# }
# }
# }
# }
# response = await async_client.post("/crawl", json=payload)
# response.raise_for_status()
# data = response.json()
# assert data["success"] is True
# assert len(data["results"]) > 0
# assert len(data["results"]) <= max_pages
# for result in data["results"]:
# await assert_crawl_result_structure(result)
# assert result["success"] is True
# assert DEEP_CRAWL_DOMAIN in result["url"]
# assert "category-3" not in result["url"] # Check if filter worked
# assert result["metadata"]["depth"] <= max_depth
# # 3. Deep Crawl with Scoring
# async def test_deep_crawl_with_scoring(self, async_client: httpx.AsyncClient):
# """Test BFS deep crawl with URL scoring."""
# max_depth = 1
# max_pages = 4
# payload = {
# "urls": [DEEP_CRAWL_BASE_URL],
# "browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
# "crawler_config": {
# "type": "CrawlerRunConfig",
# "params": {
# "stream": False,
# "cache_mode": "BYPASS",
# "deep_crawl_strategy": {
# "type": "BFSDeepCrawlStrategy",
# "params": {
# "max_depth": max_depth,
# "max_pages": max_pages,
# "filter_chain": { # Keep basic domain filter
# "type": "FilterChain",
# "params": { "filters": [{"type": "DomainFilter", "params": {"allowed_domains": [DEEP_CRAWL_DOMAIN]}}]}
# },
# "url_scorer": { # Add scorer
# "type": "CompositeScorer",
# "params": {
# "scorers": [
# { # Favor pages with 'product' in the URL
# "type": "KeywordRelevanceScorer",
# "params": {"keywords": ["product"], "weight": 1.0}
# },
# { # Penalize deep paths slightly
# "type": "PathDepthScorer",
# "params": {"optimal_depth": 2, "weight": -0.2}
# }
# ]
# }
# },
# # Set a threshold if needed: "score_threshold": 0.1
# }
# }
# }
# }
# }
# response = await async_client.post("/crawl", json=payload)
# response.raise_for_status()
# data = response.json()
# assert data["success"] is True
# assert len(data["results"]) > 0
# assert len(data["results"]) <= max_pages
# # Check if results seem biased towards products (harder to assert strictly without knowing exact scores)
# product_urls_found = any("product_" in result["url"] for result in data["results"] if result["metadata"]["depth"] > 0)
# print(f"Product URLs found among depth > 0 results: {product_urls_found}")
# # We expect scoring to prioritize product pages if available within limits
# # assert product_urls_found # This might be too strict depending on site structure and limits
# for result in data["results"]:
# await assert_crawl_result_structure(result)
# assert result["success"] is True
# assert result["metadata"]["depth"] <= max_depth
# # 4. Deep Crawl with CSS Extraction
# async def test_deep_crawl_with_css_extraction(self, async_client: httpx.AsyncClient):
# """Test BFS deep crawl combined with JsonCssExtractionStrategy."""
# max_depth = 6 # Go deep enough to reach product pages
# max_pages = 20
# # Schema to extract product details
# product_schema = {
# "name": "ProductDetails",
# "baseSelector": "div.container", # Base for product page
# "fields": [
# {"name": "product_title", "selector": "h1", "type": "text"},
# {"name": "price", "selector": ".product-price", "type": "text"},
# {"name": "description", "selector": ".product-description p", "type": "text"},
# {"name": "specs", "selector": ".product-specs li", "type": "list", "fields":[
# {"name": "spec_name", "selector": ".spec-name", "type": "text"},
# {"name": "spec_value", "selector": ".spec-value", "type": "text"}
# ]}
# ]
# }
# payload = {
# "urls": [DEEP_CRAWL_BASE_URL],
# "browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
# "crawler_config": {
# "type": "CrawlerRunConfig",
# "params": {
# "stream": False,
# "cache_mode": "BYPASS",
# "extraction_strategy": { # Apply extraction to ALL crawled pages
# "type": "JsonCssExtractionStrategy",
# "params": {"schema": {"type": "dict", "value": product_schema}}
# },
# "deep_crawl_strategy": {
# "type": "BFSDeepCrawlStrategy",
# "params": {
# "max_depth": max_depth,
# "max_pages": max_pages,
# "filter_chain": { # Only crawl HTML on our domain
# "type": "FilterChain",
# "params": {
# "filters": [
# {"type": "DomainFilter", "params": {"allowed_domains": [DEEP_CRAWL_DOMAIN]}},
# {"type": "ContentTypeFilter", "params": {"allowed_types": ["text/html"]}}
# ]
# }
# }
# # Optional: Add scoring to prioritize product pages for extraction
# }
# }
# }
# }
# }
# response = await async_client.post("/crawl", json=payload)
# response.raise_for_status()
# data = response.json()
# assert data["success"] is True
# assert len(data["results"]) > 0
# # assert len(data["results"]) <= max_pages
# found_extracted_product = False
# for result in data["results"]:
# await assert_crawl_result_structure(result)
# assert result["success"] is True
# assert "extracted_content" in result
# if "product_" in result["url"]: # Check product pages specifically
# assert result["extracted_content"] is not None
# try:
# extracted = json.loads(result["extracted_content"])
# # Schema returns list even if one base match
# assert isinstance(extracted, list)
# if extracted:
# item = extracted[0]
# assert "product_title" in item and item["product_title"]
# assert "price" in item and item["price"]
# # Specs might be empty list if not found
# assert "specs" in item and isinstance(item["specs"], list)
# found_extracted_product = True
# print(f"Extracted product: {item.get('product_title')}")
# except (json.JSONDecodeError, AssertionError, IndexError) as e:
# pytest.fail(f"Extraction validation failed for {result['url']}: {e}\nContent: {result['extracted_content']}")
# # else:
# # # Non-product pages might have None or empty list depending on schema match
# # assert result["extracted_content"] is None or json.loads(result["extracted_content"]) == []
# assert found_extracted_product, "Did not find any pages where product data was successfully extracted."
# # 5. Deep Crawl with LLM Extraction (Requires Server LLM Setup)
# async def test_deep_crawl_with_llm_extraction(self, async_client: httpx.AsyncClient):
# """Test BFS deep crawl combined with LLMExtractionStrategy."""
# max_depth = 1 # Limit depth to keep LLM calls manageable
# max_pages = 3
# payload = {
# "urls": [DEEP_CRAWL_BASE_URL],
# "browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
# "crawler_config": {
# "type": "CrawlerRunConfig",
# "params": {
# "stream": False,
# "cache_mode": "BYPASS",
# "extraction_strategy": { # Apply LLM extraction to crawled pages
# "type": "LLMExtractionStrategy",
# "params": {
# "instruction": "Extract the main H1 title and the text content of the first paragraph.",
# "llm_config": { # Example override, rely on server default if possible
# "type": "LLMConfig",
# "params": {"provider": "openai/gpt-4.1-mini"} # Use a cheaper model for testing
# },
# "schema": { # Expected JSON output
# "type": "dict",
# "value": {
# "title": "PageContent", "type": "object",
# "properties": {
# "h1_title": {"type": "string"},
# "first_paragraph": {"type": "string"}
# }
# }
# }
# }
# },
# "deep_crawl_strategy": {
# "type": "BFSDeepCrawlStrategy",
# "params": {
# "max_depth": max_depth,
# "max_pages": max_pages,
# "filter_chain": {
# "type": "FilterChain",
# "params": {
# "filters": [
# {"type": "DomainFilter", "params": {"allowed_domains": [DEEP_CRAWL_DOMAIN]}},
# {"type": "ContentTypeFilter", "params": {"allowed_types": ["text/html"]}}
# ]
# }
# }
# }
# }
# }
# }
# }
# try:
# response = await async_client.post("/crawl", json=payload)
# response.raise_for_status()
# data = response.json()
# except httpx.HTTPStatusError as e:
# pytest.fail(f"Deep Crawl + LLM extraction request failed: {e}. Response: {e.response.text}. Check server logs and LLM API key setup.")
# except httpx.RequestError as e:
# pytest.fail(f"Deep Crawl + LLM extraction request failed: {e}.")
# assert data["success"] is True
# assert len(data["results"]) > 0
# assert len(data["results"]) <= max_pages
# found_llm_extraction = False
# for result in data["results"]:
# await assert_crawl_result_structure(result)
# assert result["success"] is True
# assert "extracted_content" in result
# assert result["extracted_content"] is not None
# try:
# extracted = json.loads(result["extracted_content"])
# if isinstance(extracted, list): extracted = extracted[0] # Handle list output
# assert isinstance(extracted, dict)
# assert "h1_title" in extracted # Check keys based on schema
# assert "first_paragraph" in extracted
# found_llm_extraction = True
# print(f"LLM extracted from {result['url']}: Title='{extracted.get('h1_title')}'")
# except (json.JSONDecodeError, AssertionError, IndexError, TypeError) as e:
# pytest.fail(f"LLM extraction validation failed for {result['url']}: {e}\nContent: {result['extracted_content']}")
# assert found_llm_extraction, "LLM extraction did not yield expected data on any crawled page."
# # 6. Deep Crawl with SSL Certificate Fetching
# async def test_deep_crawl_with_ssl(self, async_client: httpx.AsyncClient):
# """Test BFS deep crawl with fetch_ssl_certificate enabled."""
# max_depth = 0 # Only fetch for start URL to keep test fast
# max_pages = 1
# payload = {
# "urls": [DEEP_CRAWL_BASE_URL],
# "browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
# "crawler_config": {
# "type": "CrawlerRunConfig",
# "params": {
# "stream": False,
# "cache_mode": "BYPASS",
# "fetch_ssl_certificate": True, # <-- Enable SSL fetching
# "deep_crawl_strategy": {
# "type": "BFSDeepCrawlStrategy",
# "params": {
# "max_depth": max_depth,
# "max_pages": max_pages,
# }
# }
# }
# }
# }
# response = await async_client.post("/crawl", json=payload)
# response.raise_for_status()
# data = response.json()
# assert data["success"] is True
# assert len(data["results"]) == 1
# result = data["results"][0]
# await assert_crawl_result_structure(result, check_ssl=True) # <-- Tell helper to check SSL field
# assert result["success"] is True
# # Check if SSL info was actually retrieved
# if result["ssl_certificate"]:
# # Assert directly using dictionary keys
# assert isinstance(result["ssl_certificate"], dict) # Verify it's a dict
# assert "issuer" in result["ssl_certificate"]
# assert "subject" in result["ssl_certificate"]
# # --- MODIFIED ASSERTIONS ---
# assert "not_before" in result["ssl_certificate"] # Check for the actual key
# assert "not_after" in result["ssl_certificate"] # Check for the actual key
# # --- END MODIFICATIONS ---
# assert "fingerprint" in result["ssl_certificate"] # Check another key
# # This print statement using .get() already works correctly with dictionaries
# print(f"SSL Issuer Org: {result['ssl_certificate'].get('issuer', {}).get('O', 'N/A')}")
# print(f"SSL Valid From: {result['ssl_certificate'].get('not_before', 'N/A')}")
# else:
# # This part remains the same
# print("SSL Certificate was null in the result.")
# 7. Deep Crawl with Proxy Rotation (Requires PROXIES env var)
async def test_deep_crawl_with_proxies(self, async_client: httpx.AsyncClient):
"""Test BFS deep crawl using proxy rotation."""
proxies = load_proxies_from_env()
if not proxies:
pytest.skip("Skipping proxy test: PROXIES environment variable not set or empty.")
print(f"\nTesting with {len(proxies)} proxies loaded from environment.")
max_depth = 1
max_pages = 3
payload = {
"urls": [DEEP_CRAWL_BASE_URL], # Use the dummy site
# Use a BrowserConfig that *might* pick up proxy if set, but rely on CrawlerRunConfig
"browser_config": {"type": "BrowserConfig", "params": {"headless": True}},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"stream": False,
"cache_mode": "BYPASS",
"proxy_rotation_strategy": { # <-- Define the strategy
"type": "RoundRobinProxyStrategy",
"params": {
# Convert ProxyConfig dicts back to the serialized format expected by server
"proxies": [{"type": "ProxyConfig", "params": p} for p in proxies]
}
},
"deep_crawl_strategy": {
"type": "BFSDeepCrawlStrategy",
"params": {
"max_depth": max_depth,
"max_pages": max_pages,
"filter_chain": {
"type": "FilterChain",
"params": { "filters": [{"type": "DomainFilter", "params": {"allowed_domains": [DEEP_CRAWL_DOMAIN]}}]}
}
}
}
}
}
}
try:
response = await async_client.post("/crawl", json=payload)
response.raise_for_status()
data = response.json()
except httpx.HTTPStatusError as e:
# Proxies often cause connection errors, catch them
pytest.fail(f"Proxy deep crawl failed: {e}. Response: {e.response.text}. Are proxies valid and accessible by the server?")
except httpx.RequestError as e:
pytest.fail(f"Proxy deep crawl request failed: {e}. Are proxies valid and accessible?")
assert data["success"] is True
assert len(data["results"]) > 0
assert len(data["results"]) <= max_pages
# Primary assertion is that the crawl succeeded *with* proxy config
print(f"Proxy deep crawl completed successfully for {len(data['results'])} pages.")
# Verifying specific proxy usage requires server logs or custom headers/responses
# --- Main Execution Block (for running script directly) ---
if __name__ == "__main__":
pytest_args = ["-v", "-s", __file__]
# Example: Run only proxy test
# pytest_args.append("-k test_deep_crawl_with_proxies")
print(f"Running pytest with args: {pytest_args}")
exit_code = pytest.main(pytest_args)
print(f"Pytest finished with exit code: {exit_code}")

View File

@@ -0,0 +1,335 @@
# ==== File: build_dummy_site.py ====
import os
import random
import argparse
from pathlib import Path
from urllib.parse import quote
# --- Configuration ---
NUM_CATEGORIES = 3
NUM_SUBCATEGORIES_PER_CAT = 2 # Results in NUM_CATEGORIES * NUM_SUBCATEGORIES_PER_CAT total L2 categories
NUM_PRODUCTS_PER_SUBCAT = 5 # Products listed on L3 pages
MAX_DEPTH_TARGET = 5 # Explicitly set target depth
# --- Helper Functions ---
def generate_lorem(words=20):
"""Generates simple placeholder text."""
lorem_words = ["lorem", "ipsum", "dolor", "sit", "amet", "consectetur",
"adipiscing", "elit", "sed", "do", "eiusmod", "tempor",
"incididunt", "ut", "labore", "et", "dolore", "magna", "aliqua"]
return " ".join(random.choice(lorem_words) for _ in range(words)).capitalize() + "."
def create_html_page(filepath: Path, title: str, body_content: str, breadcrumbs: list = [], head_extras: str = ""):
"""Creates an HTML file with basic structure and inline CSS."""
os.makedirs(filepath.parent, exist_ok=True)
# Generate breadcrumb HTML using the 'link' provided in the breadcrumbs list
breadcrumb_html = ""
if breadcrumbs:
links_html = " » ".join(f'<a href="{bc["link"]}">{bc["name"]}</a>' for bc in breadcrumbs)
breadcrumb_html = f"<nav class='breadcrumbs'>{links_html} » {title}</nav>"
# Basic CSS for structure identification (kept the same)
css = """
<style>
body {
font-family: sans-serif;
padding: 20px;
background-color: #1e1e1e;
color: #d1d1d1;
}
.container {
max-width: 960px;
margin: auto;
background: #2c2c2c;
padding: 20px;
border-radius: 5px;
box-shadow: 0 2px 5px rgba(0, 0, 0, 0.5);
}
h1, h2 {
color: #ccc;
}
a {
color: #9bcdff;
text-decoration: none;
}
a:hover {
text-decoration: underline;
}
ul {
list-style: none;
padding-left: 0;
}
li {
margin-bottom: 10px;
}
.category-link,
.subcategory-link,
.product-link,
.details-link,
.reviews-link {
display: block;
padding: 8px;
background-color: #3a3a3a;
border-radius: 3px;
}
.product-preview {
border: 1px solid #444;
padding: 10px;
margin-bottom: 10px;
border-radius: 4px;
background-color: #2a2a2a;
}
.product-title {
color: #d1d1d1;
}
.product-price {
font-weight: bold;
color: #85e085;
}
.product-description,
.product-specs,
.product-reviews {
margin-top: 15px;
line-height: 1.6;
}
.product-specs li {
margin-bottom: 5px;
font-size: 0.9em;
}
.spec-name {
font-weight: bold;
}
.breadcrumbs {
margin-bottom: 20px;
font-size: 0.9em;
color: #888;
}
.breadcrumbs a {
color: #9bcdff;
}
</style>
"""
html_content = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{title} - FakeShop</title>
{head_extras}
{css}
</head>
<body>
<div class="container">
{breadcrumb_html}
<h1>{title}</h1>
{body_content}
</div>
</body>
</html>"""
with open(filepath, "w", encoding="utf-8") as f:
f.write(html_content)
# Keep print statement concise for clarity
# print(f"Created: {filepath}")
def generate_site(base_dir: Path, site_name: str = "FakeShop", base_path: str = ""):
"""Generates the dummy website structure."""
base_dir.mkdir(parents=True, exist_ok=True)
# --- Clean and prepare the base path for URL construction ---
# Ensure it starts with '/' if not empty, and remove any trailing '/'
if base_path:
full_base_path = "/" + base_path.strip('/')
else:
full_base_path = "" # Represents the root
print(f"Using base path for links: '{full_base_path}'")
# --- Level 0: Homepage ---
home_body = "<h2>Welcome to FakeShop!</h2><p>Your one-stop shop for imaginary items.</p><h3>Categories:</h3>\n<ul>"
# Define the *actual* link path for the homepage breadcrumb
home_link_path = f"{full_base_path}/index.html"
breadcrumbs_home = [{"name": "Home", "link": home_link_path}] # Base breadcrumb
# Links *within* the page content should remain relative
for i in range(NUM_CATEGORIES):
cat_name = f"Category-{i+1}"
cat_folder_name = quote(cat_name.lower().replace(" ", "-"))
# This path is relative to the current directory (index.html)
cat_relative_page_path = f"{cat_folder_name}/index.html"
home_body += f'<li><a class="category-link" href="{cat_relative_page_path}">{cat_name}</a> - {generate_lorem(10)}</li>'
home_body += "</ul>"
create_html_page(base_dir / "index.html", "Homepage", home_body, []) # No breadcrumbs *on* the homepage itself
# --- Levels 1-5 ---
for i in range(NUM_CATEGORIES):
cat_name = f"Category-{i+1}"
cat_folder_name = quote(cat_name.lower().replace(" ", "-"))
cat_dir = base_dir / cat_folder_name
# This is the *absolute* path for the breadcrumb link
cat_link_path = f"{full_base_path}/{cat_folder_name}/index.html"
# Update breadcrumbs list for this level
breadcrumbs_cat = breadcrumbs_home + [{"name": cat_name, "link": cat_link_path}]
# --- Level 1: Category Page ---
cat_body = f"<p>{generate_lorem(15)} for {cat_name}.</p><h3>Sub-Categories:</h3>\n<ul>"
for j in range(NUM_SUBCATEGORIES_PER_CAT):
subcat_name = f"{cat_name}-Sub-{j+1}"
subcat_folder_name = quote(subcat_name.lower().replace(" ", "-"))
# Path relative to the category page
subcat_relative_page_path = f"{subcat_folder_name}/index.html"
cat_body += f'<li><a class="subcategory-link" href="{subcat_relative_page_path}">{subcat_name}</a> - {generate_lorem(8)}</li>'
cat_body += "</ul>"
# Pass the updated breadcrumbs list
create_html_page(cat_dir / "index.html", cat_name, cat_body, breadcrumbs_home) # Parent breadcrumb needed here
for j in range(NUM_SUBCATEGORIES_PER_CAT):
subcat_name = f"{cat_name}-Sub-{j+1}"
subcat_folder_name = quote(subcat_name.lower().replace(" ", "-"))
subcat_dir = cat_dir / subcat_folder_name
# Absolute path for the breadcrumb link
subcat_link_path = f"{full_base_path}/{cat_folder_name}/{subcat_folder_name}/index.html"
# Update breadcrumbs list for this level
breadcrumbs_subcat = breadcrumbs_cat + [{"name": subcat_name, "link": subcat_link_path}]
# --- Level 2: Sub-Category Page (Product List) ---
subcat_body = f"<p>Explore products in {subcat_name}. {generate_lorem(12)}</p><h3>Products:</h3>\n<ul class='product-list'>"
for k in range(NUM_PRODUCTS_PER_SUBCAT):
prod_id = f"P{i+1}{j+1}{k+1:03d}" # e.g., P11001
prod_name = f"{subcat_name} Product {k+1} ({prod_id})"
# Filename relative to the subcategory page
prod_filename = f"product_{prod_id}.html"
# Absolute path for the breadcrumb link
prod_link_path = f"{full_base_path}/{cat_folder_name}/{subcat_folder_name}/{prod_filename}"
# Preview on list page (link remains relative)
subcat_body += f"""
<li>
<div class="product-preview">
<a class="product-link" href="{prod_filename}"><strong>{prod_name}</strong></a>
<p>{generate_lorem(10)}</p>
<span class="product-price">£{random.uniform(10, 500):.2f}</span>
</div>
</li>"""
# --- Level 3: Product Page ---
prod_price = random.uniform(10, 500)
prod_desc = generate_lorem(40)
prod_specs = {f"Spec {s+1}": generate_lorem(3) for s in range(random.randint(3,6))}
prod_reviews_count = random.randint(0, 150)
# Relative filenames for links on this page
details_filename_relative = f"product_{prod_id}_details.html"
reviews_filename_relative = f"product_{prod_id}_reviews.html"
prod_body = f"""
<p class="product-price">Price: £{prod_price:.2f}</p>
<div class="product-description">
<h2>Description</h2>
<p>{prod_desc}</p>
</div>
<div class="product-specs">
<h2>Specifications</h2>
<ul>
{''.join(f'<li><span class="spec-name">{name}</span>: <span class="spec-value">{value}</span></li>' for name, value in prod_specs.items())}
</ul>
</div>
<div class="product-reviews">
<h2>Reviews</h2>
<p>Total Reviews: <span class="review-count">{prod_reviews_count}</span></p>
</div>
<hr>
<p>
<a class="details-link" href="{details_filename_relative}">View More Details</a> |
<a class="reviews-link" href="{reviews_filename_relative}">See All Reviews</a>
</p>
"""
# Update breadcrumbs list for this level
breadcrumbs_prod = breadcrumbs_subcat + [{"name": prod_name, "link": prod_link_path}]
# Pass the updated breadcrumbs list
create_html_page(subcat_dir / prod_filename, prod_name, prod_body, breadcrumbs_subcat) # Parent breadcrumb needed here
# --- Level 4: Product Details Page ---
details_filename = f"product_{prod_id}_details.html" # Actual filename
# Absolute path for the breadcrumb link
details_link_path = f"{full_base_path}/{cat_folder_name}/{subcat_folder_name}/{details_filename}"
details_body = f"<p>This page contains extremely detailed information about {prod_name}.</p>{generate_lorem(100)}"
# Update breadcrumbs list for this level
breadcrumbs_details = breadcrumbs_prod + [{"name": "Details", "link": details_link_path}]
# Pass the updated breadcrumbs list
create_html_page(subcat_dir / details_filename, f"{prod_name} - Details", details_body, breadcrumbs_prod) # Parent breadcrumb needed here
# --- Level 5: Product Reviews Page ---
reviews_filename = f"product_{prod_id}_reviews.html" # Actual filename
# Absolute path for the breadcrumb link
reviews_link_path = f"{full_base_path}/{cat_folder_name}/{subcat_folder_name}/{reviews_filename}"
reviews_body = f"<p>All {prod_reviews_count} reviews for {prod_name} are listed here.</p><ul>"
for r in range(prod_reviews_count):
reviews_body += f"<li>Review {r+1}: {generate_lorem(random.randint(15, 50))}</li>"
reviews_body += "</ul>"
# Update breadcrumbs list for this level
breadcrumbs_reviews = breadcrumbs_prod + [{"name": "Reviews", "link": reviews_link_path}]
# Pass the updated breadcrumbs list
create_html_page(subcat_dir / reviews_filename, f"{prod_name} - Reviews", reviews_body, breadcrumbs_prod) # Parent breadcrumb needed here
subcat_body += "</ul>" # Close product-list ul
# Pass the correct breadcrumbs list for the subcategory index page
create_html_page(subcat_dir / "index.html", subcat_name, subcat_body, breadcrumbs_cat) # Parent breadcrumb needed here
# --- Main Execution ---
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Generate a dummy multi-level retail website.")
parser.add_argument(
"-o", "--output-dir",
type=str,
default="dummy_retail_site",
help="Directory to generate the website in."
)
parser.add_argument(
"-n", "--site-name",
type=str,
default="FakeShop",
help="Name of the fake shop."
)
parser.add_argument(
"-b", "--base-path",
type=str,
default="",
help="Base path for hosting the site (e.g., 'samples/deepcrawl'). Leave empty if hosted at the root."
)
# Optional: Add more args to configure counts if needed
args = parser.parse_args()
output_directory = Path(args.output_dir)
site_name = args.site_name
base_path = args.base_path
print(f"Generating dummy site '{site_name}' in '{output_directory}'...")
# Pass the base_path to the generation function
generate_site(output_directory, site_name, base_path)
print(f"\nCreated {sum(1 for _ in output_directory.rglob('*.html'))} HTML pages.")
print("Dummy site generation complete.")
print(f"To serve locally (example): python -m http.server --directory {output_directory} 8000")
if base_path:
print(f"Access the site at: http://localhost:8000/{base_path.strip('/')}/index.html")
else:
print(f"Access the site at: http://localhost:8000/index.html")