diff --git a/crawl4ai/__init__.py b/crawl4ai/__init__.py index 0ab808f3..37dd8366 100644 --- a/crawl4ai/__init__.py +++ b/crawl4ai/__init__.py @@ -2,7 +2,7 @@ import warnings from .async_webcrawler import AsyncWebCrawler, CacheMode -from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig +from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig from .content_scraping_strategy import ( ContentScrapingStrategy, @@ -121,6 +121,7 @@ __all__ = [ "Crawl4aiDockerClient", "ProxyRotationStrategy", "RoundRobinProxyStrategy", + "ProxyConfig" ] diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py index 2f421178..faa29024 100644 --- a/crawl4ai/async_configs.py +++ b/crawl4ai/async_configs.py @@ -5,6 +5,7 @@ from .config import ( MIN_WORD_THRESHOLD, IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD, PROVIDER_MODELS, + PROVIDER_MODELS_PREFIXES, SCREENSHOT_HEIGHT_TRESHOLD, PAGE_TIMEOUT, IMAGE_SCORE_THRESHOLD, @@ -27,11 +28,8 @@ import inspect from typing import Any, Dict, Optional from enum import Enum -from .proxy_strategy import ProxyConfig -try: - from .browser.models import DockerConfig -except ImportError: - DockerConfig = None +# from .proxy_strategy import ProxyConfig + def to_serializable_dict(obj: Any, ignore_default_value : bool = False) -> Dict: @@ -161,6 +159,117 @@ def is_empty_value(value: Any) -> bool: return True return False +class ProxyConfig: + def __init__( + self, + server: str, + username: Optional[str] = None, + password: Optional[str] = None, + ip: Optional[str] = None, + ): + """Configuration class for a single proxy. + + Args: + server: Proxy server URL (e.g., "http://127.0.0.1:8080") + username: Optional username for proxy authentication + password: Optional password for proxy authentication + ip: Optional IP address for verification purposes + """ + self.server = server + self.username = username + self.password = password + + # Extract IP from server if not explicitly provided + self.ip = ip or self._extract_ip_from_server() + + def _extract_ip_from_server(self) -> Optional[str]: + """Extract IP address from server URL.""" + try: + # Simple extraction assuming http://ip:port format + if "://" in self.server: + parts = self.server.split("://")[1].split(":") + return parts[0] + else: + parts = self.server.split(":") + return parts[0] + except Exception: + return None + + @staticmethod + def from_string(proxy_str: str) -> "ProxyConfig": + """Create a ProxyConfig from a string in the format 'ip:port:username:password'.""" + parts = proxy_str.split(":") + if len(parts) == 4: # ip:port:username:password + ip, port, username, password = parts + return ProxyConfig( + server=f"http://{ip}:{port}", + username=username, + password=password, + ip=ip + ) + elif len(parts) == 2: # ip:port only + ip, port = parts + return ProxyConfig( + server=f"http://{ip}:{port}", + ip=ip + ) + else: + raise ValueError(f"Invalid proxy string format: {proxy_str}") + + @staticmethod + def from_dict(proxy_dict: Dict) -> "ProxyConfig": + """Create a ProxyConfig from a dictionary.""" + return ProxyConfig( + server=proxy_dict.get("server"), + username=proxy_dict.get("username"), + password=proxy_dict.get("password"), + ip=proxy_dict.get("ip") + ) + + @staticmethod + def from_env(env_var: str = "PROXIES") -> List["ProxyConfig"]: + """Load proxies from environment variable. 
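+        Each comma-separated entry must match the `from_string` format above,
+        i.e. "ip:port:username:password" or "ip:port", e.g.
+        PROXIES="1.2.3.4:8080:user:pass,5.6.7.8:3128".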
+ + Args: + env_var: Name of environment variable containing comma-separated proxy strings + + Returns: + List of ProxyConfig objects + """ + proxies = [] + try: + proxy_list = os.getenv(env_var, "").split(",") + for proxy in proxy_list: + if not proxy: + continue + proxies.append(ProxyConfig.from_string(proxy)) + except Exception as e: + print(f"Error loading proxies from environment: {e}") + return proxies + + def to_dict(self) -> Dict: + """Convert to dictionary representation.""" + return { + "server": self.server, + "username": self.username, + "password": self.password, + "ip": self.ip + } + + def clone(self, **kwargs) -> "ProxyConfig": + """Create a copy of this configuration with updated values. + + Args: + **kwargs: Key-value pairs of configuration options to update + + Returns: + ProxyConfig: A new instance with the specified updates + """ + config_dict = self.to_dict() + config_dict.update(kwargs) + return ProxyConfig.from_dict(config_dict) + + class BrowserConfig: """ @@ -197,8 +306,6 @@ class BrowserConfig: Default: None. proxy_config (ProxyConfig or dict or None): Detailed proxy configuration, e.g. {"server": "...", "username": "..."}. If None, no additional proxy config. Default: None. - docker_config (DockerConfig or dict or None): Configuration for Docker-based browser automation. - Contains settings for Docker container operation. Default: None. viewport_width (int): Default viewport width for pages. Default: 1080. viewport_height (int): Default viewport height for pages. Default: 600. viewport (dict): Default viewport dimensions for pages. If set, overrides viewport_width and viewport_height. @@ -244,7 +351,6 @@ class BrowserConfig: channel: str = "chromium", proxy: str = None, proxy_config: Union[ProxyConfig, dict, None] = None, - docker_config: Union[DockerConfig, dict, None] = None, viewport_width: int = 1080, viewport_height: int = 600, viewport: dict = None, @@ -285,15 +391,7 @@ class BrowserConfig: self.chrome_channel = "" self.proxy = proxy self.proxy_config = proxy_config - - # Handle docker configuration - if isinstance(docker_config, dict) and DockerConfig is not None: - self.docker_config = DockerConfig.from_kwargs(docker_config) - else: - self.docker_config = docker_config - if self.docker_config: - self.user_data_dir = self.docker_config.user_data_dir self.viewport_width = viewport_width self.viewport_height = viewport_height @@ -364,7 +462,6 @@ class BrowserConfig: channel=kwargs.get("channel", "chromium"), proxy=kwargs.get("proxy"), proxy_config=kwargs.get("proxy_config", None), - docker_config=kwargs.get("docker_config", None), viewport_width=kwargs.get("viewport_width", 1080), viewport_height=kwargs.get("viewport_height", 600), accept_downloads=kwargs.get("accept_downloads", False), @@ -421,13 +518,7 @@ class BrowserConfig: "debugging_port": self.debugging_port, "host": self.host, } - - # Include docker_config if it exists - if hasattr(self, "docker_config") and self.docker_config is not None: - if hasattr(self.docker_config, "to_dict"): - result["docker_config"] = self.docker_config.to_dict() - else: - result["docker_config"] = self.docker_config + return result @@ -1180,9 +1271,18 @@ class LLMConfig: elif api_token and api_token.startswith("env:"): self.api_token = os.getenv(api_token[4:]) else: - self.api_token = PROVIDER_MODELS.get(provider, "no-token") or os.getenv( - DEFAULT_PROVIDER_API_KEY - ) + # Check if given provider starts with any of key in PROVIDER_MODELS_PREFIXES + # If not, check if it is in PROVIDER_MODELS + prefixes = 
PROVIDER_MODELS_PREFIXES.keys() + if any(provider.startswith(prefix) for prefix in prefixes): + selected_prefix = next( + (prefix for prefix in prefixes if provider.startswith(prefix)), + None, + ) + self.api_token = PROVIDER_MODELS_PREFIXES.get(selected_prefix) + else: + self.provider = DEFAULT_PROVIDER + self.api_token = os.getenv(DEFAULT_PROVIDER_API_KEY) self.base_url = base_url self.temprature = temprature self.max_tokens = max_tokens diff --git a/crawl4ai/async_webcrawler.py b/crawl4ai/async_webcrawler.py index 1cd1b8c9..9ba508b2 100644 --- a/crawl4ai/async_webcrawler.py +++ b/crawl4ai/async_webcrawler.py @@ -36,7 +36,7 @@ from .markdown_generation_strategy import ( ) from .deep_crawling import DeepCrawlDecorator from .async_logger import AsyncLogger, AsyncLoggerBase -from .async_configs import BrowserConfig, CrawlerRunConfig +from .async_configs import BrowserConfig, CrawlerRunConfig, ProxyConfig from .async_dispatcher import * # noqa: F403 from .async_dispatcher import BaseDispatcher, MemoryAdaptiveDispatcher, RateLimiter @@ -291,12 +291,12 @@ class AsyncWebCrawler: # Update proxy configuration from rotation strategy if available if config and config.proxy_rotation_strategy: - next_proxy = await config.proxy_rotation_strategy.get_next_proxy() + next_proxy : ProxyConfig = await config.proxy_rotation_strategy.get_next_proxy() if next_proxy: self.logger.info( message="Switch proxy: {proxy}", tag="PROXY", - params={"proxy": next_proxy.server}, + params={"proxy": next_proxy.server} ) config.proxy_config = next_proxy # config = config.clone(proxy_config=next_proxy) diff --git a/crawl4ai/browser_manager.py b/crawl4ai/browser_manager.py index bfe22f4e..a338d71d 100644 --- a/crawl4ai/browser_manager.py +++ b/crawl4ai/browser_manager.py @@ -94,6 +94,7 @@ class ManagedBrowser: host: str = "localhost", debugging_port: int = 9222, cdp_url: Optional[str] = None, + browser_config: Optional[BrowserConfig] = None, ): """ Initialize the ManagedBrowser instance. @@ -109,17 +110,19 @@ class ManagedBrowser: host (str): Host for debugging the browser. Default: "localhost". debugging_port (int): Port for debugging the browser. Default: 9222. cdp_url (str or None): CDP URL to connect to the browser. Default: None. + browser_config (BrowserConfig): Configuration object containing all browser settings. Default: None. 
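+                When provided, values from this object take precedence over the
+                individual arguments above; the implementation reads all of its
+                settings from this object, so it should not be left as None.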
""" - self.browser_type = browser_type - self.user_data_dir = user_data_dir - self.headless = headless + self.browser_type = browser_config.browser_type + self.user_data_dir = browser_config.user_data_dir + self.headless = browser_config.headless self.browser_process = None self.temp_dir = None - self.debugging_port = debugging_port - self.host = host + self.debugging_port = browser_config.debugging_port + self.host = browser_config.host self.logger = logger self.shutting_down = False - self.cdp_url = cdp_url + self.cdp_url = browser_config.cdp_url + self.browser_config = browser_config async def start(self) -> str: """ @@ -142,6 +145,9 @@ class ManagedBrowser: # Get browser path and args based on OS and browser type # browser_path = self._get_browser_path() args = await self._get_browser_args() + + if self.browser_config.extra_args: + args.extend(self.browser_config.extra_args) # Start browser process try: @@ -477,6 +483,7 @@ class BrowserManager: logger=self.logger, debugging_port=self.config.debugging_port, cdp_url=self.config.cdp_url, + browser_config=self.config, ) async def start(self): diff --git a/crawl4ai/config.py b/crawl4ai/config.py index 103dc1b7..08f56b83 100644 --- a/crawl4ai/config.py +++ b/crawl4ai/config.py @@ -29,6 +29,14 @@ PROVIDER_MODELS = { 'gemini/gemini-2.0-flash-lite-preview-02-05': os.getenv("GEMINI_API_KEY"), "deepseek/deepseek-chat": os.getenv("DEEPSEEK_API_KEY"), } +PROVIDER_MODELS_PREFIXES = { + "ollama": "no-token-needed", # Any model from Ollama no need for API token + "groq": os.getenv("GROQ_API_KEY"), + "openai": os.getenv("OPENAI_API_KEY"), + "anthropic": os.getenv("ANTHROPIC_API_KEY"), + "gemini": os.getenv("GEMINI_API_KEY"), + "deepseek": os.getenv("DEEPSEEK_API_KEY"), +} # Chunk token threshold CHUNK_TOKEN_THRESHOLD = 2**11 # 2048 tokens diff --git a/crawl4ai/proxy_strategy.py b/crawl4ai/proxy_strategy.py index 6821c566..2c01a2f5 100644 --- a/crawl4ai/proxy_strategy.py +++ b/crawl4ai/proxy_strategy.py @@ -4,6 +4,9 @@ from itertools import cycle import os +########### ATTENTION PEOPLE OF EARTH ########### +# I have moved this config to async_configs.py, kept it here, in case someone still importing it, however +# be a dear and follow `from crawl4ai import ProxyConfig` instead :) class ProxyConfig: def __init__( self, @@ -119,12 +122,12 @@ class ProxyRotationStrategy(ABC): """Base abstract class for proxy rotation strategies""" @abstractmethod - async def get_next_proxy(self) -> Optional[Dict]: + async def get_next_proxy(self) -> Optional[ProxyConfig]: """Get next proxy configuration from the strategy""" pass @abstractmethod - def add_proxies(self, proxies: List[Dict]): + def add_proxies(self, proxies: List[ProxyConfig]): """Add proxy configurations to the strategy""" pass diff --git a/crawl4ai/ssl_certificate.py b/crawl4ai/ssl_certificate.py index 722bb7f9..a60b7cbc 100644 --- a/crawl4ai/ssl_certificate.py +++ b/crawl4ai/ssl_certificate.py @@ -9,83 +9,44 @@ from urllib.parse import urlparse import OpenSSL.crypto from pathlib import Path - -class SSLCertificate: +# === Inherit from dict === +class SSLCertificate(dict): """ - A class representing an SSL certificate with methods to export in various formats. + A class representing an SSL certificate, behaving like a dictionary + for direct JSON serialization. It stores the certificate information internally + and provides methods for export and property access. - Attributes: - cert_info (Dict[str, Any]): The certificate information. 
- - Methods: - from_url(url: str, timeout: int = 10) -> Optional['SSLCertificate']: Create SSLCertificate instance from a URL. - from_file(file_path: str) -> Optional['SSLCertificate']: Create SSLCertificate instance from a file. - from_binary(binary_data: bytes) -> Optional['SSLCertificate']: Create SSLCertificate instance from binary data. - export_as_pem() -> str: Export the certificate as PEM format. - export_as_der() -> bytes: Export the certificate as DER format. - export_as_json() -> Dict[str, Any]: Export the certificate as JSON format. - export_as_text() -> str: Export the certificate as text format. + Inherits from dict, so instances are directly JSON serializable. """ + # Use __slots__ for potential memory optimization if desired, though less common when inheriting dict + # __slots__ = ("_cert_info",) # If using slots, be careful with dict inheritance interaction + def __init__(self, cert_info: Dict[str, Any]): - self._cert_info = self._decode_cert_data(cert_info) - - @staticmethod - def from_url(url: str, timeout: int = 10) -> Optional["SSLCertificate"]: """ - Create SSLCertificate instance from a URL. + Initializes the SSLCertificate object. Args: - url (str): URL of the website. - timeout (int): Timeout for the connection (default: 10). - - Returns: - Optional[SSLCertificate]: SSLCertificate instance if successful, None otherwise. + cert_info (Dict[str, Any]): The raw certificate dictionary. """ - try: - hostname = urlparse(url).netloc - if ":" in hostname: - hostname = hostname.split(":")[0] + # 1. Decode the data (handle bytes -> str) + decoded_info = self._decode_cert_data(cert_info) - context = ssl.create_default_context() - with socket.create_connection((hostname, 443), timeout=timeout) as sock: - with context.wrap_socket(sock, server_hostname=hostname) as ssock: - cert_binary = ssock.getpeercert(binary_form=True) - x509 = OpenSSL.crypto.load_certificate( - OpenSSL.crypto.FILETYPE_ASN1, cert_binary - ) + # 2. Store the decoded info internally (optional but good practice) + # self._cert_info = decoded_info # You can keep this if methods rely on it - cert_info = { - "subject": dict(x509.get_subject().get_components()), - "issuer": dict(x509.get_issuer().get_components()), - "version": x509.get_version(), - "serial_number": hex(x509.get_serial_number()), - "not_before": x509.get_notBefore(), - "not_after": x509.get_notAfter(), - "fingerprint": x509.digest("sha256").hex(), - "signature_algorithm": x509.get_signature_algorithm(), - "raw_cert": base64.b64encode(cert_binary), - } - - # Add extensions - extensions = [] - for i in range(x509.get_extension_count()): - ext = x509.get_extension(i) - extensions.append( - {"name": ext.get_short_name(), "value": str(ext)} - ) - cert_info["extensions"] = extensions - - return SSLCertificate(cert_info) - - except Exception: - return None + # 3. 
Initialize the dictionary part of the object with the decoded data + super().__init__(decoded_info) @staticmethod def _decode_cert_data(data: Any) -> Any: """Helper method to decode bytes in certificate data.""" if isinstance(data, bytes): - return data.decode("utf-8") + try: + # Try UTF-8 first, fallback to latin-1 for arbitrary bytes + return data.decode("utf-8") + except UnicodeDecodeError: + return data.decode("latin-1") # Or handle as needed, maybe hex representation elif isinstance(data, dict): return { ( @@ -97,36 +58,119 @@ class SSLCertificate: return [SSLCertificate._decode_cert_data(item) for item in data] return data + @staticmethod + def from_url(url: str, timeout: int = 10) -> Optional["SSLCertificate"]: + """ + Create SSLCertificate instance from a URL. Fetches cert info and initializes. + (Fetching logic remains the same) + """ + cert_info_raw = None # Variable to hold the fetched dict + try: + hostname = urlparse(url).netloc + if ":" in hostname: + hostname = hostname.split(":")[0] + + context = ssl.create_default_context() + # Set check_hostname to False and verify_mode to CERT_NONE temporarily + # for potentially problematic certificates during fetch, but parse the result regardless. + # context.check_hostname = False + # context.verify_mode = ssl.CERT_NONE + + with socket.create_connection((hostname, 443), timeout=timeout) as sock: + with context.wrap_socket(sock, server_hostname=hostname) as ssock: + cert_binary = ssock.getpeercert(binary_form=True) + if not cert_binary: + print(f"Warning: No certificate returned for {hostname}") + return None + + x509 = OpenSSL.crypto.load_certificate( + OpenSSL.crypto.FILETYPE_ASN1, cert_binary + ) + + # Create the dictionary directly + cert_info_raw = { + "subject": dict(x509.get_subject().get_components()), + "issuer": dict(x509.get_issuer().get_components()), + "version": x509.get_version(), + "serial_number": hex(x509.get_serial_number()), + "not_before": x509.get_notBefore(), # Keep as bytes initially, _decode handles it + "not_after": x509.get_notAfter(), # Keep as bytes initially + "fingerprint": x509.digest("sha256").hex(), # hex() is already string + "signature_algorithm": x509.get_signature_algorithm(), # Keep as bytes + "raw_cert": base64.b64encode(cert_binary), # Base64 is bytes, _decode handles it + } + + # Add extensions + extensions = [] + for i in range(x509.get_extension_count()): + ext = x509.get_extension(i) + # get_short_name() returns bytes, str(ext) handles value conversion + extensions.append( + {"name": ext.get_short_name(), "value": str(ext)} + ) + cert_info_raw["extensions"] = extensions + + except ssl.SSLCertVerificationError as e: + print(f"SSL Verification Error for {url}: {e}") + # Decide if you want to proceed or return None based on your needs + # You might try fetching without verification here if needed, but be cautious. 
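+            # A minimal sketch of that unverified fallback (assumption: callers
+            # accept parsing a certificate that failed verification):
+            #   ctx = ssl._create_unverified_context()
+            #   with socket.create_connection((hostname, 443), timeout=timeout) as s:
+            #       with ctx.wrap_socket(s, server_hostname=hostname) as ss:
+            #           cert_binary = ss.getpeercert(binary_form=True)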
+ return None + except socket.gaierror: + print(f"Could not resolve hostname: {hostname}") + return None + except socket.timeout: + print(f"Connection timed out for {url}") + return None + except Exception as e: + print(f"Error fetching/processing certificate for {url}: {e}") + # Log the full error details if needed: logging.exception("Cert fetch error") + return None + + # If successful, create the SSLCertificate instance from the dictionary + if cert_info_raw: + return SSLCertificate(cert_info_raw) + else: + return None + + + # --- Properties now access the dictionary items directly via self[] --- + @property + def issuer(self) -> Dict[str, str]: + return self.get("issuer", {}) # Use self.get for safety + + @property + def subject(self) -> Dict[str, str]: + return self.get("subject", {}) + + @property + def valid_from(self) -> str: + return self.get("not_before", "") + + @property + def valid_until(self) -> str: + return self.get("not_after", "") + + @property + def fingerprint(self) -> str: + return self.get("fingerprint", "") + + # --- Export methods can use `self` directly as it is the dict --- def to_json(self, filepath: Optional[str] = None) -> Optional[str]: - """ - Export certificate as JSON. - - Args: - filepath (Optional[str]): Path to save the JSON file (default: None). - - Returns: - Optional[str]: JSON string if successful, None otherwise. - """ - json_str = json.dumps(self._cert_info, indent=2, ensure_ascii=False) + """Export certificate as JSON.""" + # `self` is already the dictionary we want to serialize + json_str = json.dumps(self, indent=2, ensure_ascii=False) if filepath: Path(filepath).write_text(json_str, encoding="utf-8") return None return json_str def to_pem(self, filepath: Optional[str] = None) -> Optional[str]: - """ - Export certificate as PEM. - - Args: - filepath (Optional[str]): Path to save the PEM file (default: None). - - Returns: - Optional[str]: PEM string if successful, None otherwise. - """ + """Export certificate as PEM.""" try: + # Decode the raw_cert (which should be string due to _decode) + raw_cert_bytes = base64.b64decode(self.get("raw_cert", "")) x509 = OpenSSL.crypto.load_certificate( - OpenSSL.crypto.FILETYPE_ASN1, - base64.b64decode(self._cert_info["raw_cert"]), + OpenSSL.crypto.FILETYPE_ASN1, raw_cert_bytes ) pem_data = OpenSSL.crypto.dump_certificate( OpenSSL.crypto.FILETYPE_PEM, x509 @@ -136,49 +180,25 @@ class SSLCertificate: Path(filepath).write_text(pem_data, encoding="utf-8") return None return pem_data - except Exception: - return None + except Exception as e: + print(f"Error converting to PEM: {e}") + return None def to_der(self, filepath: Optional[str] = None) -> Optional[bytes]: - """ - Export certificate as DER. - - Args: - filepath (Optional[str]): Path to save the DER file (default: None). - - Returns: - Optional[bytes]: DER bytes if successful, None otherwise. 
- """ + """Export certificate as DER.""" try: - der_data = base64.b64decode(self._cert_info["raw_cert"]) + # Decode the raw_cert (which should be string due to _decode) + der_data = base64.b64decode(self.get("raw_cert", "")) if filepath: Path(filepath).write_bytes(der_data) return None return der_data - except Exception: - return None + except Exception as e: + print(f"Error converting to DER: {e}") + return None - @property - def issuer(self) -> Dict[str, str]: - """Get certificate issuer information.""" - return self._cert_info.get("issuer", {}) - - @property - def subject(self) -> Dict[str, str]: - """Get certificate subject information.""" - return self._cert_info.get("subject", {}) - - @property - def valid_from(self) -> str: - """Get certificate validity start date.""" - return self._cert_info.get("not_before", "") - - @property - def valid_until(self) -> str: - """Get certificate validity end date.""" - return self._cert_info.get("not_after", "") - - @property - def fingerprint(self) -> str: - """Get certificate fingerprint.""" - return self._cert_info.get("fingerprint", "") + # Optional: Add __repr__ for better debugging + def __repr__(self) -> str: + subject_cn = self.subject.get('CN', 'N/A') + issuer_cn = self.issuer.get('CN', 'N/A') + return f"" \ No newline at end of file diff --git a/docs/examples/quickstart_examples_set_1.py b/docs/examples/quickstart_examples_set_1.py index 76224746..078d1c4a 100644 --- a/docs/examples/quickstart_examples_set_1.py +++ b/docs/examples/quickstart_examples_set_1.py @@ -4,7 +4,7 @@ import json import base64 from pathlib import Path from typing import List -from crawl4ai.proxy_strategy import ProxyConfig +from crawl4ai import ProxyConfig from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode, CrawlResult from crawl4ai import RoundRobinProxyStrategy diff --git a/docs/examples/tutorial_v0.5.py b/docs/examples/tutorial_v0.5.py index 3cbbdb7b..fe8e0a2b 100644 --- a/docs/examples/tutorial_v0.5.py +++ b/docs/examples/tutorial_v0.5.py @@ -13,7 +13,7 @@ from crawl4ai.deep_crawling import ( ) from crawl4ai.deep_crawling.scorers import KeywordRelevanceScorer from crawl4ai.async_crawler_strategy import AsyncHTTPCrawlerStrategy -from crawl4ai.proxy_strategy import ProxyConfig +from crawl4ai import ProxyConfig from crawl4ai import RoundRobinProxyStrategy from crawl4ai.content_filter_strategy import LLMContentFilter from crawl4ai import DefaultMarkdownGenerator diff --git a/docs/md_v2/blog/releases/0.5.0.md b/docs/md_v2/blog/releases/0.5.0.md index 24b0feda..30269a29 100644 --- a/docs/md_v2/blog/releases/0.5.0.md +++ b/docs/md_v2/blog/releases/0.5.0.md @@ -251,7 +251,7 @@ from crawl4ai import ( RoundRobinProxyStrategy, ) import asyncio -from crawl4ai.proxy_strategy import ProxyConfig +from crawl4ai import ProxyConfig async def main(): # Load proxies and create rotation strategy proxies = ProxyConfig.from_env() diff --git a/tests/docker/test_rest_api_deep_crawl.py b/tests/docker/test_rest_api_deep_crawl.py new file mode 100644 index 00000000..64afefff --- /dev/null +++ b/tests/docker/test_rest_api_deep_crawl.py @@ -0,0 +1,596 @@ +# ==== File: test_rest_api_deep_crawl.py ==== + +import pytest +import pytest_asyncio +import httpx +import json +import asyncio +import os +from typing import List, Dict, Any, AsyncGenerator + +from dotenv import load_dotenv +load_dotenv() # Load environment variables from .env file if present + +# --- Test Configuration --- +BASE_URL = os.getenv("CRAWL4AI_TEST_URL", "http://localhost:11235") # Ensure this points to your 
running server
+DEEP_CRAWL_BASE_URL = "https://docs.crawl4ai.com/samples/deepcrawl/"
+DEEP_CRAWL_DOMAIN = "docs.crawl4ai.com"  # Used for domain filter
+
+# --- Helper Functions ---
+def load_proxies_from_env() -> List[Dict]:
+    """Load proxies from PROXIES environment variable."""
+    proxies = []
+    proxies_str = os.getenv("PROXIES", "")
+    if not proxies_str:
+        print("PROXIES environment variable not set or empty.")
+        return proxies
+    try:
+        proxy_list = proxies_str.split(",")
+        for proxy in proxy_list:
+            proxy = proxy.strip()
+            if not proxy:
+                continue
+            parts = proxy.split(":")
+            if len(parts) == 4:
+                ip, port, username, password = parts
+                proxies.append({
+                    "server": f"http://{ip}:{port}",  # Assuming http, adjust if needed
+                    "username": username,
+                    "password": password,
+                    "ip": ip  # Store original IP if available
+                })
+            elif len(parts) == 2:  # ip:port only
+                ip, port = parts
+                proxies.append({
+                    "server": f"http://{ip}:{port}",
+                    "ip": ip
+                })
+            else:
+                print(f"Skipping invalid proxy string format: {proxy}")
+
+    except Exception as e:
+        print(f"Error loading proxies from environment: {e}")
+    return proxies
+
+
+async def check_server_health(client: httpx.AsyncClient):
+    """Check if the server is healthy before running tests."""
+    try:
+        response = await client.get("/health")
+        response.raise_for_status()
+        print(f"\nServer healthy: {response.json()}")
+        return True
+    except (httpx.RequestError, httpx.HTTPStatusError) as e:
+        pytest.fail(f"Server health check failed: {e}. Is the server running at {BASE_URL}?", pytrace=False)
+
+
+async def assert_crawl_result_structure(result: Dict[str, Any], check_ssl=False):
+    """Asserts the basic structure of a single crawl result."""
+    assert isinstance(result, dict)
+    assert "url" in result
+    assert "success" in result
+    assert "html" in result  # Basic crawls should return HTML
+    assert "metadata" in result
+    assert isinstance(result["metadata"], dict)
+    assert "depth" in result["metadata"]  # Deep crawls add depth
+
+    if check_ssl:
+        assert "ssl_certificate" in result  # Check if SSL info is present
+        assert isinstance(result["ssl_certificate"], dict) or result["ssl_certificate"] is None
+
+
+async def process_streaming_response(response: httpx.Response) -> List[Dict[str, Any]]:
+    """Processes an NDJSON streaming response."""
+    results = []
+    completed = False
+    async for line in response.aiter_lines():
+        if line:
+            try:
+                data = json.loads(line)
+                if data.get("status") == "completed":
+                    completed = True
+                    break  # Stop processing after completion marker
+                elif data.get("url"):  # Ensure it looks like a result object
+                    results.append(data)
+                else:
+                    print(f"Received non-result JSON line: {data}")  # Log other status messages if needed
+            except json.JSONDecodeError:
+                pytest.fail(f"Failed to decode JSON line: {line}")
+    assert completed, "Streaming response did not end with a completion marker."
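+    # Note: only per-URL result objects are returned; the {"status": "completed"}
+    # marker consumed above is dropped rather than included in the results.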
+ return results + + +# --- Pytest Fixtures --- +@pytest_asyncio.fixture(scope="function") +async def async_client() -> AsyncGenerator[httpx.AsyncClient, None]: + """Provides an async HTTP client""" + # Increased timeout for potentially longer deep crawls + async with httpx.AsyncClient(base_url=BASE_URL, timeout=300.0) as client: + yield client + # No explicit close needed with 'async with' + +# --- Test Class --- +@pytest.mark.asyncio +class TestDeepCrawlEndpoints: + + @pytest_asyncio.fixture(autouse=True) + async def check_health_before_tests(self, async_client: httpx.AsyncClient): + """Fixture to ensure server is healthy before each test in the class.""" + await check_server_health(async_client) + + # 1. Basic Deep Crawl + # async def test_deep_crawl_basic_bfs(self, async_client: httpx.AsyncClient): + # """Test BFS deep crawl with limited depth and pages.""" + # max_depth = 1 + # max_pages = 3 # start_url + 2 more + # payload = { + # "urls": [DEEP_CRAWL_BASE_URL], + # "browser_config": {"type": "BrowserConfig", "params": {"headless": True}}, + # "crawler_config": { + # "type": "CrawlerRunConfig", + # "params": { + # "stream": False, + # "cache_mode": "BYPASS", # Use string value for CacheMode + # "deep_crawl_strategy": { + # "type": "BFSDeepCrawlStrategy", + # "params": { + # "max_depth": max_depth, + # "max_pages": max_pages, + # # Minimal filters for basic test + # "filter_chain": { + # "type": "FilterChain", + # "params": { + # "filters": [ + # { + # "type": "DomainFilter", + # "params": {"allowed_domains": [DEEP_CRAWL_DOMAIN]} + # } + # ] + # } + # } + # } + # } + # } + # } + # } + # response = await async_client.post("/crawl", json=payload) + # response.raise_for_status() + # data = response.json() + + # assert data["success"] is True + # assert isinstance(data["results"], list) + # assert len(data["results"]) > 1 # Should be more than just the start URL + # assert len(data["results"]) <= max_pages # Respect max_pages + + # found_depth_0 = False + # found_depth_1 = False + # for result in data["results"]: + # await assert_crawl_result_structure(result) + # assert result["success"] is True + # assert DEEP_CRAWL_DOMAIN in result["url"] + # depth = result["metadata"]["depth"] + # assert depth <= max_depth + # if depth == 0: found_depth_0 = True + # if depth == 1: found_depth_1 = True + + # assert found_depth_0 + # assert found_depth_1 + + # # 2. 
Deep Crawl with Filtering + # async def test_deep_crawl_with_filters(self, async_client: httpx.AsyncClient): + # """Test BFS deep crawl with content type and domain filters.""" + # max_depth = 1 + # max_pages = 5 + # payload = { + # "urls": [DEEP_CRAWL_BASE_URL], + # "browser_config": {"type": "BrowserConfig", "params": {"headless": True}}, + # "crawler_config": { + # "type": "CrawlerRunConfig", + # "params": { + # "stream": False, + # "cache_mode": "BYPASS", + # "deep_crawl_strategy": { + # "type": "BFSDeepCrawlStrategy", + # "params": { + # "max_depth": max_depth, + # "max_pages": max_pages, + # "filter_chain": { + # "type": "FilterChain", + # "params": { + # "filters": [ + # { + # "type": "DomainFilter", + # "params": {"allowed_domains": [DEEP_CRAWL_DOMAIN]} + # }, + # { + # "type": "ContentTypeFilter", + # "params": {"allowed_types": ["text/html"]} + # }, + # # Example: Exclude specific paths using regex + # { + # "type": "URLPatternFilter", + # "params": { + # "patterns": ["*/category-3/*"], # Block category 3 + # "reverse": True # Block if match + # } + # } + # ] + # } + # } + # } + # } + # } + # } + # } + # response = await async_client.post("/crawl", json=payload) + # response.raise_for_status() + # data = response.json() + + # assert data["success"] is True + # assert len(data["results"]) > 0 + # assert len(data["results"]) <= max_pages + + # for result in data["results"]: + # await assert_crawl_result_structure(result) + # assert result["success"] is True + # assert DEEP_CRAWL_DOMAIN in result["url"] + # assert "category-3" not in result["url"] # Check if filter worked + # assert result["metadata"]["depth"] <= max_depth + + # # 3. Deep Crawl with Scoring + # async def test_deep_crawl_with_scoring(self, async_client: httpx.AsyncClient): + # """Test BFS deep crawl with URL scoring.""" + # max_depth = 1 + # max_pages = 4 + # payload = { + # "urls": [DEEP_CRAWL_BASE_URL], + # "browser_config": {"type": "BrowserConfig", "params": {"headless": True}}, + # "crawler_config": { + # "type": "CrawlerRunConfig", + # "params": { + # "stream": False, + # "cache_mode": "BYPASS", + # "deep_crawl_strategy": { + # "type": "BFSDeepCrawlStrategy", + # "params": { + # "max_depth": max_depth, + # "max_pages": max_pages, + # "filter_chain": { # Keep basic domain filter + # "type": "FilterChain", + # "params": { "filters": [{"type": "DomainFilter", "params": {"allowed_domains": [DEEP_CRAWL_DOMAIN]}}]} + # }, + # "url_scorer": { # Add scorer + # "type": "CompositeScorer", + # "params": { + # "scorers": [ + # { # Favor pages with 'product' in the URL + # "type": "KeywordRelevanceScorer", + # "params": {"keywords": ["product"], "weight": 1.0} + # }, + # { # Penalize deep paths slightly + # "type": "PathDepthScorer", + # "params": {"optimal_depth": 2, "weight": -0.2} + # } + # ] + # } + # }, + # # Set a threshold if needed: "score_threshold": 0.1 + # } + # } + # } + # } + # } + # response = await async_client.post("/crawl", json=payload) + # response.raise_for_status() + # data = response.json() + + # assert data["success"] is True + # assert len(data["results"]) > 0 + # assert len(data["results"]) <= max_pages + + # # Check if results seem biased towards products (harder to assert strictly without knowing exact scores) + # product_urls_found = any("product_" in result["url"] for result in data["results"] if result["metadata"]["depth"] > 0) + # print(f"Product URLs found among depth > 0 results: {product_urls_found}") + # # We expect scoring to prioritize product pages if available within limits + # # 
assert product_urls_found # This might be too strict depending on site structure and limits + + # for result in data["results"]: + # await assert_crawl_result_structure(result) + # assert result["success"] is True + # assert result["metadata"]["depth"] <= max_depth + + # # 4. Deep Crawl with CSS Extraction + # async def test_deep_crawl_with_css_extraction(self, async_client: httpx.AsyncClient): + # """Test BFS deep crawl combined with JsonCssExtractionStrategy.""" + # max_depth = 6 # Go deep enough to reach product pages + # max_pages = 20 + # # Schema to extract product details + # product_schema = { + # "name": "ProductDetails", + # "baseSelector": "div.container", # Base for product page + # "fields": [ + # {"name": "product_title", "selector": "h1", "type": "text"}, + # {"name": "price", "selector": ".product-price", "type": "text"}, + # {"name": "description", "selector": ".product-description p", "type": "text"}, + # {"name": "specs", "selector": ".product-specs li", "type": "list", "fields":[ + # {"name": "spec_name", "selector": ".spec-name", "type": "text"}, + # {"name": "spec_value", "selector": ".spec-value", "type": "text"} + # ]} + # ] + # } + # payload = { + # "urls": [DEEP_CRAWL_BASE_URL], + # "browser_config": {"type": "BrowserConfig", "params": {"headless": True}}, + # "crawler_config": { + # "type": "CrawlerRunConfig", + # "params": { + # "stream": False, + # "cache_mode": "BYPASS", + # "extraction_strategy": { # Apply extraction to ALL crawled pages + # "type": "JsonCssExtractionStrategy", + # "params": {"schema": {"type": "dict", "value": product_schema}} + # }, + # "deep_crawl_strategy": { + # "type": "BFSDeepCrawlStrategy", + # "params": { + # "max_depth": max_depth, + # "max_pages": max_pages, + # "filter_chain": { # Only crawl HTML on our domain + # "type": "FilterChain", + # "params": { + # "filters": [ + # {"type": "DomainFilter", "params": {"allowed_domains": [DEEP_CRAWL_DOMAIN]}}, + # {"type": "ContentTypeFilter", "params": {"allowed_types": ["text/html"]}} + # ] + # } + # } + # # Optional: Add scoring to prioritize product pages for extraction + # } + # } + # } + # } + # } + # response = await async_client.post("/crawl", json=payload) + # response.raise_for_status() + # data = response.json() + + # assert data["success"] is True + # assert len(data["results"]) > 0 + # # assert len(data["results"]) <= max_pages + + # found_extracted_product = False + # for result in data["results"]: + # await assert_crawl_result_structure(result) + # assert result["success"] is True + # assert "extracted_content" in result + # if "product_" in result["url"]: # Check product pages specifically + # assert result["extracted_content"] is not None + # try: + # extracted = json.loads(result["extracted_content"]) + # # Schema returns list even if one base match + # assert isinstance(extracted, list) + # if extracted: + # item = extracted[0] + # assert "product_title" in item and item["product_title"] + # assert "price" in item and item["price"] + # # Specs might be empty list if not found + # assert "specs" in item and isinstance(item["specs"], list) + # found_extracted_product = True + # print(f"Extracted product: {item.get('product_title')}") + # except (json.JSONDecodeError, AssertionError, IndexError) as e: + # pytest.fail(f"Extraction validation failed for {result['url']}: {e}\nContent: {result['extracted_content']}") + # # else: + # # # Non-product pages might have None or empty list depending on schema match + # # assert result["extracted_content"] is None or 
json.loads(result["extracted_content"]) == [] + + # assert found_extracted_product, "Did not find any pages where product data was successfully extracted." + + # # 5. Deep Crawl with LLM Extraction (Requires Server LLM Setup) + # async def test_deep_crawl_with_llm_extraction(self, async_client: httpx.AsyncClient): + # """Test BFS deep crawl combined with LLMExtractionStrategy.""" + # max_depth = 1 # Limit depth to keep LLM calls manageable + # max_pages = 3 + # payload = { + # "urls": [DEEP_CRAWL_BASE_URL], + # "browser_config": {"type": "BrowserConfig", "params": {"headless": True}}, + # "crawler_config": { + # "type": "CrawlerRunConfig", + # "params": { + # "stream": False, + # "cache_mode": "BYPASS", + # "extraction_strategy": { # Apply LLM extraction to crawled pages + # "type": "LLMExtractionStrategy", + # "params": { + # "instruction": "Extract the main H1 title and the text content of the first paragraph.", + # "llm_config": { # Example override, rely on server default if possible + # "type": "LLMConfig", + # "params": {"provider": "openai/gpt-4.1-mini"} # Use a cheaper model for testing + # }, + # "schema": { # Expected JSON output + # "type": "dict", + # "value": { + # "title": "PageContent", "type": "object", + # "properties": { + # "h1_title": {"type": "string"}, + # "first_paragraph": {"type": "string"} + # } + # } + # } + # } + # }, + # "deep_crawl_strategy": { + # "type": "BFSDeepCrawlStrategy", + # "params": { + # "max_depth": max_depth, + # "max_pages": max_pages, + # "filter_chain": { + # "type": "FilterChain", + # "params": { + # "filters": [ + # {"type": "DomainFilter", "params": {"allowed_domains": [DEEP_CRAWL_DOMAIN]}}, + # {"type": "ContentTypeFilter", "params": {"allowed_types": ["text/html"]}} + # ] + # } + # } + # } + # } + # } + # } + # } + + # try: + # response = await async_client.post("/crawl", json=payload) + # response.raise_for_status() + # data = response.json() + # except httpx.HTTPStatusError as e: + # pytest.fail(f"Deep Crawl + LLM extraction request failed: {e}. Response: {e.response.text}. Check server logs and LLM API key setup.") + # except httpx.RequestError as e: + # pytest.fail(f"Deep Crawl + LLM extraction request failed: {e}.") + + + # assert data["success"] is True + # assert len(data["results"]) > 0 + # assert len(data["results"]) <= max_pages + + # found_llm_extraction = False + # for result in data["results"]: + # await assert_crawl_result_structure(result) + # assert result["success"] is True + # assert "extracted_content" in result + # assert result["extracted_content"] is not None + # try: + # extracted = json.loads(result["extracted_content"]) + # if isinstance(extracted, list): extracted = extracted[0] # Handle list output + # assert isinstance(extracted, dict) + # assert "h1_title" in extracted # Check keys based on schema + # assert "first_paragraph" in extracted + # found_llm_extraction = True + # print(f"LLM extracted from {result['url']}: Title='{extracted.get('h1_title')}'") + # except (json.JSONDecodeError, AssertionError, IndexError, TypeError) as e: + # pytest.fail(f"LLM extraction validation failed for {result['url']}: {e}\nContent: {result['extracted_content']}") + + # assert found_llm_extraction, "LLM extraction did not yield expected data on any crawled page." + + + # # 6. 
Deep Crawl with SSL Certificate Fetching + # async def test_deep_crawl_with_ssl(self, async_client: httpx.AsyncClient): + # """Test BFS deep crawl with fetch_ssl_certificate enabled.""" + # max_depth = 0 # Only fetch for start URL to keep test fast + # max_pages = 1 + # payload = { + # "urls": [DEEP_CRAWL_BASE_URL], + # "browser_config": {"type": "BrowserConfig", "params": {"headless": True}}, + # "crawler_config": { + # "type": "CrawlerRunConfig", + # "params": { + # "stream": False, + # "cache_mode": "BYPASS", + # "fetch_ssl_certificate": True, # <-- Enable SSL fetching + # "deep_crawl_strategy": { + # "type": "BFSDeepCrawlStrategy", + # "params": { + # "max_depth": max_depth, + # "max_pages": max_pages, + # } + # } + # } + # } + # } + # response = await async_client.post("/crawl", json=payload) + # response.raise_for_status() + # data = response.json() + + # assert data["success"] is True + # assert len(data["results"]) == 1 + # result = data["results"][0] + + # await assert_crawl_result_structure(result, check_ssl=True) # <-- Tell helper to check SSL field + # assert result["success"] is True + # # Check if SSL info was actually retrieved + # if result["ssl_certificate"]: + # # Assert directly using dictionary keys + # assert isinstance(result["ssl_certificate"], dict) # Verify it's a dict + # assert "issuer" in result["ssl_certificate"] + # assert "subject" in result["ssl_certificate"] + # # --- MODIFIED ASSERTIONS --- + # assert "not_before" in result["ssl_certificate"] # Check for the actual key + # assert "not_after" in result["ssl_certificate"] # Check for the actual key + # # --- END MODIFICATIONS --- + # assert "fingerprint" in result["ssl_certificate"] # Check another key + + # # This print statement using .get() already works correctly with dictionaries + # print(f"SSL Issuer Org: {result['ssl_certificate'].get('issuer', {}).get('O', 'N/A')}") + # print(f"SSL Valid From: {result['ssl_certificate'].get('not_before', 'N/A')}") + # else: + # # This part remains the same + # print("SSL Certificate was null in the result.") + + + # 7. 
Deep Crawl with Proxy Rotation (Requires PROXIES env var) + async def test_deep_crawl_with_proxies(self, async_client: httpx.AsyncClient): + """Test BFS deep crawl using proxy rotation.""" + proxies = load_proxies_from_env() + if not proxies: + pytest.skip("Skipping proxy test: PROXIES environment variable not set or empty.") + + print(f"\nTesting with {len(proxies)} proxies loaded from environment.") + + max_depth = 1 + max_pages = 3 + payload = { + "urls": [DEEP_CRAWL_BASE_URL], # Use the dummy site + # Use a BrowserConfig that *might* pick up proxy if set, but rely on CrawlerRunConfig + "browser_config": {"type": "BrowserConfig", "params": {"headless": True}}, + "crawler_config": { + "type": "CrawlerRunConfig", + "params": { + "stream": False, + "cache_mode": "BYPASS", + "proxy_rotation_strategy": { # <-- Define the strategy + "type": "RoundRobinProxyStrategy", + "params": { + # Convert ProxyConfig dicts back to the serialized format expected by server + "proxies": [{"type": "ProxyConfig", "params": p} for p in proxies] + } + }, + "deep_crawl_strategy": { + "type": "BFSDeepCrawlStrategy", + "params": { + "max_depth": max_depth, + "max_pages": max_pages, + "filter_chain": { + "type": "FilterChain", + "params": { "filters": [{"type": "DomainFilter", "params": {"allowed_domains": [DEEP_CRAWL_DOMAIN]}}]} + } + } + } + } + } + } + try: + response = await async_client.post("/crawl", json=payload) + response.raise_for_status() + data = response.json() + except httpx.HTTPStatusError as e: + # Proxies often cause connection errors, catch them + pytest.fail(f"Proxy deep crawl failed: {e}. Response: {e.response.text}. Are proxies valid and accessible by the server?") + except httpx.RequestError as e: + pytest.fail(f"Proxy deep crawl request failed: {e}. Are proxies valid and accessible?") + + assert data["success"] is True + assert len(data["results"]) > 0 + assert len(data["results"]) <= max_pages + # Primary assertion is that the crawl succeeded *with* proxy config + print(f"Proxy deep crawl completed successfully for {len(data['results'])} pages.") + + # Verifying specific proxy usage requires server logs or custom headers/responses + + +# --- Main Execution Block (for running script directly) --- +if __name__ == "__main__": + pytest_args = ["-v", "-s", __file__] + # Example: Run only proxy test + # pytest_args.append("-k test_deep_crawl_with_proxies") + print(f"Running pytest with args: {pytest_args}") + exit_code = pytest.main(pytest_args) + print(f"Pytest finished with exit code: {exit_code}") \ No newline at end of file diff --git a/tests/general/generate_dummy_site.py b/tests/general/generate_dummy_site.py new file mode 100644 index 00000000..d4218b6b --- /dev/null +++ b/tests/general/generate_dummy_site.py @@ -0,0 +1,335 @@ +# ==== File: build_dummy_site.py ==== + +import os +import random +import argparse +from pathlib import Path +from urllib.parse import quote + +# --- Configuration --- +NUM_CATEGORIES = 3 +NUM_SUBCATEGORIES_PER_CAT = 2 # Results in NUM_CATEGORIES * NUM_SUBCATEGORIES_PER_CAT total L2 categories +NUM_PRODUCTS_PER_SUBCAT = 5 # Products listed on L3 pages +MAX_DEPTH_TARGET = 5 # Explicitly set target depth + +# --- Helper Functions --- + +def generate_lorem(words=20): + """Generates simple placeholder text.""" + lorem_words = ["lorem", "ipsum", "dolor", "sit", "amet", "consectetur", + "adipiscing", "elit", "sed", "do", "eiusmod", "tempor", + "incididunt", "ut", "labore", "et", "dolore", "magna", "aliqua"] + return " ".join(random.choice(lorem_words) for _ in 
range(words)).capitalize() + "." + +def create_html_page(filepath: Path, title: str, body_content: str, breadcrumbs: list = [], head_extras: str = ""): + """Creates an HTML file with basic structure and inline CSS.""" + os.makedirs(filepath.parent, exist_ok=True) + + # Generate breadcrumb HTML using the 'link' provided in the breadcrumbs list + breadcrumb_html = "" + if breadcrumbs: + links_html = " » ".join(f'{bc["name"]}' for bc in breadcrumbs) + breadcrumb_html = f"" + + # Basic CSS for structure identification (kept the same) + css = """ + + """ + html_content = f""" + + + + + {title} - FakeShop + {head_extras} + {css} + + +
+</head>
+<body>
+    <header>
+        {breadcrumb_html}
+    </header>
+    <main>
+        <h1>{title}</h1>
+        {body_content}
+    </main>
+</body>
+</html>"""
+    with open(filepath, "w", encoding="utf-8") as f:
+        f.write(html_content)
+    # Keep print statement concise for clarity
+    # print(f"Created: {filepath}")
+
+def generate_site(base_dir: Path, site_name: str = "FakeShop", base_path: str = ""):
+    """Generates the dummy website structure."""
+    base_dir.mkdir(parents=True, exist_ok=True)
+
+    # --- Clean and prepare the base path for URL construction ---
+    # Ensure it starts with '/' if not empty, and remove any trailing '/'
+    if base_path:
+        full_base_path = "/" + base_path.strip('/')
+    else:
+        full_base_path = ""  # Represents the root
+
+    print(f"Using base path for links: '{full_base_path}'")
+
+    # Breadcrumb seed for child pages (assumed shape: "name" plus an absolute
+    # "link"); referenced as `breadcrumbs_home` by the category pages below.
+    breadcrumbs_home = [{"name": "Home", "link": f"{full_base_path}/index.html"}]
+
+    # --- Level 0: Homepage ---
+    home_body = "

<h1>Welcome to FakeShop!</h1><p>Your one-stop shop for imaginary items.</p><h2>Categories:</h2><ul>
\n" + create_html_page(base_dir / "index.html", "Homepage", home_body, []) # No breadcrumbs *on* the homepage itself + + # --- Levels 1-5 --- + for i in range(NUM_CATEGORIES): + cat_name = f"Category-{i+1}" + cat_folder_name = quote(cat_name.lower().replace(" ", "-")) + cat_dir = base_dir / cat_folder_name + # This is the *absolute* path for the breadcrumb link + cat_link_path = f"{full_base_path}/{cat_folder_name}/index.html" + # Update breadcrumbs list for this level + breadcrumbs_cat = breadcrumbs_home + [{"name": cat_name, "link": cat_link_path}] + + # --- Level 1: Category Page --- + cat_body = f"

<p>{generate_lorem(15)} for {cat_name}.</p><h2>Sub-Categories:</h2><ul>
\n" + # Pass the updated breadcrumbs list + create_html_page(cat_dir / "index.html", cat_name, cat_body, breadcrumbs_home) # Parent breadcrumb needed here + + for j in range(NUM_SUBCATEGORIES_PER_CAT): + subcat_name = f"{cat_name}-Sub-{j+1}" + subcat_folder_name = quote(subcat_name.lower().replace(" ", "-")) + subcat_dir = cat_dir / subcat_folder_name + # Absolute path for the breadcrumb link + subcat_link_path = f"{full_base_path}/{cat_folder_name}/{subcat_folder_name}/index.html" + # Update breadcrumbs list for this level + breadcrumbs_subcat = breadcrumbs_cat + [{"name": subcat_name, "link": subcat_link_path}] + + # --- Level 2: Sub-Category Page (Product List) --- + subcat_body = f"

<p>Explore products in {subcat_name}. {generate_lorem(12)}</p><h2>Products:</h2><ul class='product-list'>
\n" # Close product-list ul + # Pass the correct breadcrumbs list for the subcategory index page + create_html_page(subcat_dir / "index.html", subcat_name, subcat_body, breadcrumbs_cat) # Parent breadcrumb needed here + + +# --- Main Execution --- +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Generate a dummy multi-level retail website.") + parser.add_argument( + "-o", "--output-dir", + type=str, + default="dummy_retail_site", + help="Directory to generate the website in." + ) + parser.add_argument( + "-n", "--site-name", + type=str, + default="FakeShop", + help="Name of the fake shop." + ) + parser.add_argument( + "-b", "--base-path", + type=str, + default="", + help="Base path for hosting the site (e.g., 'samples/deepcrawl'). Leave empty if hosted at the root." + ) + # Optional: Add more args to configure counts if needed + + args = parser.parse_args() + + output_directory = Path(args.output_dir) + site_name = args.site_name + base_path = args.base_path + + print(f"Generating dummy site '{site_name}' in '{output_directory}'...") + # Pass the base_path to the generation function + generate_site(output_directory, site_name, base_path) + print(f"\nCreated {sum(1 for _ in output_directory.rglob('*.html'))} HTML pages.") + print("Dummy site generation complete.") + print(f"To serve locally (example): python -m http.server --directory {output_directory} 8000") + if base_path: + print(f"Access the site at: http://localhost:8000/{base_path.strip('/')}/index.html") + else: + print(f"Access the site at: http://localhost:8000/index.html") \ No newline at end of file