diff --git a/crawl4ai/browser_manager.py b/crawl4ai/browser_manager.py
index 6a26b069..1f84bdc3 100644
--- a/crawl4ai/browser_manager.py
+++ b/crawl4ai/browser_manager.py
@@ -575,7 +575,20 @@ class BrowserManager:
     """
 
     _playwright_instance = None
-
+
+    # Class-level tracking of pages in use, keyed by browser endpoint (CDP URL or instance id)
+    # This ensures multiple BrowserManager instances connecting to the same browser
+    # share the same page tracking, preventing race conditions.
+    _global_pages_in_use: dict = {}  # endpoint_key -> set of pages
+    _global_pages_lock: asyncio.Lock = None  # Initialized lazily
+
+    @classmethod
+    def _get_global_lock(cls) -> asyncio.Lock:
+        """Get or create the global pages lock (lazy initialization for async context)."""
+        if cls._global_pages_lock is None:
+            cls._global_pages_lock = asyncio.Lock()
+        return cls._global_pages_lock
+
     @classmethod
     async def get_playwright(cls, use_undetected: bool = False):
         if use_undetected:
@@ -617,9 +630,8 @@ class BrowserManager:
             # for all racers). Prevents 'Target page/context closed' errors.
             self._page_lock = asyncio.Lock()
 
-            # Track pages currently in use by crawl operations to prevent
-            # concurrent crawls from reusing the same page (race condition fix)
-            self._pages_in_use = set()
+            # Browser endpoint key for global page tracking (set after browser starts)
+            self._browser_endpoint_key: Optional[str] = None
 
             # Stealth adapter for stealth mode
             self._stealth_adapter = None
@@ -720,6 +732,77 @@ class BrowserManager:
                 self.default_context = self.browser
 
+        # Set the browser endpoint key for global page tracking
+        self._browser_endpoint_key = self._compute_browser_endpoint_key()
+        # Initialize global tracking set for this endpoint if needed
+        if self._browser_endpoint_key not in BrowserManager._global_pages_in_use:
+            BrowserManager._global_pages_in_use[self._browser_endpoint_key] = set()
+
+    def _compute_browser_endpoint_key(self) -> str:
+        """
+        Compute a unique key identifying this browser connection.
+
+        For CDP connections, uses the normalized CDP URL so all BrowserManager
+        instances connecting to the same browser share page tracking.
+        For standalone browsers, uses instance id since each is independent.
+
+        Returns:
+            str: Unique identifier for this browser connection
+        """
+        # For CDP connections, use the CDP URL as the key (normalized)
+        if self.config.cdp_url:
+            return self._normalize_cdp_url(self.config.cdp_url)
+
+        # For managed browsers, use the CDP URL/port that was assigned
+        if self.managed_browser:
+            # Use debugging port as the key since it uniquely identifies the browser
+            port = getattr(self.managed_browser, 'debugging_port', None)
+            host = getattr(self.managed_browser, 'host', 'localhost')
+            if port:
+                return f"cdp:http://{host}:{port}"
+
+        # For standalone browsers, use instance id (no sharing needed)
+        return f"instance:{id(self)}"
+
+    def _normalize_cdp_url(self, cdp_url: str) -> str:
+        """
+        Normalize a CDP URL to a canonical form for consistent tracking.
+
+        Handles various formats:
+        - http://localhost:9222
+        - ws://localhost:9222/devtools/browser/xxx
+        - http://localhost:9222?browser_id=xxx
+
+        Returns:
+            str: Normalized CDP key in format "cdp:http://host:port"
+        """
+        from urllib.parse import urlparse
+
+        parsed = urlparse(cdp_url)
+        host = parsed.hostname or 'localhost'
+        port = parsed.port or 9222
+
+        return f"cdp:http://{host}:{port}"
+
+    def _get_pages_in_use(self) -> set:
+        """Get the set of pages currently in use for this browser."""
+        if self._browser_endpoint_key and self._browser_endpoint_key in BrowserManager._global_pages_in_use:
+            return BrowserManager._global_pages_in_use[self._browser_endpoint_key]
+        # Fallback: shouldn't happen, but return empty set
+        return set()
+
+    def _mark_page_in_use(self, page) -> None:
+        """Mark a page as in use."""
+        if self._browser_endpoint_key:
+            if self._browser_endpoint_key not in BrowserManager._global_pages_in_use:
+                BrowserManager._global_pages_in_use[self._browser_endpoint_key] = set()
+            BrowserManager._global_pages_in_use[self._browser_endpoint_key].add(page)
+
+    def _release_page_from_use(self, page) -> None:
+        """Release a page from the in-use tracking."""
+        if self._browser_endpoint_key and self._browser_endpoint_key in BrowserManager._global_pages_in_use:
+            BrowserManager._global_pages_in_use[self._browser_endpoint_key].discard(page)
+
     async def _verify_cdp_ready(self, cdp_url: str) -> bool:
         """Verify CDP endpoint is ready with exponential backoff.
 
