Improve page tracking with global CDP endpoint-based tracking
- Use class-level tracking keyed by normalized CDP URL
- All BrowserManager instances connecting to the same browser share tracking
- For CDP connections, always create new pages (cross-connection page sharing isn't reliable in Playwright)
- For managed browsers, page reuse works within the same process
- Normalize CDP URLs to handle different formats (http, ws, query params); see the sketch below
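The normalization rule is what makes the sharing work: every URL form that reaches the same browser must collapse to one tracking key. A standalone sketch of the logic (mirroring the _normalize_cdp_url helper added below; the free-function form here is illustrative):

from urllib.parse import urlparse

def normalize_cdp_url(cdp_url: str) -> str:
    # Any http/ws CDP URL reduces to "cdp:http://host:port", so all
    # connections to the same browser map to one tracking key.
    parsed = urlparse(cdp_url)
    host = parsed.hostname or "localhost"
    port = parsed.port or 9222
    return f"cdp:http://{host}:{port}"

# The three formats from the commit message collapse to the same key:
key = "cdp:http://localhost:9222"
assert normalize_cdp_url("http://localhost:9222") == key
assert normalize_cdp_url("ws://localhost:9222/devtools/browser/xxx") == key
assert normalize_cdp_url("http://localhost:9222?browser_id=xxx") == key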
@@ -575,7 +575,20 @@ class BrowserManager:
     """

     _playwright_instance = None

+    # Class-level tracking of pages in use, keyed by browser endpoint (CDP URL or instance id)
+    # This ensures multiple BrowserManager instances connecting to the same browser
+    # share the same page tracking, preventing race conditions.
+    _global_pages_in_use: dict = {}  # endpoint_key -> set of pages
+    _global_pages_lock: asyncio.Lock = None  # Initialized lazily
+
+    @classmethod
+    def _get_global_lock(cls) -> asyncio.Lock:
+        """Get or create the global pages lock (lazy initialization for async context)."""
+        if cls._global_pages_lock is None:
+            cls._global_pages_lock = asyncio.Lock()
+        return cls._global_pages_lock
+
     @classmethod
     async def get_playwright(cls, use_undetected: bool = False):
         if use_undetected:
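A note on the lazy lock above: on Python versions before 3.10, asyncio primitives captured an event loop at construction time, so a lock created at class-definition (import) time could end up bound to no loop or the wrong one. Deferring creation to the first call inside a running loop sidesteps that. The pattern in isolation (class name illustrative):

import asyncio

class LazyLockHolder:
    _lock = None  # deliberately NOT asyncio.Lock() here, at import time

    @classmethod
    def get_lock(cls) -> asyncio.Lock:
        # The first caller creates the lock inside a running event loop;
        # every later caller gets the same instance.
        if cls._lock is None:
            cls._lock = asyncio.Lock()
        return cls._lock

async def main():
    async with LazyLockHolder.get_lock():
        print("critical section")

asyncio.run(main())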
@@ -617,9 +630,8 @@ class BrowserManager:
         # for all racers). Prevents 'Target page/context closed' errors.
         self._page_lock = asyncio.Lock()

-        # Track pages currently in use by crawl operations to prevent
-        # concurrent crawls from reusing the same page (race condition fix)
-        self._pages_in_use = set()
+        # Browser endpoint key for global page tracking (set after browser starts)
+        self._browser_endpoint_key: Optional[str] = None

         # Stealth adapter for stealth mode
         self._stealth_adapter = None
@@ -720,6 +732,77 @@ class BrowserManager:

             self.default_context = self.browser

+        # Set the browser endpoint key for global page tracking
+        self._browser_endpoint_key = self._compute_browser_endpoint_key()
+        # Initialize global tracking set for this endpoint if needed
+        if self._browser_endpoint_key not in BrowserManager._global_pages_in_use:
+            BrowserManager._global_pages_in_use[self._browser_endpoint_key] = set()
+
+    def _compute_browser_endpoint_key(self) -> str:
+        """
+        Compute a unique key identifying this browser connection.
+
+        For CDP connections, uses the normalized CDP URL so all BrowserManager
+        instances connecting to the same browser share page tracking.
+        For standalone browsers, uses instance id since each is independent.
+
+        Returns:
+            str: Unique identifier for this browser connection
+        """
+        # For CDP connections, use the CDP URL as the key (normalized)
+        if self.config.cdp_url:
+            return self._normalize_cdp_url(self.config.cdp_url)
+
+        # For managed browsers, use the CDP URL/port that was assigned
+        if self.managed_browser:
+            # Use debugging port as the key since it uniquely identifies the browser
+            port = getattr(self.managed_browser, 'debugging_port', None)
+            host = getattr(self.managed_browser, 'host', 'localhost')
+            if port:
+                return f"cdp:http://{host}:{port}"
+
+        # For standalone browsers, use instance id (no sharing needed)
+        return f"instance:{id(self)}"
+
+    def _normalize_cdp_url(self, cdp_url: str) -> str:
+        """
+        Normalize a CDP URL to a canonical form for consistent tracking.
+
+        Handles various formats:
+        - http://localhost:9222
+        - ws://localhost:9222/devtools/browser/xxx
+        - http://localhost:9222?browser_id=xxx
+
+        Returns:
+            str: Normalized CDP key in format "cdp:http://host:port"
+        """
+        from urllib.parse import urlparse
+
+        parsed = urlparse(cdp_url)
+        host = parsed.hostname or 'localhost'
+        port = parsed.port or 9222
+
+        return f"cdp:http://{host}:{port}"
+
+    def _get_pages_in_use(self) -> set:
+        """Get the set of pages currently in use for this browser."""
+        if self._browser_endpoint_key and self._browser_endpoint_key in BrowserManager._global_pages_in_use:
+            return BrowserManager._global_pages_in_use[self._browser_endpoint_key]
+        # Fallback: shouldn't happen, but return empty set
+        return set()
+
+    def _mark_page_in_use(self, page) -> None:
+        """Mark a page as in use."""
+        if self._browser_endpoint_key:
+            if self._browser_endpoint_key not in BrowserManager._global_pages_in_use:
+                BrowserManager._global_pages_in_use[self._browser_endpoint_key] = set()
+            BrowserManager._global_pages_in_use[self._browser_endpoint_key].add(page)
+
+    def _release_page_from_use(self, page) -> None:
+        """Release a page from the in-use tracking."""
+        if self._browser_endpoint_key and self._browser_endpoint_key in BrowserManager._global_pages_in_use:
+            BrowserManager._global_pages_in_use[self._browser_endpoint_key].discard(page)
+
     async def _verify_cdp_ready(self, cdp_url: str) -> bool:
         """Verify CDP endpoint is ready with exponential backoff.
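Together, these helpers turn the in-use set into a property of the browser endpoint rather than of any single manager instance. A self-contained toy (class and method names are illustrative, not from the diff) showing why two managers with the same normalized key see each other's pages:

class Tracker:
    # endpoint_key -> set of pages, shared by ALL instances (class attribute)
    _global_pages_in_use: dict = {}

    def __init__(self, endpoint_key: str):
        self._key = endpoint_key
        self._global_pages_in_use.setdefault(endpoint_key, set())

    def mark(self, page) -> None:
        self._global_pages_in_use[self._key].add(page)

    def in_use(self) -> set:
        return self._global_pages_in_use[self._key]

t1 = Tracker("cdp:http://localhost:9222")
t2 = Tracker("cdp:http://localhost:9222")  # same browser, different manager
t1.mark("page-A")
assert "page-A" in t2.in_use()  # t2 sees t1's page, so it won't reuse it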
@@ -1228,29 +1311,41 @@ class BrowserManager:
                 if not page:
                     async with self._page_lock:
                         page = await context.new_page()
-                        self._pages_in_use.add(page)
+                        self._mark_page_in_use(page)
                         await self._apply_stealth_to_page(page)
                 else:
                     # Mark pre-existing target as in use
-                    self._pages_in_use.add(page)
+                    self._mark_page_in_use(page)
             else:
-                # Use lock to safely check for available pages and track usage
-                # This prevents race conditions when multiple crawls run concurrently
-                async with self._page_lock:
-                    pages = context.pages
-                    # Find first available page (exists and not currently in use)
-                    available_page = next(
-                        (p for p in pages if p not in self._pages_in_use),
-                        None
-                    )
-                    if available_page:
-                        page = available_page
-                    else:
-                        # No available pages - create a new one
-                        page = await context.new_page()
-                        await self._apply_stealth_to_page(page)
-                    # Mark page as in use
-                    self._pages_in_use.add(page)
+                # For CDP connections (external browser), multiple Playwright connections
+                # create separate browser/context objects. Page reuse across connections
+                # isn't reliable because each connection sees different page objects.
+                # Always create new pages for CDP to avoid cross-connection race conditions.
+                if self.config.cdp_url and not self.config.use_managed_browser:
+                    async with self._page_lock:
+                        page = await context.new_page()
+                        self._mark_page_in_use(page)
+                        await self._apply_stealth_to_page(page)
+                else:
+                    # For managed browsers (single process), page reuse is safe.
+                    # Use lock to safely check for available pages and track usage.
+                    # This prevents race conditions when multiple crawls run concurrently.
+                    async with BrowserManager._get_global_lock():
+                        pages = context.pages
+                        pages_in_use = self._get_pages_in_use()
+                        # Find first available page (exists and not currently in use)
+                        available_page = next(
+                            (p for p in pages if p not in pages_in_use),
+                            None
+                        )
+                        if available_page:
+                            page = available_page
+                        else:
+                            # No available pages - create a new one
+                            page = await context.new_page()
+                            await self._apply_stealth_to_page(page)
+                        # Mark page as in use (global tracking)
+                        self._mark_page_in_use(page)
         else:
             # Otherwise, check if we have an existing context for this config
             config_signature = self._make_config_signature(crawlerRunConfig)
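What removes the race in this hunk is that the check ("is a page free?") and the act (mark it in use) sit under one lock. A stripped-down model of the managed-browser branch, with toy names and no Playwright dependency:

import asyncio

async def acquire(pages: list, in_use: set, lock: asyncio.Lock) -> str:
    # Check-then-act under a single lock: without it, two coroutines can
    # both observe the same page as free and grab it simultaneously.
    async with lock:
        for p in pages:
            if p not in in_use:
                in_use.add(p)
                return p
        p = f"page-{len(pages) + 1}"  # stand-in for context.new_page()
        pages.append(p)
        in_use.add(p)
        return p

async def main():
    pages, in_use, lock = ["page-1"], set(), asyncio.Lock()
    a, b = await asyncio.gather(
        acquire(pages, in_use, lock),
        acquire(pages, in_use, lock),
    )
    assert a != b, "concurrent acquisitions must never share a page"
    print(a, b)  # page-1 page-2

asyncio.run(main())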
@@ -1283,7 +1378,7 @@ class BrowserManager:
         """
         if session_id in self.sessions:
            context, page, _ = self.sessions[session_id]
-            self._pages_in_use.discard(page)
+            self._release_page_from_use(page)
            await page.close()
            if not self.config.use_managed_browser:
                await context.close()
@@ -1291,7 +1386,7 @@ class BrowserManager:

     def release_page(self, page):
         """
-        Release a page from the in-use tracking set.
+        Release a page from the in-use tracking set (global tracking).

         This should be called when a crawl operation completes to allow
         the page to be reused by subsequent crawls.
@@ -1299,7 +1394,7 @@ class BrowserManager:
         Args:
            page: The Playwright page to release.
         """
-        self._pages_in_use.discard(page)
+        self._release_page_from_use(page)

     def _cleanup_expired_sessions(self):
         """Clean up expired sessions based on TTL."""
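release_page is the other half of the contract: tracking stays accurate only if every path that acquires a page eventually releases it. A hypothetical caller sketch (the acquisition flow is condensed; crawl_once and its parameters are illustrative, not part of the diff):

async def crawl_once(browser_manager, context, url: str) -> str:
    # Acquire: create a page and mark it so concurrent crawls skip it.
    page = await context.new_page()
    browser_manager._mark_page_in_use(page)
    try:
        await page.goto(url)
        return await page.content()
    finally:
        # Release even on navigation errors, so the page returns to the
        # reusable pool (release_page delegates to _release_page_from_use).
        browser_manager.release_page(page)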
@@ -232,7 +232,7 @@ async def test_high_concurrency_stress():
 async def test_page_tracking_internal_state():
     """
     Test 5: Verify internal page tracking state is correct.
-    This directly tests the _pages_in_use tracking mechanism.
+    This directly tests the global page tracking mechanism.
     """
     print("\n" + "="*70)
     print("TEST 5: Internal page tracking state verification")
@@ -248,8 +248,13 @@ async def test_page_tracking_internal_state():
     async with AsyncWebCrawler(config=browser_config) as crawler:
         browser_manager = crawler.crawler_strategy.browser_manager

+        # Check endpoint key is set
+        endpoint_key = browser_manager._browser_endpoint_key
+        print(f" Browser endpoint key: {endpoint_key}")
+        assert endpoint_key, "Endpoint key should be set"
+
         # Initially, no pages should be in use
-        initial_in_use = len(browser_manager._pages_in_use)
+        initial_in_use = len(browser_manager._get_pages_in_use())
         print(f" Initial pages in use: {initial_in_use}")

         # Do a crawl
@@ -257,7 +262,7 @@ async def test_page_tracking_internal_state():
         assert result.success, f"Crawl failed: {result.error_message}"

         # After crawl completes, page should be released
-        after_crawl_in_use = len(browser_manager._pages_in_use)
+        after_crawl_in_use = len(browser_manager._get_pages_in_use())
         print(f" Pages in use after crawl: {after_crawl_in_use}")

         # The page should have been released (or kept as the last page)
@@ -267,7 +272,7 @@ async def test_page_tracking_internal_state():
         result2 = await crawler.arun("https://example.org")
         assert result2.success, f"Second crawl failed: {result2.error_message}"

-        final_in_use = len(browser_manager._pages_in_use)
+        final_in_use = len(browser_manager._get_pages_in_use())
         print(f" Pages in use after second crawl: {final_in_use}")

         print(" PASSED: Page tracking state is consistent")
@@ -411,6 +416,143 @@ async def test_compare_isolated_vs_shared_context():
     return True


+async def test_multiple_crawlers_same_cdp():
+    """
+    Test 8: Multiple AsyncWebCrawler instances connecting to the same CDP endpoint.
+
+    This tests the realistic scenario where:
+    1. A browser is started externally (or by a managed browser)
+    2. Multiple crawler instances connect to it via CDP URL
+    3. All use create_isolated_context=False to share cookies/session
+    4. Each should get its own page to avoid race conditions
+    """
+    print("\n" + "="*70)
+    print("TEST 8: Multiple crawlers connecting to same CDP endpoint")
+    print("="*70)
+
+    import subprocess
+    import tempfile
+
+    # Start a browser manually using subprocess
+    port = 9444
+    temp_dir = tempfile.mkdtemp(prefix="browser-test-")
+
+    browser_process = None
+    try:
+        # Start chromium with remote debugging - use Playwright's bundled chromium
+        import os
+        playwright_path = os.path.expanduser("~/.cache/ms-playwright/chromium-1200/chrome-linux64/chrome")
+        if not os.path.exists(playwright_path):
+            # Fallback - try to find it
+            for path in [
+                "/usr/bin/chromium",
+                "/usr/bin/chromium-browser",
+                "/usr/bin/google-chrome",
+            ]:
+                if os.path.exists(path):
+                    playwright_path = path
+                    break
+        chrome_path = playwright_path
+
+        cmd = [
+            chrome_path,
+            f"--remote-debugging-port={port}",
+            f"--user-data-dir={temp_dir}",
+            "--headless=new",
+            "--no-sandbox",
+            "--disable-gpu",
+            "--disable-dev-shm-usage",
+        ]
+
+        browser_process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        await asyncio.sleep(2)  # Wait for browser to start
+
+        cdp_url = f"http://localhost:{port}"
+        print(f" Started browser at {cdp_url}")
+
+        # Both crawlers connect via CDP URL
+        browser_config1 = BrowserConfig(
+            headless=True,
+            cdp_url=cdp_url,
+            create_isolated_context=False,
+        )
+        browser_config2 = BrowserConfig(
+            headless=True,
+            cdp_url=cdp_url,
+            create_isolated_context=False,
+        )
+
+        urls_crawler1 = [
+            "https://example.com?crawler=1",
+            "https://example.org?crawler=1",
+        ]
+        urls_crawler2 = [
+            "https://httpbin.org/html?crawler=2",
+            "https://httpbin.org/get?crawler=2",
+        ]
+
+        async with AsyncWebCrawler(config=browser_config1) as crawler1:
+            async with AsyncWebCrawler(config=browser_config2) as crawler2:
+                bm1 = crawler1.crawler_strategy.browser_manager
+                bm2 = crawler2.crawler_strategy.browser_manager
+
+                print(f" Crawler 1 endpoint key: {bm1._browser_endpoint_key}")
+                print(f" Crawler 2 endpoint key: {bm2._browser_endpoint_key}")
+                print(f" Keys match: {bm1._browser_endpoint_key == bm2._browser_endpoint_key}")
+
+                # Launch concurrent crawls from BOTH crawlers simultaneously
+                print(f" Launching {len(urls_crawler1) + len(urls_crawler2)} concurrent crawls...")
+
+                tasks1 = [crawler1.arun(url) for url in urls_crawler1]
+                tasks2 = [crawler2.arun(url) for url in urls_crawler2]
+
+                all_results = await asyncio.gather(
+                    *tasks1, *tasks2,
+                    return_exceptions=True
+                )
+
+                # Check results
+                success_count = 0
+                for i, result in enumerate(all_results):
+                    crawler_id = 1 if i < len(urls_crawler1) else 2
+                    url_idx = i if i < len(urls_crawler1) else i - len(urls_crawler1)
+
+                    if isinstance(result, Exception):
+                        print(f" Crawler {crawler_id}, URL {url_idx+1}: EXCEPTION - {result}")
+                    elif result.success:
+                        success_count += 1
+                        print(f" Crawler {crawler_id}, URL {url_idx+1}: OK")
+                    else:
+                        print(f" Crawler {crawler_id}, URL {url_idx+1}: FAILED - {result.error_message}")
+
+        total = len(urls_crawler1) + len(urls_crawler2)
+        assert success_count == total, f"Only {success_count}/{total} succeeded"
+
+        print(f" PASSED: All {total} concurrent crawls from 2 crawlers succeeded")
+        return True
+
+    except Exception as e:
+        print(f" FAILED: {str(e)}")
+        import traceback
+        traceback.print_exc()
+        return False
+
+    finally:
+        # Clean up browser process
+        if browser_process:
+            browser_process.terminate()
+            try:
+                browser_process.wait(timeout=5)
+            except:
+                browser_process.kill()
+        # Clean up temp dir
+        import shutil
+        try:
+            shutil.rmtree(temp_dir)
+        except:
+            pass
+
+
 async def run_all_tests():
     """Run all tests and report results."""
     print("\n" + "#"*70)