refactor(proxy): consolidate proxy configuration handling

Moves ProxyConfig from configs/ directory into proxy_strategy.py to improve code organization and reduce fragmentation. Updates all imports and type hints to reflect the new location.

Key changes:
- Moved ProxyConfig class from configs/proxy_config.py to proxy_strategy.py
- Updated type hints in async_configs.py to support ProxyConfig
- Fixed proxy configuration handling in browser_manager.py
- Updated documentation and examples to use new import path

BREAKING CHANGE: ProxyConfig import path has changed from crawl4ai.configs to crawl4ai.proxy_strategy
This commit is contained in:
UncleCode
2025-03-07 23:14:11 +08:00
parent a68cbb232b
commit 4aeb7ef9ad
11 changed files with 311 additions and 129 deletions

View File

@@ -26,6 +26,8 @@ import inspect
from typing import Any, Dict, Optional
from enum import Enum
from .proxy_strategy import ProxyConfig
def to_serializable_dict(obj: Any, ignore_default_value : bool = False) -> Dict:
"""
@@ -180,7 +182,7 @@ class BrowserConfig:
is "chromium". Default: "chromium".
proxy (Optional[str]): Proxy server URL (e.g., "http://username:password@proxy:port"). If None, no proxy is used.
Default: None.
proxy_config (dict or None): Detailed proxy configuration, e.g. {"server": "...", "username": "..."}.
proxy_config (ProxyConfig or dict or None): Detailed proxy configuration, e.g. {"server": "...", "username": "..."}.
If None, no additional proxy config. Default: None.
viewport_width (int): Default viewport width for pages. Default: 1080.
viewport_height (int): Default viewport height for pages. Default: 600.
@@ -225,7 +227,7 @@ class BrowserConfig:
chrome_channel: str = "chromium",
channel: str = "chromium",
proxy: str = None,
proxy_config: dict = None,
proxy_config: Union[ProxyConfig, dict, None] = None,
viewport_width: int = 1080,
viewport_height: int = 600,
viewport: dict = None,
@@ -315,7 +317,7 @@ class BrowserConfig:
chrome_channel=kwargs.get("chrome_channel", "chromium"),
channel=kwargs.get("channel", "chromium"),
proxy=kwargs.get("proxy"),
proxy_config=kwargs.get("proxy_config"),
proxy_config=kwargs.get("proxy_config", None),
viewport_width=kwargs.get("viewport_width", 1080),
viewport_height=kwargs.get("viewport_height", 600),
accept_downloads=kwargs.get("accept_downloads", False),
@@ -515,7 +517,7 @@ class CrawlerRunConfig():
Default: "lxml".
scraping_strategy (ContentScrapingStrategy): Scraping strategy to use.
Default: WebScrapingStrategy.
proxy_config (dict or None): Detailed proxy configuration, e.g. {"server": "...", "username": "..."}.
proxy_config (ProxyConfig or dict or None): Detailed proxy configuration, e.g. {"server": "...", "username": "..."}.
If None, no additional proxy config. Default: None.
# SSL Parameters
@@ -656,7 +658,7 @@ class CrawlerRunConfig():
prettiify: bool = False,
parser_type: str = "lxml",
scraping_strategy: ContentScrapingStrategy = None,
proxy_config: dict = None,
proxy_config: Union[ProxyConfig, dict, None] = None,
proxy_rotation_strategy: Optional[ProxyRotationStrategy] = None,
# SSL Parameters
fetch_ssl_certificate: bool = False,

View File

@@ -767,6 +767,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# Handle wait_for condition
# Todo: Decide how to handle this
if not config.wait_for and config.css_selector and False:
# if not config.wait_for and config.css_selector:
config.wait_for = f"css:{config.css_selector}"
if config.wait_for:
@@ -806,8 +807,28 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
if config.remove_overlay_elements:
await self.remove_overlay_elements(page)
# Get final HTML content
html = await page.content()
if config.css_selector:
try:
# Handle comma-separated selectors by splitting them
selectors = [s.strip() for s in config.css_selector.split(',')]
html_parts = []
for selector in selectors:
try:
content = await page.evaluate(f"document.querySelector('{selector}')?.outerHTML || ''")
html_parts.append(content)
except Error as e:
print(f"Warning: Could not get content for selector '{selector}': {str(e)}")
# Wrap in a div to create a valid HTML structure
html = f"<div class='crawl4ai-result'>\n" + "\n".join(html_parts) + "\n</div>"
except Error as e:
raise RuntimeError(f"Failed to extract HTML content: {str(e)}")
else:
html = await page.content()
# # Get final HTML content
# html = await page.content()
await self.execute_hook(
"before_return_html", page=page, html=html, context=context, config=config
)

View File

@@ -531,9 +531,9 @@ class BrowserManager:
ProxySettings(server=self.config.proxy)
if self.config.proxy
else ProxySettings(
server=self.config.proxy_config.get("server"),
username=self.config.proxy_config.get("username"),
password=self.config.proxy_config.get("password"),
server=self.config.proxy_config.server,
username=self.config.proxy_config.username,
password=self.config.proxy_config.password,
)
)
browser_args["proxy"] = proxy_settings

View File

@@ -1,2 +0,0 @@
# Package facade: re-export ProxyConfig so callers can import it from
# `crawl4ai.configs` without knowing the submodule layout.
from .proxy_config import ProxyConfig

# Explicit public API of the configs package.
__all__ = ["ProxyConfig"]

View File

@@ -1,113 +0,0 @@
import os
from typing import Dict, List, Optional
class ProxyConfig:
    """Connection details for a single proxy server, with optional credentials."""

    def __init__(
        self,
        server: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
        ip: Optional[str] = None,
    ):
        """Configuration class for a single proxy.

        Args:
            server: Proxy server URL (e.g., "http://127.0.0.1:8080")
            username: Optional username for proxy authentication
            password: Optional password for proxy authentication
            ip: Optional IP address for verification purposes; derived from
                ``server`` when not supplied.
        """
        self.server = server
        self.username = username
        self.password = password
        # Fall back to parsing the host out of the server URL.
        self.ip = ip if ip else self._extract_ip_from_server()

    def _extract_ip_from_server(self) -> Optional[str]:
        """Best-effort extraction of the host portion of ``server`` (None on failure)."""
        try:
            # Strip a leading "scheme://" if present, then take everything
            # before the first ":" (the port separator).
            remainder = self.server.split("://", 1)[-1]
            host, _, _ = remainder.partition(":")
            return host
        except Exception:
            return None

    @staticmethod
    def from_string(proxy_str: str) -> "ProxyConfig":
        """Create a ProxyConfig from a string in the format 'ip:port:username:password'."""
        fields = proxy_str.split(":")
        if len(fields) == 4:  # ip:port:username:password
            ip, port, user, pwd = fields
            return ProxyConfig(
                server=f"http://{ip}:{port}",
                username=user,
                password=pwd,
                ip=ip,
            )
        if len(fields) == 2:  # ip:port only
            ip, port = fields
            return ProxyConfig(server=f"http://{ip}:{port}", ip=ip)
        raise ValueError(f"Invalid proxy string format: {proxy_str}")

    @staticmethod
    def from_dict(proxy_dict: Dict) -> "ProxyConfig":
        """Create a ProxyConfig from a dictionary."""
        get = proxy_dict.get
        return ProxyConfig(
            server=get("server"),
            username=get("username"),
            password=get("password"),
            ip=get("ip"),
        )

    @staticmethod
    def from_env(env_var: str = "PROXIES") -> List["ProxyConfig"]:
        """Load proxies from environment variable.

        Args:
            env_var: Name of environment variable containing comma-separated proxy strings

        Returns:
            List of ProxyConfig objects (best-effort; parse errors are printed,
            not raised, and a partial list is returned)
        """
        configs: List["ProxyConfig"] = []
        try:
            for entry in os.getenv(env_var, "").split(","):
                if entry:
                    configs.append(ProxyConfig.from_string(entry))
        except Exception as e:
            print(f"Error loading proxies from environment: {e}")
        return configs

    def to_dict(self) -> Dict:
        """Convert to dictionary representation."""
        return {
            "server": self.server,
            "username": self.username,
            "password": self.password,
            "ip": self.ip,
        }

    def clone(self, **kwargs) -> "ProxyConfig":
        """Create a copy of this configuration with updated values.

        Args:
            **kwargs: Key-value pairs of configuration options to update

        Returns:
            ProxyConfig: A new instance with the specified updates
        """
        merged = {**self.to_dict(), **kwargs}
        return ProxyConfig.from_dict(merged)

View File

@@ -742,7 +742,7 @@ class WebScrapingStrategy(ContentScrapingStrategy):
for element in body.select(excluded_selector):
element.extract()
if css_selector:
if False and css_selector:
selected_elements = body.select(css_selector)
if not selected_elements:
return {
@@ -848,6 +848,7 @@ class WebScrapingStrategy(ContentScrapingStrategy):
return {
# **markdown_content,
"scraped_html": html,
"cleaned_html": cleaned_html,
"success": success,
"media": media,

View File

@@ -1,8 +1,119 @@
from typing import List, Dict, Optional
from abc import ABC, abstractmethod
from itertools import cycle
import os
class ProxyConfig:
    """Configuration for a single proxy endpoint (server URL plus optional auth)."""

    def __init__(
        self,
        server: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
        ip: Optional[str] = None,
    ):
        """Configuration class for a single proxy.

        Args:
            server: Proxy server URL (e.g., "http://127.0.0.1:8080")
            username: Optional username for proxy authentication
            password: Optional password for proxy authentication
            ip: Optional IP address for verification purposes; derived from
                ``server`` when not supplied.
        """
        self.server = server
        self.username = username
        self.password = password
        # Extract IP from server if not explicitly provided
        self.ip = ip or self._extract_ip_from_server()

    def _extract_ip_from_server(self) -> Optional[str]:
        """Extract the host part of the server URL (None if it cannot be parsed)."""
        try:
            # Simple extraction assuming scheme://host:port or host:port format
            if "://" in self.server:
                return self.server.split("://")[1].split(":")[0]
            return self.server.split(":")[0]
        except Exception:
            return None

    @staticmethod
    def from_string(proxy_str: str) -> "ProxyConfig":
        """Create a ProxyConfig from a string in the format 'ip:port:username:password'.

        Raises:
            ValueError: if the string has neither 2 nor 4 colon-separated parts.
        """
        parts = proxy_str.split(":")
        if len(parts) == 4:  # ip:port:username:password
            ip, port, username, password = parts
            return ProxyConfig(
                server=f"http://{ip}:{port}",
                username=username,
                password=password,
                ip=ip,
            )
        elif len(parts) == 2:  # ip:port only
            ip, port = parts
            return ProxyConfig(server=f"http://{ip}:{port}", ip=ip)
        else:
            raise ValueError(f"Invalid proxy string format: {proxy_str}")

    @staticmethod
    def from_dict(proxy_dict: Dict) -> "ProxyConfig":
        """Create a ProxyConfig from a dictionary (server/username/password/ip keys)."""
        return ProxyConfig(
            server=proxy_dict.get("server"),
            username=proxy_dict.get("username"),
            password=proxy_dict.get("password"),
            ip=proxy_dict.get("ip"),
        )

    @staticmethod
    def from_env(env_var: str = "PROXIES") -> List["ProxyConfig"]:
        """Load proxies from environment variable.

        Args:
            env_var: Name of environment variable containing comma-separated proxy strings

        Returns:
            List of ProxyConfig objects (best-effort; parse errors are printed,
            not raised, and a partial list is returned)
        """
        proxies = []
        try:
            proxy_list = os.getenv(env_var, "").split(",")
            for proxy in proxy_list:
                # Tolerate stray whitespace around entries ("a, b") and skip empties.
                proxy = proxy.strip()
                if not proxy:
                    continue
                proxies.append(ProxyConfig.from_string(proxy))
        except Exception as e:
            print(f"Error loading proxies from environment: {e}")
        return proxies

    def to_dict(self) -> Dict:
        """Convert to dictionary representation."""
        return {
            "server": self.server,
            "username": self.username,
            "password": self.password,
            "ip": self.ip,
        }

    def clone(self, **kwargs) -> "ProxyConfig":
        """Create a copy of this configuration with updated values.

        Args:
            **kwargs: Key-value pairs of configuration options to update

        Returns:
            ProxyConfig: A new instance with the specified updates
        """
        config_dict = self.to_dict()
        config_dict.update(kwargs)
        return ProxyConfig.from_dict(config_dict)
# NOTE: the stale `from crawl4ai.configs import ProxyConfig` re-import that
# followed this class was removed: `crawl4ai.configs` is deleted in this change,
# so the import would raise ImportError and would shadow the class defined above.
class ProxyRotationStrategy(ABC):
"""Base abstract class for proxy rotation strategies"""

View File

@@ -13,7 +13,7 @@ from crawl4ai.deep_crawling import (
)
from crawl4ai.deep_crawling.scorers import KeywordRelevanceScorer
from crawl4ai.async_crawler_strategy import AsyncHTTPCrawlerStrategy
from crawl4ai.configs import ProxyConfig
from crawl4ai.proxy_strategy import ProxyConfig
from crawl4ai import RoundRobinProxyStrategy
from crawl4ai.content_filter_strategy import LLMContentFilter
from crawl4ai import DefaultMarkdownGenerator

View File

@@ -251,7 +251,7 @@ from crawl4ai import (
RoundRobinProxyStrategy,
)
import asyncio
from crawl4ai.configs import ProxyConfig
from crawl4ai.proxy_strategy import ProxyConfig
async def main():
# Load proxies and create rotation strategy
proxies = ProxyConfig.from_env()

View File

@@ -0,0 +1,162 @@
import asyncio
from typing import List
from crawl4ai import (
AsyncWebCrawler,
CrawlerRunConfig,
BFSDeepCrawlStrategy,
CrawlResult,
URLFilter, # Base class for filters, not directly used in examples but good to import for context
ContentTypeFilter,
DomainFilter,
FilterChain,
URLPatternFilter,
SEOFilter # Advanced filter, can be introduced later or as bonus
)
async def deep_crawl_filter_tutorial_part_2():
    """
    Tutorial demonstrating URL filters in Crawl4AI, focusing on isolated filter behavior
    before integrating them into a deep crawl.
    This tutorial covers:
    - Testing individual filters with synthetic URLs.
    - Understanding filter logic and behavior in isolation.
    - Combining filters using FilterChain.
    - Integrating filters into a deep crawling example.

    NOTE: interactive — pauses with input() between sections; run from a terminal.
    """
    # === Introduction: URL Filters in Isolation ===
    print("\n" + "=" * 40)
    print("=== Introduction: URL Filters in Isolation ===")
    print("=" * 40 + "\n")
    print("In this section, we will explore each filter individually using synthetic URLs.")
    print("This allows us to understand exactly how each filter works before using them in a crawl.\n")
    # === 2. ContentTypeFilter - Testing in Isolation ===
    # NOTE(review): numbering starts at 2 — there is no section 1 in this part;
    # presumably section 1 lives in part 1 of the tutorial — confirm.
    print("\n" + "=" * 40)
    print("=== 2. ContentTypeFilter - Testing in Isolation ===")
    print("=" * 40 + "\n")
    # 2.1. Create ContentTypeFilter:
    # Create a ContentTypeFilter to allow only 'text/html' and 'application/json' content types
    # BASED ON URL EXTENSIONS.
    content_type_filter = ContentTypeFilter(allowed_types=["text/html", "application/json"])
    print("ContentTypeFilter created, allowing types (by extension): ['text/html', 'application/json']")
    print("Note: ContentTypeFilter in Crawl4ai works by checking URL file extensions, not HTTP headers.")
    # 2.2. Synthetic URLs for Testing:
    # ContentTypeFilter checks URL extensions. We provide URLs with different extensions to test.
    test_urls_content_type = [
        "https://example.com/page.html",  # Should pass: .html extension (text/html)
        "https://example.com/data.json",  # Should pass: .json extension (application/json)
        "https://example.com/image.png",  # Should reject: .png extension (not allowed type)
        "https://example.com/document.pdf",  # Should reject: .pdf extension (not allowed type)
        "https://example.com/page",  # Should pass: no extension (defaults to allow) - check default behaviour!
        "https://example.com/page.xhtml",  # Should pass: .xhtml extension (text/html)
    ]
    # 2.3. Apply Filter and Show Results:
    print("\n=== Testing ContentTypeFilter (URL Extension based) ===")
    for url in test_urls_content_type:
        # NOTE(review): apply() is called synchronously here but awaited for
        # FilterChain below — confirm which filter apply()s are coroutines.
        passed = content_type_filter.apply(url)
        result = "PASSED" if passed else "REJECTED"
        # NOTE(review): reaches into the private _extract_extension helper purely
        # for display purposes; brittle if the filter's internals change.
        extension = ContentTypeFilter._extract_extension(url)  # Show extracted extension for clarity
        print(f"- URL: {url} - {result} (Extension: '{extension or 'No Extension'}')")
    print("=" * 40)
    input("Press Enter to continue to DomainFilter example...")
    # === 3. DomainFilter - Testing in Isolation ===
    print("\n" + "=" * 40)
    print("=== 3. DomainFilter - Testing in Isolation ===")
    print("=" * 40 + "\n")
    # 3.1. Create DomainFilter:
    domain_filter = DomainFilter(allowed_domains=["crawl4ai.com", "example.com"])
    print("DomainFilter created, allowing domains: ['crawl4ai.com', 'example.com']")
    # 3.2. Synthetic URLs for Testing:
    test_urls_domain = [
        "https://docs.crawl4ai.com/api",
        "https://example.com/products",
        "https://another-website.org/blog",
        "https://sub.example.com/about",
        "https://crawl4ai.com.attacker.net",  # Corrected example: now should be rejected
    ]
    # 3.3. Apply Filter and Show Results:
    print("\n=== Testing DomainFilter ===")
    for url in test_urls_domain:
        passed = domain_filter.apply(url)
        result = "PASSED" if passed else "REJECTED"
        print(f"- URL: {url} - {result}")
    print("=" * 40)
    input("Press Enter to continue to FilterChain example...")
    # === 4. FilterChain - Combining Filters ===
    print("\n" + "=" * 40)
    print("=== 4. FilterChain - Combining Filters ===")
    print("=" * 40 + "\n")
    # A URL must pass ALL filters in the chain to be accepted.
    combined_filter = FilterChain(
        filters=[
            URLPatternFilter(patterns=["*api*"]),
            ContentTypeFilter(allowed_types=["text/html"]),  # Still URL extension based
            DomainFilter(allowed_domains=["docs.crawl4ai.com"]),
        ]
    )
    print("FilterChain created, combining URLPatternFilter, ContentTypeFilter, and DomainFilter.")
    test_urls_combined = [
        "https://docs.crawl4ai.com/api/async-webcrawler",
        "https://example.com/api/products",
        "https://docs.crawl4ai.com/core/crawling",
        "https://another-website.org/api/data",
    ]
    # 4.3. Apply FilterChain and Show Results
    print("\n=== Testing FilterChain (URLPatternFilter + ContentTypeFilter + DomainFilter) ===")
    for url in test_urls_combined:
        # FilterChain.apply is awaited here (unlike the individual filters above).
        passed = await combined_filter.apply(url)
        result = "PASSED" if passed else "REJECTED"
        print(f"- URL: {url} - {result}")
    print("=" * 40)
    input("Press Enter to continue to Deep Crawl with FilterChain example...")
    # === 5. Deep Crawl with FilterChain ===
    print("\n" + "=" * 40)
    print("=== 5. Deep Crawl with FilterChain ===")
    print("=" * 40 + "\n")
    print("Finally, let's integrate the FilterChain into a deep crawl example.")
    # Attach the chain to a BFS deep-crawl strategy: only URLs passing every
    # filter are followed, bounded by max_depth/max_pages.
    config_final_crawl = CrawlerRunConfig(
        deep_crawl_strategy=BFSDeepCrawlStrategy(
            max_depth=2,
            max_pages=10,
            include_external=False,
            filter_chain=combined_filter
        ),
        verbose=False,
    )
    # Performs live network requests against docs.crawl4ai.com.
    async with AsyncWebCrawler() as crawler:
        results_final_crawl: List[CrawlResult] = await crawler.arun(
            url="https://docs.crawl4ai.com", config=config_final_crawl
        )
        print("=== Crawled URLs (Deep Crawl with FilterChain) ===")
        for result in results_final_crawl:
            print(f"- {result.url}, Depth: {result.metadata.get('depth', 0)}")
        print("=" * 40)
    print("\nTutorial Completed! Review the output of each section to understand URL filters.")
if __name__ == "__main__":
    # Entry point: drive the async tutorial with a fresh event loop.
    asyncio.run(deep_crawl_filter_tutorial_part_2())