@@ -1228,29 +1311,41 @@ class BrowserManager:
                 if not page:
                     async with self._page_lock:
                         page = await context.new_page()
-                        self._pages_in_use.add(page)
+                        self._mark_page_in_use(page)
                         await self._apply_stealth_to_page(page)
                 else:
                     # Mark pre-existing target as in use
-                    self._pages_in_use.add(page)
+                    self._mark_page_in_use(page)
             else:
-                # Use lock to safely check for available pages and track usage
-                # This prevents race conditions when multiple crawls run concurrently
-                async with self._page_lock:
-                    pages = context.pages
-                    # Find first available page (exists and not currently in use)
-                    available_page = next(
-                        (p for p in pages if p not in self._pages_in_use),
-                        None
-                    )
-                    if available_page:
-                        page = available_page
-                    else:
-                        # No available pages - create a new one
+                # For CDP connections (external browser), multiple Playwright connections
+                # create separate browser/context objects. Page reuse across connections
+                # isn't reliable because each connection sees different page objects.
+                # Always create new pages for CDP to avoid cross-connection race conditions.
+                if self.config.cdp_url and not self.config.use_managed_browser:
+                    async with self._page_lock:
                         page = await context.new_page()
-                        await self._apply_stealth_to_page(page)
-                    # Mark page as in use
-                    self._pages_in_use.add(page)
+                        self._mark_page_in_use(page)
+                        await self._apply_stealth_to_page(page)
+                else:
+                    # For managed browsers (single process), page reuse is safe.
+                    # Use lock to safely check for available pages and track usage.
+                    # This prevents race conditions when multiple crawls run concurrently.
+                    async with BrowserManager._get_global_lock():
+                        pages = context.pages
+                        pages_in_use = self._get_pages_in_use()
+                        # Find first available page (exists and not currently in use)
+                        available_page = next(
+                            (p for p in pages if p not in pages_in_use),
+                            None
+                        )
+                        if available_page:
+                            page = available_page
+                        else:
+                            # No available pages - create a new one
+                            page = await context.new_page()
+                            await self._apply_stealth_to_page(page)
+                        # Mark page as in use (global tracking)
+                        self._mark_page_in_use(page)
         else:
            # Otherwise, check if we have an existing context for this config
            config_signature = self._make_config_signature(crawlerRunConfig)
@@ -1283,7 +1378,7 @@ class BrowserManager:
         """
         if session_id in self.sessions:
             context, page, _ = self.sessions[session_id]
-            self._pages_in_use.discard(page)
+            self._release_page_from_use(page)
             await page.close()
             if not self.config.use_managed_browser:
                 await context.close()
@@ -1291,7 +1386,7 @@
     def release_page(self, page):
         """
