From 9e7f5aa44b3b477add54ba9fb16f59e66b5213bc Mon Sep 17 00:00:00 2001 From: unclecode Date: Fri, 26 Dec 2025 12:45:57 +0000 Subject: [PATCH] Updates on proxy rotation and proxy configuration --- crawl4ai/async_configs.py | 30 +- crawl4ai/async_webcrawler.py | 71 +++- crawl4ai/proxy_strategy.py | 195 +++++++++- tests/proxy/test_sticky_sessions.py | 569 ++++++++++++++++++++++++++++ 4 files changed, 843 insertions(+), 22 deletions(-) create mode 100644 tests/proxy/test_sticky_sessions.py diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py index 749ae717..664ad16b 100644 --- a/crawl4ai/async_configs.py +++ b/crawl4ai/async_configs.py @@ -1033,6 +1033,18 @@ class CrawlerRunConfig(): proxy_config (ProxyConfig or dict or None): Detailed proxy configuration, e.g. {"server": "...", "username": "..."}. If None, no additional proxy config. Default: None. + # Sticky Proxy Session Parameters + proxy_session_id (str or None): When set, maintains the same proxy for all requests sharing this session ID. + The proxy is acquired on first request and reused for subsequent requests. + Session expires when explicitly released or crawler context is closed. + Default: None. + proxy_session_ttl (int or None): Time-to-live for sticky session in seconds. + After TTL expires, a new proxy is acquired on next request. + Default: None (session lasts until explicitly released or crawler closes). + proxy_session_auto_release (bool): If True, automatically release the proxy session after a batch operation. + Useful for arun_many() to clean up sessions automatically. + Default: False. + # Browser Location and Identity Parameters locale (str or None): Locale to use for the browser context (e.g., "en-US"). Default: None. 
@@ -1221,6 +1233,10 @@ class CrawlerRunConfig(): scraping_strategy: ContentScrapingStrategy = None, proxy_config: Union[ProxyConfig, dict, None] = None, proxy_rotation_strategy: Optional[ProxyRotationStrategy] = None, + # Sticky Proxy Session Parameters + proxy_session_id: Optional[str] = None, + proxy_session_ttl: Optional[int] = None, + proxy_session_auto_release: bool = False, # Browser Location and Identity Parameters locale: Optional[str] = None, timezone_id: Optional[str] = None, @@ -1337,7 +1353,12 @@ class CrawlerRunConfig(): self.proxy_config = ProxyConfig.from_string(proxy_config) self.proxy_rotation_strategy = proxy_rotation_strategy - + + # Sticky Proxy Session Parameters + self.proxy_session_id = proxy_session_id + self.proxy_session_ttl = proxy_session_ttl + self.proxy_session_auto_release = proxy_session_auto_release + # Browser Location and Identity Parameters self.locale = locale self.timezone_id = timezone_id @@ -1621,6 +1642,10 @@ class CrawlerRunConfig(): scraping_strategy=kwargs.get("scraping_strategy"), proxy_config=kwargs.get("proxy_config"), proxy_rotation_strategy=kwargs.get("proxy_rotation_strategy"), + # Sticky Proxy Session Parameters + proxy_session_id=kwargs.get("proxy_session_id"), + proxy_session_ttl=kwargs.get("proxy_session_ttl"), + proxy_session_auto_release=kwargs.get("proxy_session_auto_release", False), # Browser Location and Identity Parameters locale=kwargs.get("locale", None), timezone_id=kwargs.get("timezone_id", None), @@ -1746,6 +1771,9 @@ class CrawlerRunConfig(): "scraping_strategy": self.scraping_strategy, "proxy_config": self.proxy_config, "proxy_rotation_strategy": self.proxy_rotation_strategy, + "proxy_session_id": self.proxy_session_id, + "proxy_session_ttl": self.proxy_session_ttl, + "proxy_session_auto_release": self.proxy_session_auto_release, "locale": self.locale, "timezone_id": self.timezone_id, "geolocation": self.geolocation, diff --git a/crawl4ai/async_webcrawler.py b/crawl4ai/async_webcrawler.py index 
95468a28..ef03cb74 100644 --- a/crawl4ai/async_webcrawler.py +++ b/crawl4ai/async_webcrawler.py @@ -343,15 +343,32 @@ class AsyncWebCrawler: # Update proxy configuration from rotation strategy if available if config and config.proxy_rotation_strategy: - next_proxy: ProxyConfig = await config.proxy_rotation_strategy.get_next_proxy() - if next_proxy: - self.logger.info( - message="Switch proxy: {proxy}", - tag="PROXY", - params={"proxy": next_proxy.server} + # Handle sticky sessions - use same proxy for all requests with same session_id + if config.proxy_session_id: + next_proxy: ProxyConfig = await config.proxy_rotation_strategy.get_proxy_for_session( + config.proxy_session_id, + ttl=config.proxy_session_ttl ) - config.proxy_config = next_proxy - # config = config.clone(proxy_config=next_proxy) + if next_proxy: + self.logger.info( + message="Using sticky proxy session: {session_id} -> {proxy}", + tag="PROXY", + params={ + "session_id": config.proxy_session_id, + "proxy": next_proxy.server + } + ) + config.proxy_config = next_proxy + else: + # Existing behavior: rotate on each request + next_proxy: ProxyConfig = await config.proxy_rotation_strategy.get_next_proxy() + if next_proxy: + self.logger.info( + message="Switch proxy: {proxy}", + tag="PROXY", + params={"proxy": next_proxy.server} + ) + config.proxy_config = next_proxy # Fetch fresh content if needed if not cached_result or not html: @@ -833,21 +850,45 @@ class AsyncWebCrawler: # Handle stream setting - use first config's stream setting if config is a list if isinstance(config, list): stream = config[0].stream if config else False + primary_config = config[0] if config else None else: stream = config.stream + primary_config = config + + # Helper to release sticky session if auto_release is enabled + async def maybe_release_session(): + if (primary_config and + primary_config.proxy_session_id and + primary_config.proxy_session_auto_release and + primary_config.proxy_rotation_strategy): + await 
primary_config.proxy_rotation_strategy.release_session( + primary_config.proxy_session_id + ) + self.logger.info( + message="Auto-released proxy session: {session_id}", + tag="PROXY", + params={"session_id": primary_config.proxy_session_id} + ) if stream: - async def result_transformer(): - async for task_result in dispatcher.run_urls_stream( - crawler=self, urls=urls, config=config - ): - yield transform_result(task_result) + async def result_transformer(): + try: + async for task_result in dispatcher.run_urls_stream( + crawler=self, urls=urls, config=config + ): + yield transform_result(task_result) + finally: + await maybe_release_session() return result_transformer() else: - _results = await dispatcher.run_urls(crawler=self, urls=urls, config=config) - return [transform_result(res) for res in _results] + try: + _results = await dispatcher.run_urls(crawler=self, urls=urls, config=config) + return [transform_result(res) for res in _results] + finally: + # Auto-release session after batch completes + await maybe_release_session() async def aseed_urls( self, diff --git a/crawl4ai/proxy_strategy.py b/crawl4ai/proxy_strategy.py index 2c01a2f5..9bd1fe95 100644 --- a/crawl4ai/proxy_strategy.py +++ b/crawl4ai/proxy_strategy.py @@ -1,7 +1,9 @@ -from typing import List, Dict, Optional +from typing import List, Dict, Optional, Tuple from abc import ABC, abstractmethod from itertools import cycle import os +import asyncio +import time ########### ATTENTION PEOPLE OF EARTH ########### @@ -120,7 +122,7 @@ class ProxyConfig: class ProxyRotationStrategy(ABC): """Base abstract class for proxy rotation strategies""" - + @abstractmethod async def get_next_proxy(self) -> Optional[ProxyConfig]: """Get next proxy configuration from the strategy""" @@ -131,18 +133,81 @@ """Add proxy configurations to the strategy""" pass -class RoundRobinProxyStrategy: - """Simple round-robin proxy rotation strategy using ProxyConfig objects""" + 
@abstractmethod + async def get_proxy_for_session( + self, + session_id: str, + ttl: Optional[int] = None + ) -> Optional[ProxyConfig]: + """ + Get or create a sticky proxy for a session. + + If session_id already has an assigned proxy (and hasn't expired), return it. + If session_id is new, acquire a new proxy and associate it. + + Args: + session_id: Unique session identifier + ttl: Optional time-to-live in seconds for this session + + Returns: + ProxyConfig for this session + """ + pass + + @abstractmethod + async def release_session(self, session_id: str) -> None: + """ + Release a sticky session, making the proxy available for reuse. + + Args: + session_id: Session to release + """ + pass + + @abstractmethod + def get_session_proxy(self, session_id: str) -> Optional[ProxyConfig]: + """ + Get the proxy for an existing session without creating new one. + + Args: + session_id: Session to look up + + Returns: + ProxyConfig if session exists and hasn't expired, None otherwise + """ + pass + + @abstractmethod + def get_active_sessions(self) -> Dict[str, ProxyConfig]: + """ + Get all active sticky sessions. + + Returns: + Dictionary mapping session_id to ProxyConfig + """ + pass + +class RoundRobinProxyStrategy(ProxyRotationStrategy): + """Simple round-robin proxy rotation strategy using ProxyConfig objects. + + Supports sticky sessions where a session_id can be bound to a specific proxy + for the duration of the session. This is useful for deep crawling where + you want to maintain the same IP address across multiple requests. 
+ """ def __init__(self, proxies: List[ProxyConfig] = None): """ Initialize with optional list of proxy configurations - + Args: proxies: List of ProxyConfig objects """ - self._proxies = [] + self._proxies: List[ProxyConfig] = [] self._proxy_cycle = None + # Session tracking: maps session_id -> (ProxyConfig, created_at, ttl) + self._sessions: Dict[str, Tuple[ProxyConfig, float, Optional[int]]] = {} + self._session_lock = asyncio.Lock() + if proxies: self.add_proxies(proxies) @@ -156,3 +221,121 @@ class RoundRobinProxyStrategy: if not self._proxy_cycle: return None return next(self._proxy_cycle) + + async def get_proxy_for_session( + self, + session_id: str, + ttl: Optional[int] = None + ) -> Optional[ProxyConfig]: + """ + Get or create a sticky proxy for a session. + + If session_id already has an assigned proxy (and hasn't expired), return it. + If session_id is new, acquire a new proxy and associate it. + + Args: + session_id: Unique session identifier + ttl: Optional time-to-live in seconds for this session + + Returns: + ProxyConfig for this session + """ + async with self._session_lock: + # Check if session exists and hasn't expired + if session_id in self._sessions: + proxy, created_at, session_ttl = self._sessions[session_id] + + # Check TTL expiration + effective_ttl = ttl if ttl is not None else session_ttl + if effective_ttl is not None: + elapsed = time.time() - created_at + if elapsed >= effective_ttl: + # Session expired, remove it and get new proxy + del self._sessions[session_id] + else: + return proxy + else: + return proxy + + # Acquire new proxy for this session + proxy = await self.get_next_proxy() + if proxy: + self._sessions[session_id] = (proxy, time.time(), ttl) + + return proxy + + async def release_session(self, session_id: str) -> None: + """ + Release a sticky session, making the proxy available for reuse. 
+ + Args: + session_id: Session to release + """ + async with self._session_lock: + if session_id in self._sessions: + del self._sessions[session_id] + + def get_session_proxy(self, session_id: str) -> Optional[ProxyConfig]: + """ + Get the proxy for an existing session without creating new one. + + Args: + session_id: Session to look up + + Returns: + ProxyConfig if session exists and hasn't expired, None otherwise + """ + if session_id not in self._sessions: + return None + + proxy, created_at, ttl = self._sessions[session_id] + + # Check TTL expiration + if ttl is not None: + elapsed = time.time() - created_at + if elapsed >= ttl: + return None + + return proxy + + def get_active_sessions(self) -> Dict[str, ProxyConfig]: + """ + Get all active sticky sessions (excluding expired ones). + + Returns: + Dictionary mapping session_id to ProxyConfig + """ + current_time = time.time() + active_sessions = {} + + for session_id, (proxy, created_at, ttl) in self._sessions.items(): + # Skip expired sessions + if ttl is not None: + elapsed = current_time - created_at + if elapsed >= ttl: + continue + active_sessions[session_id] = proxy + + return active_sessions + + async def cleanup_expired_sessions(self) -> int: + """ + Remove all expired sessions from tracking. + + Returns: + Number of sessions removed + """ + async with self._session_lock: + current_time = time.time() + expired = [] + + for session_id, (proxy, created_at, ttl) in self._sessions.items(): + if ttl is not None: + elapsed = current_time - created_at + if elapsed >= ttl: + expired.append(session_id) + + for session_id in expired: + del self._sessions[session_id] + + return len(expired) diff --git a/tests/proxy/test_sticky_sessions.py b/tests/proxy/test_sticky_sessions.py new file mode 100644 index 00000000..738240b6 --- /dev/null +++ b/tests/proxy/test_sticky_sessions.py @@ -0,0 +1,569 @@ +""" +Comprehensive test suite for Sticky Proxy Sessions functionality. + +Tests cover: +1. 
Basic sticky session - same proxy for same session_id +2. Different sessions get different proxies +3. Session release +4. TTL expiration +5. Thread safety / concurrent access +6. Integration tests with AsyncWebCrawler +""" + +import asyncio +import os +import time +import pytest +from unittest.mock import patch + +from crawl4ai import AsyncWebCrawler, BrowserConfig +from crawl4ai.async_configs import CrawlerRunConfig, ProxyConfig +from crawl4ai.proxy_strategy import RoundRobinProxyStrategy +from crawl4ai.cache_context import CacheMode + + +class TestRoundRobinProxyStrategySession: + """Test suite for RoundRobinProxyStrategy session methods.""" + + def setup_method(self): + """Setup for each test method.""" + self.proxies = [ + ProxyConfig(server=f"http://proxy{i}.test:8080") + for i in range(5) + ] + + # ==================== BASIC STICKY SESSION TESTS ==================== + + @pytest.mark.asyncio + async def test_sticky_session_same_proxy(self): + """Verify same proxy is returned for same session_id.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + # First call - acquires proxy + proxy1 = await strategy.get_proxy_for_session("session-1") + + # Second call - should return same proxy + proxy2 = await strategy.get_proxy_for_session("session-1") + + # Third call - should return same proxy + proxy3 = await strategy.get_proxy_for_session("session-1") + + assert proxy1 is not None + assert proxy1.server == proxy2.server == proxy3.server + + @pytest.mark.asyncio + async def test_different_sessions_different_proxies(self): + """Verify different session_ids can get different proxies.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + proxy_a = await strategy.get_proxy_for_session("session-a") + proxy_b = await strategy.get_proxy_for_session("session-b") + proxy_c = await strategy.get_proxy_for_session("session-c") + + # All should be different (round-robin) + servers = {proxy_a.server, proxy_b.server, proxy_c.server} + assert len(servers) == 3 + + 
@pytest.mark.asyncio + async def test_sticky_session_with_regular_rotation(self): + """Verify sticky sessions don't interfere with regular rotation.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + # Acquire a sticky session + session_proxy = await strategy.get_proxy_for_session("sticky-session") + + # Regular rotation should continue independently + regular_proxy1 = await strategy.get_next_proxy() + regular_proxy2 = await strategy.get_next_proxy() + + # Sticky session should still return same proxy + session_proxy_again = await strategy.get_proxy_for_session("sticky-session") + + assert session_proxy.server == session_proxy_again.server + # Regular proxies should rotate + assert regular_proxy1.server != regular_proxy2.server + + # ==================== SESSION RELEASE TESTS ==================== + + @pytest.mark.asyncio + async def test_session_release(self): + """Verify session can be released and reacquired.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + # Acquire session + proxy1 = await strategy.get_proxy_for_session("session-1") + assert strategy.get_session_proxy("session-1") is not None + + # Release session + await strategy.release_session("session-1") + assert strategy.get_session_proxy("session-1") is None + + # Reacquire - should get a new proxy (next in round-robin) + proxy2 = await strategy.get_proxy_for_session("session-1") + assert proxy2 is not None + # After release, next call gets the next proxy in rotation + # (not necessarily the same as before) + + @pytest.mark.asyncio + async def test_release_nonexistent_session(self): + """Verify releasing non-existent session doesn't raise error.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + # Should not raise + await strategy.release_session("nonexistent-session") + + @pytest.mark.asyncio + async def test_release_twice(self): + """Verify releasing session twice doesn't raise error.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + await 
strategy.get_proxy_for_session("session-1") + await strategy.release_session("session-1") + await strategy.release_session("session-1") # Should not raise + + # ==================== GET SESSION PROXY TESTS ==================== + + @pytest.mark.asyncio + async def test_get_session_proxy_existing(self): + """Verify get_session_proxy returns proxy for existing session.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + acquired = await strategy.get_proxy_for_session("session-1") + retrieved = strategy.get_session_proxy("session-1") + + assert retrieved is not None + assert acquired.server == retrieved.server + + def test_get_session_proxy_nonexistent(self): + """Verify get_session_proxy returns None for non-existent session.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + result = strategy.get_session_proxy("nonexistent-session") + assert result is None + + # ==================== TTL EXPIRATION TESTS ==================== + + @pytest.mark.asyncio + async def test_session_ttl_not_expired(self): + """Verify session returns same proxy when TTL not expired.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + # Acquire with 10 second TTL + proxy1 = await strategy.get_proxy_for_session("session-1", ttl=10) + + # Immediately request again - should return same proxy + proxy2 = await strategy.get_proxy_for_session("session-1", ttl=10) + + assert proxy1.server == proxy2.server + + @pytest.mark.asyncio + async def test_session_ttl_expired(self): + """Verify new proxy acquired after TTL expires.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + # Acquire with 1 second TTL + proxy1 = await strategy.get_proxy_for_session("session-1", ttl=1) + + # Wait for TTL to expire + await asyncio.sleep(1.1) + + # Request again - should get new proxy due to expiration + proxy2 = await strategy.get_proxy_for_session("session-1", ttl=1) + + # May or may not be same server depending on round-robin state, + # but session should have been recreated + assert proxy2 is 
not None + + @pytest.mark.asyncio + async def test_get_session_proxy_ttl_expired(self): + """Verify get_session_proxy returns None after TTL expires.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + await strategy.get_proxy_for_session("session-1", ttl=1) + + # Wait for expiration + await asyncio.sleep(1.1) + + # Should return None for expired session + result = strategy.get_session_proxy("session-1") + assert result is None + + @pytest.mark.asyncio + async def test_cleanup_expired_sessions(self): + """Verify cleanup_expired_sessions removes expired sessions.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + # Create sessions with short TTL + await strategy.get_proxy_for_session("short-ttl-1", ttl=1) + await strategy.get_proxy_for_session("short-ttl-2", ttl=1) + # Create session without TTL (should not be cleaned up) + await strategy.get_proxy_for_session("no-ttl") + + # Wait for TTL to expire + await asyncio.sleep(1.1) + + # Cleanup + removed = await strategy.cleanup_expired_sessions() + + assert removed == 2 + assert strategy.get_session_proxy("short-ttl-1") is None + assert strategy.get_session_proxy("short-ttl-2") is None + assert strategy.get_session_proxy("no-ttl") is not None + + # ==================== GET ACTIVE SESSIONS TESTS ==================== + + @pytest.mark.asyncio + async def test_get_active_sessions(self): + """Verify get_active_sessions returns all active sessions.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + await strategy.get_proxy_for_session("session-a") + await strategy.get_proxy_for_session("session-b") + await strategy.get_proxy_for_session("session-c") + + active = strategy.get_active_sessions() + + assert len(active) == 3 + assert "session-a" in active + assert "session-b" in active + assert "session-c" in active + + @pytest.mark.asyncio + async def test_get_active_sessions_excludes_expired(self): + """Verify get_active_sessions excludes expired sessions.""" + strategy = RoundRobinProxyStrategy(self.proxies) 
+ + await strategy.get_proxy_for_session("short-ttl", ttl=1) + await strategy.get_proxy_for_session("no-ttl") + + # Before expiration + active = strategy.get_active_sessions() + assert len(active) == 2 + + # Wait for TTL to expire + await asyncio.sleep(1.1) + + # After expiration + active = strategy.get_active_sessions() + assert len(active) == 1 + assert "no-ttl" in active + assert "short-ttl" not in active + + # ==================== THREAD SAFETY TESTS ==================== + + @pytest.mark.asyncio + async def test_concurrent_session_access(self): + """Verify thread-safe access to sessions.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + async def acquire_session(session_id: str): + proxy = await strategy.get_proxy_for_session(session_id) + await asyncio.sleep(0.01) # Simulate work + return proxy.server + + # Acquire same session from multiple coroutines + results = await asyncio.gather(*[ + acquire_session("shared-session") for _ in range(10) + ]) + + # All should get same proxy + assert len(set(results)) == 1 + + @pytest.mark.asyncio + async def test_concurrent_different_sessions(self): + """Verify concurrent acquisition of different sessions works correctly.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + async def acquire_session(session_id: str): + proxy = await strategy.get_proxy_for_session(session_id) + await asyncio.sleep(0.01) + return (session_id, proxy.server) + + # Acquire different sessions concurrently + results = await asyncio.gather(*[ + acquire_session(f"session-{i}") for i in range(5) + ]) + + # Each session should have a consistent proxy + session_proxies = dict(results) + assert len(session_proxies) == 5 + + # Verify each session still returns same proxy + for session_id, expected_server in session_proxies.items(): + actual = await strategy.get_proxy_for_session(session_id) + assert actual.server == expected_server + + @pytest.mark.asyncio + async def test_concurrent_session_acquire_and_release(self): + """Verify concurrent 
acquire and release operations work correctly.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + async def acquire_and_release(session_id: str): + proxy = await strategy.get_proxy_for_session(session_id) + await asyncio.sleep(0.01) + await strategy.release_session(session_id) + return proxy.server + + # Run multiple acquire/release cycles concurrently + await asyncio.gather(*[ + acquire_and_release(f"session-{i}") for i in range(10) + ]) + + # All sessions should be released + active = strategy.get_active_sessions() + assert len(active) == 0 + + # ==================== EMPTY PROXY POOL TESTS ==================== + + @pytest.mark.asyncio + async def test_empty_proxy_pool_session(self): + """Verify behavior with empty proxy pool.""" + strategy = RoundRobinProxyStrategy() # No proxies + + result = await strategy.get_proxy_for_session("session-1") + assert result is None + + @pytest.mark.asyncio + async def test_add_proxies_after_session(self): + """Verify adding proxies after session creation works.""" + strategy = RoundRobinProxyStrategy() + + # No proxies initially + result1 = await strategy.get_proxy_for_session("session-1") + assert result1 is None + + # Add proxies + strategy.add_proxies(self.proxies) + + # Now should work + result2 = await strategy.get_proxy_for_session("session-2") + assert result2 is not None + + +class TestCrawlerRunConfigSession: + """Test CrawlerRunConfig with sticky session parameters.""" + + def test_config_has_session_fields(self): + """Verify CrawlerRunConfig has sticky session fields.""" + config = CrawlerRunConfig( + proxy_session_id="test-session", + proxy_session_ttl=300, + proxy_session_auto_release=True + ) + + assert config.proxy_session_id == "test-session" + assert config.proxy_session_ttl == 300 + assert config.proxy_session_auto_release is True + + def test_config_session_defaults(self): + """Verify default values for session fields.""" + config = CrawlerRunConfig() + + assert config.proxy_session_id is None + assert 
config.proxy_session_ttl is None + assert config.proxy_session_auto_release is False + + +class TestCrawlerStickySessionIntegration: + """Integration tests for AsyncWebCrawler with sticky sessions.""" + + def setup_method(self): + """Setup for each test method.""" + self.proxies = [ + ProxyConfig(server=f"http://proxy{i}.test:8080") + for i in range(3) + ] + self.test_url = "https://httpbin.org/ip" + + @pytest.mark.asyncio + async def test_crawler_sticky_session_without_proxy(self): + """Test that crawler works when proxy_session_id set but no strategy.""" + browser_config = BrowserConfig(headless=True) + + config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + proxy_session_id="test-session", + page_timeout=15000 + ) + + async with AsyncWebCrawler(config=browser_config) as crawler: + result = await crawler.arun(url=self.test_url, config=config) + # Should work without errors (no proxy strategy means no proxy) + assert result is not None + + @pytest.mark.asyncio + async def test_crawler_sticky_session_basic(self): + """Test basic sticky session with crawler.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + proxy_rotation_strategy=strategy, + proxy_session_id="integration-test", + page_timeout=10000 + ) + + browser_config = BrowserConfig(headless=True) + + async with AsyncWebCrawler(config=browser_config) as crawler: + # First request + try: + result1 = await crawler.arun(url=self.test_url, config=config) + except Exception: + pass # Proxy connection may fail, but session should be tracked + + # Verify session was created + session_proxy = strategy.get_session_proxy("integration-test") + assert session_proxy is not None + + # Cleanup + await strategy.release_session("integration-test") + + @pytest.mark.asyncio + async def test_crawler_rotating_vs_sticky(self): + """Compare rotating behavior vs sticky session behavior.""" + strategy = RoundRobinProxyStrategy(self.proxies) + + # Config WITHOUT 
sticky session - should rotate + rotating_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + proxy_rotation_strategy=strategy, + page_timeout=5000 + ) + + # Config WITH sticky session - should use same proxy + sticky_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + proxy_rotation_strategy=strategy, + proxy_session_id="sticky-test", + page_timeout=5000 + ) + + browser_config = BrowserConfig(headless=True) + + async with AsyncWebCrawler(config=browser_config) as crawler: + # Track proxy configs used + rotating_proxies = [] + sticky_proxies = [] + + # Try rotating requests (may fail due to test proxies, but config should be set) + for _ in range(3): + try: + await crawler.arun(url=self.test_url, config=rotating_config) + except Exception: + pass + rotating_proxies.append(rotating_config.proxy_config.server if rotating_config.proxy_config else None) + + # Try sticky requests + for _ in range(3): + try: + await crawler.arun(url=self.test_url, config=sticky_config) + except Exception: + pass + sticky_proxies.append(sticky_config.proxy_config.server if sticky_config.proxy_config else None) + + # Rotating should have different proxies (or cycle through them) + # Sticky should have same proxy for all requests + if all(sticky_proxies): + assert len(set(sticky_proxies)) == 1, "Sticky session should use same proxy" + + await strategy.release_session("sticky-test") + + +class TestStickySessionRealWorld: + """Real-world scenario tests for sticky sessions. + + Note: These tests require actual proxy servers to verify IP consistency. + They are marked to be skipped if no proxy is configured. + """ + + @pytest.mark.asyncio + @pytest.mark.skipif( + not os.environ.get('TEST_PROXY_1'), + reason="Requires TEST_PROXY_1 environment variable" + ) + async def test_verify_ip_consistency(self): + """Verify that sticky session actually uses same IP. 
+ + This test requires real proxies set in environment variables: + TEST_PROXY_1=ip:port:user:pass + TEST_PROXY_2=ip:port:user:pass + """ + import re + + # Load proxies from environment + proxy_strs = [ + os.environ.get('TEST_PROXY_1', ''), + os.environ.get('TEST_PROXY_2', '') + ] + proxies = [ProxyConfig.from_string(p) for p in proxy_strs if p] + + if len(proxies) < 2: + pytest.skip("Need at least 2 proxies for this test") + + strategy = RoundRobinProxyStrategy(proxies) + + # Config WITH sticky session + config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + proxy_rotation_strategy=strategy, + proxy_session_id="ip-verify-session", + page_timeout=30000 + ) + + browser_config = BrowserConfig(headless=True) + + async with AsyncWebCrawler(config=browser_config) as crawler: + ips = [] + + for i in range(3): + result = await crawler.arun( + url="https://httpbin.org/ip", + config=config + ) + + if result and result.success and result.html: + # Extract IP from response + ip_match = re.search(r'"origin":\s*"([^"]+)"', result.html) + if ip_match: + ips.append(ip_match.group(1)) + + await strategy.release_session("ip-verify-session") + + # All IPs should be same for sticky session + if len(ips) >= 2: + assert len(set(ips)) == 1, f"Expected same IP, got: {ips}" + + +# ==================== STANDALONE TEST FUNCTIONS ==================== + +@pytest.mark.asyncio +async def test_sticky_session_simple(): + """Simple test for sticky session functionality.""" + proxies = [ + ProxyConfig(server=f"http://proxy{i}.test:8080") + for i in range(3) + ] + strategy = RoundRobinProxyStrategy(proxies) + + # Same session should return same proxy + p1 = await strategy.get_proxy_for_session("test") + p2 = await strategy.get_proxy_for_session("test") + p3 = await strategy.get_proxy_for_session("test") + + assert p1.server == p2.server == p3.server + print(f"Sticky session works! 
All requests use: {p1.server}") + + # Cleanup + await strategy.release_session("test") + + +if __name__ == "__main__": + print("Running Sticky Session tests...") + print("=" * 50) + + asyncio.run(test_sticky_session_simple()) + + print("\n" + "=" * 50) + print("To run the full pytest suite, use: pytest " + __file__) + print("=" * 50)