-        Release a page from the in-use tracking set.
+        Release a page from the in-use tracking set (global tracking).
 
         This should be called when a crawl operation completes to allow
         the page to be reused by subsequent crawls.
@@ -1299,7 +1394,7 @@
         Args:
             page: The Playwright page to release.
         """
-        self._pages_in_use.discard(page)
+        self._release_page_from_use(page)
 
     def _cleanup_expired_sessions(self):
         """Clean up expired sessions based on TTL."""
diff --git a/tests/browser/test_page_reuse_race_condition.py b/tests/browser/test_page_reuse_race_condition.py
index 6cc194f9..10b14f6c 100644
--- a/tests/browser/test_page_reuse_race_condition.py
+++ b/tests/browser/test_page_reuse_race_condition.py
@@ -232,7 +232,7 @@ async def test_high_concurrency_stress():
 async def test_page_tracking_internal_state():
     """
     Test 5: Verify internal page tracking state is correct.
-    This directly tests the _pages_in_use tracking mechanism.
+    This directly tests the global page tracking mechanism.
     """
     print("\n" + "="*70)
     print("TEST 5: Internal page tracking state verification")
     print("="*70)
@@ -248,8 +248,13 @@ async def test_page_tracking_internal_state():
     async with AsyncWebCrawler(config=browser_config) as crawler:
         browser_manager = crawler.crawler_strategy.browser_manager
 
+        # Check endpoint key is set
+        endpoint_key = browser_manager._browser_endpoint_key
+        print(f" Browser endpoint key: {endpoint_key}")
+        assert endpoint_key, "Endpoint key should be set"
+
         # Initially, no pages should be in use
-        initial_in_use = len(browser_manager._pages_in_use)
+        initial_in_use = len(browser_manager._get_pages_in_use())
         print(f" Initial pages in use: {initial_in_use}")
 
         # Do a crawl
@@ -257,7 +262,7 @@
         assert result.success, f"Crawl failed: {result.error_message}"
 
         # After crawl completes, page should be released
-        after_crawl_in_use = len(browser_manager._pages_in_use)
+        after_crawl_in_use = len(browser_manager._get_pages_in_use())
         print(f" Pages in use after crawl: {after_crawl_in_use}")
 
         # The page should have been released (or kept as the last page)
@@ -267,7 +272,7 @@
         result2 = await crawler.arun("https://example.org")
         assert result2.success, f"Second crawl failed: {result2.error_message}"
 
-        final_in_use = len(browser_manager._pages_in_use)
+        final_in_use = len(browser_manager._get_pages_in_use())
         print(f" Pages in use after second crawl: {final_in_use}")
 
         print(" PASSED: Page tracking state is consistent")
@@ -411,6 +416,143 @@ async def test_compare_isolated_vs_shared_context():
     return True
 
 
+
+async def test_multiple_crawlers_same_cdp():
+    """
+    Test 8: Multiple AsyncWebCrawler instances connecting to the same CDP endpoint.
+
+    This tests the realistic scenario where:
+    1. A browser is started externally (or by a managed browser)
+    2. Multiple crawler instances connect to it via CDP URL
+    3. All use create_isolated_context=False to share cookies/session
+    4. Each should get its own page to avoid race conditions
+    """
+    print("\n" + "="*70)
+    print("TEST 8: Multiple crawlers connecting to same CDP endpoint")
+    print("="*70)
+
+    import subprocess
+    import tempfile
+
+    # Start a browser manually using subprocess
+    port = 9444
+    temp_dir = tempfile.mkdtemp(prefix="browser-test-")
+
+    browser_process = None
+    try:
+        # Start chromium with remote debugging - use Playwright's bundled chromium
+        import os
+        playwright_path = os.path.expanduser("~/.cache/ms-playwright/chromium-1200/chrome-linux64/chrome")
+        if not os.path.exists(playwright_path):
+            # Fallback - try to find it
+            for path in [
+                "/usr/bin/chromium",
+                "/usr/bin/chromium-browser",
+                "/usr/bin/google-chrome",
+            ]:
+                if os.path.exists(path):
+                    playwright_path = path
+                    break
+        chrome_path = playwright_path
+
+        cmd = [
+            chrome_path,
+            f"--remote-debugging-port={port}",
+            f"--user-data-dir={temp_dir}",
+            "--headless=new",
+            "--no-sandbox",
+            "--disable-gpu",
+            "--disable-dev-shm-usage",
+        ]
+
+        browser_process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        await asyncio.sleep(2)  # Wait for browser to start
+
+        cdp_url = f"http://localhost:{port}"
+        print(f" Started browser at {cdp_url}")
+
+        # Both crawlers connect via CDP URL
+        browser_config1 = BrowserConfig(
+            headless=True,
+            cdp_url=cdp_url,
+            create_isolated_context=False,
+        )
+        browser_config2 = BrowserConfig(
+            headless=True,
+            cdp_url=cdp_url,
+            create_isolated_context=False,
+        )
+
+        urls_crawler1 = [
+            "https://example.com?crawler=1",
+            "https://example.org?crawler=1",
+        ]
+        urls_crawler2 = [
+            "https://httpbin.org/html?crawler=2",
+            "https://httpbin.org/get?crawler=2",
+        ]
+
+        async with AsyncWebCrawler(config=browser_config1) as crawler1:
+            async with AsyncWebCrawler(config=browser_config2) as crawler2:
+                bm1 = crawler1.crawler_strategy.browser_manager
+                bm2 = crawler2.crawler_strategy.browser_manager
+
+                print(f" Crawler 1 endpoint key: {bm1._browser_endpoint_key}")
+                print(f" Crawler 2 endpoint key: {bm2._browser_endpoint_key}")
+                print(f" Keys match: {bm1._browser_endpoint_key == bm2._browser_endpoint_key}")
+
+                # Launch concurrent crawls from BOTH crawlers simultaneously
+                print(f" Launching {len(urls_crawler1) + len(urls_crawler2)} concurrent crawls...")
+
+                tasks1 = [crawler1.arun(url) for url in urls_crawler1]
+                tasks2 = [crawler2.arun(url) for url in urls_crawler2]
+
+                all_results = await asyncio.gather(
+                    *tasks1, *tasks2,
+                    return_exceptions=True
+                )
+
+                # Check results
+                success_count = 0
+                for i, result in enumerate(all_results):
+                    crawler_id = 1 if i < len(urls_crawler1) else 2
+                    url_idx = i if i < len(urls_crawler1) else i - len(urls_crawler1)
+
+                    if isinstance(result, Exception):
+                        print(f" Crawler {crawler_id}, URL {url_idx+1}: EXCEPTION - {result}")
+                    elif result.success:
+                        success_count += 1
+                        print(f" Crawler {crawler_id}, URL {url_idx+1}: OK")
+                    else:
+                        print(f" Crawler {crawler_id}, URL {url_idx+1}: FAILED - {result.error_message}")
+
+                total = len(urls_crawler1) + len(urls_crawler2)
+                assert success_count == total, f"Only {success_count}/{total} succeeded"
+
+                print(f" PASSED: All {total} concurrent crawls from 2 crawlers succeeded")
+                return True
+
+    except Exception as e:
+        print(f" FAILED: {str(e)}")
+        import traceback
+        traceback.print_exc()
+        return False
+
+    finally:
+        # Clean up browser process
+        if browser_process:
+            browser_process.terminate()
+            try:
+                browser_process.wait(timeout=5)
+            except:
+                browser_process.kill()
+        # Clean up temp dir
+        import shutil
+        try:
+            shutil.rmtree(temp_dir)
+        except:
+            pass
+
+
 async def run_all_tests():
     """Run all tests and report results."""
     print("\n" + "#"*70)