refactor(browser): improve browser strategy architecture and lifecycle management

Major refactoring of browser strategy implementations to improve code organization and reliability:
- Move CrawlResultContainer and RunManyReturn types from async_webcrawler to models.py
- Simplify browser lifecycle management in AsyncWebCrawler
- Standardize browser strategy interface with _generate_page method
- Improve headless mode handling and browser args construction
- Clean up Docker and Playwright strategy implementations
- Fix session management and context handling across strategies

BREAKING CHANGE: Browser strategy interface has changed with new _generate_page method requirement
This commit is contained in:
UncleCode
2025-03-30 20:58:39 +08:00
parent 3ff7eec8f3
commit bb02398086
11 changed files with 271 additions and 459 deletions

View File

@@ -270,7 +270,7 @@ class BrowserConfig:
host: str = "localhost",
):
self.browser_type = browser_type
self.headless = headless
self.headless = headless and "new" or False
self.browser_mode = browser_mode
self.use_managed_browser = use_managed_browser
self.cdp_url = cdp_url

View File

@@ -4,18 +4,25 @@ import sys
import time
from colorama import Fore
from pathlib import Path
from typing import Optional, List, Generic, TypeVar
from typing import Optional, List
import json
import asyncio
# from contextlib import nullcontext, asynccontextmanager
from contextlib import asynccontextmanager
from .models import CrawlResult, MarkdownGenerationResult, DispatchResult, ScrapingResult
from .models import (
CrawlResult,
MarkdownGenerationResult,
DispatchResult,
ScrapingResult,
CrawlResultContainer,
RunManyReturn
)
from .async_database import async_db_manager
from .chunking_strategy import * # noqa: F403
from .chunking_strategy import IdentityChunking
from .content_filter_strategy import * # noqa: F403
from .extraction_strategy import * # noqa: F403
from .extraction_strategy import * # noqa: F403
from .extraction_strategy import NoExtractionStrategy
from .async_crawler_strategy import (
AsyncCrawlerStrategy,
@@ -30,7 +37,7 @@ from .markdown_generation_strategy import (
from .deep_crawling import DeepCrawlDecorator
from .async_logger import AsyncLogger, AsyncLoggerBase
from .async_configs import BrowserConfig, CrawlerRunConfig
from .async_dispatcher import * # noqa: F403
from .async_dispatcher import * # noqa: F403
from .async_dispatcher import BaseDispatcher, MemoryAdaptiveDispatcher, RateLimiter
from .utils import (
@@ -42,45 +49,6 @@ from .utils import (
RobotsParser,
)
from typing import Union, AsyncGenerator
CrawlResultT = TypeVar('CrawlResultT', bound=CrawlResult)
# RunManyReturn = Union[CrawlResultT, List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]


class CrawlResultContainer(Generic[CrawlResultT]):
    """List-like wrapper around one or more crawl results.

    Iteration, indexing and ``len()`` operate on the wrapped list, while
    unknown attribute access is forwarded to the first result so that a
    single-result container can be used as if it were a bare result object.
    """

    def __init__(self, results: Union[CrawlResultT, List[CrawlResultT]]):
        # Normalize the input to an internal list.
        self._results = results if isinstance(results, list) else [results]

    def __iter__(self):
        return iter(self._results)

    def __getitem__(self, index):
        return self._results[index]

    def __len__(self):
        return len(self._results)

    def __getattr__(self, attr):
        # Forward unknown attributes to the first result, if any.
        if self._results:
            return getattr(self._results[0], attr)
        raise AttributeError(
            f"{self.__class__.__name__} object has no attribute '{attr}'"
        )

    def __repr__(self):
        return f"{self.__class__.__name__}({self._results!r})"


# Redefine the union type. Now synchronous calls always return a container,
# while stream mode is handled with an AsyncGenerator.
RunManyReturn = Union[
    CrawlResultContainer[CrawlResultT],
    AsyncGenerator[CrawlResultT, None],
]
class AsyncWebCrawler:
"""
@@ -193,45 +161,18 @@ class AsyncWebCrawler:
# Decorate arun method with deep crawling capabilities
self._deep_handler = DeepCrawlDecorator(self)
self.arun = self._deep_handler(self.arun)
self.arun = self._deep_handler(self.arun)
async def start(self):
"""
Start the crawler explicitly without using context manager.
This is equivalent to using 'async with' but gives more control over the lifecycle.
This method will:
1. Check for builtin browser if browser_mode is 'builtin'
2. Initialize the browser and context
3. Perform warmup sequence
4. Return the crawler instance for method chaining
Returns:
AsyncWebCrawler: The initialized crawler instance
"""
# Check for builtin browser if requested
if self.browser_config.browser_mode == "builtin" and not self.browser_config.cdp_url:
# Import here to avoid circular imports
from .browser_profiler import BrowserProfiler
profiler = BrowserProfiler(logger=self.logger)
# Get builtin browser info or launch if needed
browser_info = profiler.get_builtin_browser_info()
if not browser_info:
self.logger.info("Builtin browser not found, launching new instance...", tag="BROWSER")
cdp_url = await profiler.launch_builtin_browser()
if not cdp_url:
self.logger.warning("Failed to launch builtin browser, falling back to dedicated browser", tag="BROWSER")
else:
self.browser_config.cdp_url = cdp_url
self.browser_config.use_managed_browser = True
else:
self.logger.info(f"Using existing builtin browser at {browser_info.get('cdp_url')}", tag="BROWSER")
self.browser_config.cdp_url = browser_info.get('cdp_url')
self.browser_config.use_managed_browser = True
await self.crawler_strategy.__aenter__()
await self.awarmup()
self.logger.info(f"Crawl4AI {crawl4ai_version}", tag="INIT")
self.ready = True
return self
async def close(self):
@@ -251,18 +192,6 @@ class AsyncWebCrawler:
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Exit the async context manager by closing the crawler."""
    await self.close()
async def awarmup(self):
    """
    Initialize the crawler with a warm-up sequence.

    This method:
    1. Logs initialization info (library version)
    2. Marks the crawler as ready

    Note: declared ``async`` for interface stability even though the body
    currently performs no awaits.
    """
    self.logger.info(f"Crawl4AI {crawl4ai_version}", tag="INIT")
    self.ready = True
@asynccontextmanager
async def nullcontext(self):
"""异步空上下文管理器"""
@@ -305,7 +234,7 @@ class AsyncWebCrawler:
# Auto-start if not ready
if not self.ready:
await self.start()
config = config or CrawlerRunConfig()
if not isinstance(url, str) or not url:
raise ValueError("Invalid URL, make sure the URL is a non-empty string")
@@ -319,9 +248,7 @@ class AsyncWebCrawler:
config.cache_mode = CacheMode.ENABLED
# Create cache context
cache_context = CacheContext(
url, config.cache_mode, False
)
cache_context = CacheContext(url, config.cache_mode, False)
# Initialize processing variables
async_response: AsyncCrawlResponse = None
@@ -351,7 +278,7 @@ class AsyncWebCrawler:
# if config.screenshot and not screenshot or config.pdf and not pdf:
if config.screenshot and not screenshot_data:
cached_result = None
if config.pdf and not pdf_data:
cached_result = None
@@ -383,14 +310,18 @@ class AsyncWebCrawler:
# Check robots.txt if enabled
if config and config.check_robots_txt:
if not await self.robots_parser.can_fetch(url, self.browser_config.user_agent):
if not await self.robots_parser.can_fetch(
url, self.browser_config.user_agent
):
return CrawlResult(
url=url,
html="",
success=False,
status_code=403,
error_message="Access denied by robots.txt",
response_headers={"X-Robots-Status": "Blocked by robots.txt"}
response_headers={
"X-Robots-Status": "Blocked by robots.txt"
},
)
##############################
@@ -417,7 +348,7 @@ class AsyncWebCrawler:
###############################################################
# Process the HTML content, Call CrawlerStrategy.process_html #
###############################################################
crawl_result : CrawlResult = await self.aprocess_html(
crawl_result: CrawlResult = await self.aprocess_html(
url=url,
html=html,
extracted_content=extracted_content,
@@ -494,7 +425,7 @@ class AsyncWebCrawler:
tag="ERROR",
)
return CrawlResultContainer(
return CrawlResultContainer(
CrawlResult(
url=url, html="", success=False, error_message=error_message
)
@@ -539,15 +470,14 @@ class AsyncWebCrawler:
# Process HTML content
params = config.__dict__.copy()
params.pop("url", None)
params.pop("url", None)
# Add keys from kwargs to params that don't already exist in params
params.update({k: v for k, v in kwargs.items() if k not in params.keys()})
################################
# Scraping Strategy Execution #
################################
result : ScrapingResult = scraping_strategy.scrap(url, html, **params)
result: ScrapingResult = scraping_strategy.scrap(url, html, **params)
if result is None:
raise ValueError(
@@ -596,7 +526,10 @@ class AsyncWebCrawler:
self.logger.info(
message="{url:.50}... | Time: {timing}s",
tag="SCRAPE",
params={"url": _url, "timing": int((time.perf_counter() - t1) * 1000) / 1000},
params={
"url": _url,
"timing": int((time.perf_counter() - t1) * 1000) / 1000,
},
)
################################
@@ -671,7 +604,7 @@ class AsyncWebCrawler:
async def arun_many(
self,
urls: List[str],
config: Optional[CrawlerRunConfig] = None,
config: Optional[CrawlerRunConfig] = None,
dispatcher: Optional[BaseDispatcher] = None,
# Legacy parameters maintained for backwards compatibility
# word_count_threshold=MIN_WORD_THRESHOLD,
@@ -685,8 +618,8 @@ class AsyncWebCrawler:
# pdf: bool = False,
# user_agent: str = None,
# verbose=True,
**kwargs
) -> RunManyReturn:
**kwargs,
) -> RunManyReturn:
"""
Runs the crawler for multiple URLs concurrently using a configurable dispatcher strategy.
@@ -742,37 +675,32 @@ class AsyncWebCrawler:
def transform_result(task_result):
return (
setattr(task_result.result, 'dispatch_result',
DispatchResult(
task_id=task_result.task_id,
memory_usage=task_result.memory_usage,
peak_memory=task_result.peak_memory,
start_time=task_result.start_time,
end_time=task_result.end_time,
error_message=task_result.error_message,
)
) or task_result.result
setattr(
task_result.result,
"dispatch_result",
DispatchResult(
task_id=task_result.task_id,
memory_usage=task_result.memory_usage,
peak_memory=task_result.peak_memory,
start_time=task_result.start_time,
end_time=task_result.end_time,
error_message=task_result.error_message,
),
)
or task_result.result
)
stream = config.stream
if stream:
async def result_transformer():
async for task_result in dispatcher.run_urls_stream(crawler=self, urls=urls, config=config):
async for task_result in dispatcher.run_urls_stream(
crawler=self, urls=urls, config=config
):
yield transform_result(task_result)
return result_transformer()
else:
_results = await dispatcher.run_urls(crawler=self, urls=urls, config=config)
return [transform_result(res) for res in _results]
async def aclear_cache(self):
    """Clear the cache database via the async DB manager's cleanup()."""
    await async_db_manager.cleanup()

async def aflush_cache(self):
    """Flush the cache database via the async DB manager's aflush_db()."""
    await async_db_manager.aflush_db()

async def aget_cache_size(self):
    """Get the total number of cached items, as reported by the DB manager."""
    return await async_db_manager.aget_total_count()
return [transform_result(res) for res in _results]

View File

@@ -50,7 +50,7 @@ class BrowserManager:
self.logger = logger
# Create strategy based on configuration
self._strategy = self._create_strategy()
self.strategy = self._create_strategy()
# Initialize state variables for compatibility with existing code
self.browser = None
@@ -92,23 +92,23 @@ class BrowserManager:
self: For method chaining
"""
# Start the strategy
await self._strategy.start()
await self.strategy.start()
# Update legacy references
self.browser = self._strategy.browser
self.default_context = self._strategy.default_context
self.browser = self.strategy.browser
self.default_context = self.strategy.default_context
# Set browser process reference (for CDP strategy)
if hasattr(self._strategy, 'browser_process'):
self.managed_browser = self._strategy
if hasattr(self.strategy, 'browser_process'):
self.managed_browser = self.strategy
# Set Playwright reference
self.playwright = self._strategy.playwright
self.playwright = self.strategy.playwright
# Sync sessions if needed
if hasattr(self._strategy, 'sessions'):
self.sessions = self._strategy.sessions
self.session_ttl = self._strategy.session_ttl
if hasattr(self.strategy, 'sessions'):
self.sessions = self.strategy.sessions
self.session_ttl = self.strategy.session_ttl
return self
@@ -122,11 +122,11 @@ class BrowserManager:
Tuple of (Page, BrowserContext)
"""
# Delegate to strategy
page, context = await self._strategy.get_page(crawlerRunConfig)
page, context = await self.strategy.get_page(crawlerRunConfig)
# Sync sessions if needed
if hasattr(self._strategy, 'sessions'):
self.sessions = self._strategy.sessions
if hasattr(self.strategy, 'sessions'):
self.sessions = self.strategy.sessions
return page, context
@@ -144,14 +144,15 @@ class BrowserManager:
List of (Page, Context) tuples
"""
# Delegate to strategy
pages = await self._strategy.get_pages(crawlerRunConfig, count)
pages = await self.strategy.get_pages(crawlerRunConfig, count)
# Sync sessions if needed
if hasattr(self._strategy, 'sessions'):
self.sessions = self._strategy.sessions
if hasattr(self.strategy, 'sessions'):
self.sessions = self.strategy.sessions
return pages
# Just for legacy compatibility
async def kill_session(self, session_id: str):
"""Kill a browser session and clean up resources.
@@ -159,33 +160,16 @@ class BrowserManager:
session_id: The session ID to kill
"""
# Handle kill_session via our strategy if it supports it
await self._strategy.kill_session(session_id)
await self.strategy.kill_session(session_id)
# sync sessions if needed
if hasattr(self._strategy, 'sessions'):
self.sessions = self._strategy.sessions
def _cleanup_expired_sessions(self):
    """Clean up expired sessions based on TTL.

    Delegates to the strategy's own cleanup when it provides one; otherwise
    scans this manager's session table and kills every session whose
    last-used timestamp is older than ``session_ttl`` seconds.
    """
    # Use strategy's implementation if available
    if hasattr(self._strategy, '_cleanup_expired_sessions'):
        self._strategy._cleanup_expired_sessions()
        return
    # Otherwise use our own implementation
    current_time = time.time()
    # Sessions are stored as (context, page, last_used) tuples.
    expired_sessions = [
        sid
        for sid, (_, _, last_used) in self.sessions.items()
        if current_time - last_used > self.session_ttl
    ]
    for sid in expired_sessions:
        # NOTE(review): create_task requires a running event loop, so this
        # must only be called from async code paths — confirm all callers.
        asyncio.create_task(self.kill_session(sid))
if hasattr(self.strategy, 'sessions'):
self.sessions = self.strategy.sessions
async def close(self):
"""Close the browser and clean up resources."""
# Delegate to strategy
await self._strategy.close()
await self.strategy.close()
# Reset legacy references
self.browser = None

View File

@@ -82,6 +82,9 @@ class BaseBrowserStrategy(ABC):
return self
@abstractmethod
async def _generate_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]:
pass
async def get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]:
"""Get a page with specified configuration.
@@ -94,6 +97,23 @@ class BaseBrowserStrategy(ABC):
Returns:
Tuple of (Page, BrowserContext)
"""
# Clean up expired sessions first
self._cleanup_expired_sessions()
# If a session_id is provided and we already have it, reuse that page + context
if crawlerRunConfig.session_id and crawlerRunConfig.session_id in self.sessions:
context, page, _ = self.sessions[crawlerRunConfig.session_id]
# Update last-used timestamp
self.sessions[crawlerRunConfig.session_id] = (context, page, time.time())
return page, context
page, context = await self._generate_page(crawlerRunConfig)
# If a session_id is specified, store this session so we can reuse later
if crawlerRunConfig.session_id:
self.sessions[crawlerRunConfig.session_id] = (context, page, time.time())
return page, context
pass
async def get_pages(self, crawlerRunConfig: CrawlerRunConfig, count: int = 1) -> List[Tuple[Page, BrowserContext]]:
@@ -120,31 +140,29 @@ class BaseBrowserStrategy(ABC):
"""
# Define common browser arguments that improve performance and stability
args = [
"--disable-gpu",
"--disable-gpu-compositing",
"--disable-software-rasterizer",
"--no-sandbox",
"--disable-dev-shm-usage",
"--no-first-run",
"--no-default-browser-check",
"--disable-infobars",
"--window-position=0,0",
"--ignore-certificate-errors",
"--ignore-certificate-errors-spki-list",
"--disable-blink-features=AutomationControlled",
"--window-position=400,0",
"--disable-renderer-backgrounding",
"--disable-ipc-flooding-protection",
"--force-color-profile=srgb",
"--mute-audio",
"--disable-gpu",
"--disable-gpu-compositing",
"--disable-software-rasterizer",
"--disable-dev-shm-usage",
"--disable-infobars",
"--disable-blink-features=AutomationControlled",
"--disable-renderer-backgrounding",
"--disable-ipc-flooding-protection",
"--disable-background-timer-throttling",
f"--window-size={self.config.viewport_width},{self.config.viewport_height}",
]
# Define browser disable options for light mode
browser_disable_options = [
"--disable-background-networking",
"--disable-background-timer-throttling",
"--disable-backgrounding-occluded-windows",
"--disable-breakpad",
"--disable-client-side-phishing-detection",
@@ -153,13 +171,10 @@ class BaseBrowserStrategy(ABC):
"--disable-extensions",
"--disable-features=TranslateUI",
"--disable-hang-monitor",
"--disable-ipc-flooding-protection",
"--disable-popup-blocking",
"--disable-prompt-on-repost",
"--disable-sync",
"--force-color-profile=srgb",
"--metrics-recording-only",
"--no-first-run",
"--password-store=basic",
"--use-mock-keychain",
]

View File

@@ -115,24 +115,11 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
except Exception as e:
if self.logger:
self.logger.error(f"Failed to start built-in browser: {str(e)}", tag="BUILTIN")
# NOTE(review): consider cleaning up any partially-acquired resources here before re-raising
raise
async def get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]:
    """Get a page for the given configuration.

    Thin override kept for API clarity: built-in browsers reuse the CDP
    strategy's page and session management unchanged.

    Args:
        crawlerRunConfig: Configuration object for the crawler run

    Returns:
        Tuple of (Page, BrowserContext)
    """
    # For built-in browsers, we use the same page management as CDP strategy
    return await super().get_page(crawlerRunConfig)
@classmethod
def get_builtin_browser_info(cls, debugging_port: int, config_file: str, logger: Optional[AsyncLogger] = None) -> Optional[Dict[str, Any]]:
def _get_builtin_browser_info(cls, debugging_port: int, config_file: str, logger: Optional[AsyncLogger] = None) -> Optional[Dict[str, Any]]:
"""Get information about the built-in browser for a specific debugging port.
Args:
@@ -157,15 +144,14 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
browser_info = browser_info_dict["port_map"][port_str]
# Check if the browser is still running
pids = browser_info.get('pid')
if type(pids) == str and len(pids.split("\n")) > 1:
pids = [int(pid) for pid in pids.split("\n") if pid.isdigit()]
elif type(pids) == str and pids.isdigit():
pids = [int(pids)]
elif type(pids) == int:
pids = browser_info.get('pid', '')
if isinstance(pids, str):
pids = [int(pid) for pid in pids.split() if pid.isdigit()]
elif isinstance(pids, int):
pids = [pids]
else:
pids = []
# Check if any of the PIDs are running
if not pids:
if logger:
@@ -205,7 +191,7 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
Returns:
dict: Browser information or None if no running browser is configured
"""
return self.get_builtin_browser_info(
return self._get_builtin_browser_info(
debugging_port=self.config.debugging_port,
config_file=self.builtin_config_file,
logger=self.logger
@@ -226,7 +212,7 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
str: CDP URL for the browser, or None if launch failed
"""
# Check if there's an existing browser still running
browser_info = self.get_builtin_browser_info(
browser_info = self._get_builtin_browser_info(
debugging_port=debugging_port,
config_file=self.builtin_config_file,
logger=self.logger
@@ -238,6 +224,7 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
# Create a user data directory for the built-in browser
user_data_dir = os.path.join(self.builtin_browser_dir, "user_data")
# Raise error if user data dir is already engaged
if self._check_user_dir_is_engaged(user_data_dir):
raise Exception(f"User data directory {user_data_dir} is already engaged by another browser instance.")
@@ -246,15 +233,19 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
os.makedirs(user_data_dir, exist_ok=True)
# Prepare browser launch arguments
browser_args = super()._build_browser_args()
browser_path = await get_browser_executable(browser_type)
base_args = [browser_path]
if browser_type == "chromium":
args = [
browser_path,
f"--remote-debugging-port={debugging_port}",
f"--user-data-dir={user_data_dir}",
]
if headless:
args.append("--headless=new")
# if headless:
# args.append("--headless=new")
elif browser_type == "firefox":
args = [
browser_path,
@@ -270,6 +261,8 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
self.logger.error(f"Browser type {browser_type} not supported for built-in browser", tag="BUILTIN")
return None
args = base_args + browser_args + args
try:
# Check if the port is already in use
@@ -333,11 +326,12 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
# Check if it already uses port mapping
if isinstance(existing_data, dict) and "port_map" in existing_data:
port_map = existing_data["port_map"]
# Convert legacy format to port mapping
elif isinstance(existing_data, dict) and "debugging_port" in existing_data:
old_port = str(existing_data.get("debugging_port"))
if self._is_browser_running(existing_data.get("pid")):
port_map[old_port] = existing_data
# # Convert legacy format to port mapping
# elif isinstance(existing_data, dict) and "debugging_port" in existing_data:
# old_port = str(existing_data.get("debugging_port"))
# if self._is_browser_running(existing_data.get("pid")):
# port_map[old_port] = existing_data
except Exception as e:
if self.logger:
self.logger.warning(f"Could not read existing config: {str(e)}", tag="BUILTIN")
@@ -413,15 +407,19 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
# Update config file to remove this browser
with open(self.builtin_config_file, 'r') as f:
browser_info_dict = json.load(f)
# Remove this port from the dictionary
port_str = str(self.config.debugging_port)
if port_str in browser_info_dict.get("port_map", {}):
del browser_info_dict["port_map"][port_str]
with open(self.builtin_config_file, 'w') as f:
json.dump(browser_info_dict, f, indent=2)
# Remove user data directory if it exists
if os.path.exists(self.builtin_browser_dir):
shutil.rmtree(self.builtin_browser_dir)
# Clear the browser info cache
self.browser = None
self.temp_dir = None
@@ -460,14 +458,11 @@ class BuiltinBrowserStrategy(CDPBrowserStrategy):
async def close(self):
"""Close the built-in browser and clean up resources."""
# Store the shutting_down state
was_shutting_down = getattr(self, 'shutting_down', False)
# Call parent class close method
await super().close()
# Clean up built-in browser if we created it and were in shutdown mode
if was_shutting_down:
if self.shutting_down:
await self.kill_builtin_browser()
if self.logger:
self.logger.debug("Killed built-in browser during shutdown", tag="BUILTIN")

View File

@@ -68,9 +68,11 @@ class CDPBrowserStrategy(BaseBrowserStrategy):
if self.logger:
self.logger.debug(f"Connected to CDP browser at {cdp_url}", tag="CDP")
except Exception as e:
if self.logger:
self.logger.error(f"Failed to connect to CDP browser: {str(e)}", tag="CDP")
# Clean up any resources before re-raising
await self._cleanup_process()
raise
@@ -95,7 +97,32 @@ class CDPBrowserStrategy(BaseBrowserStrategy):
user_data_dir = self.config.user_data_dir
# Get browser args based on OS and browser type
args = await self._get_browser_args(user_data_dir)
# args = await self._get_browser_args(user_data_dir)
browser_args = super()._build_browser_args()
browser_path = await get_browser_executable(self.config.browser_type)
base_args = [browser_path]
if self.config.browser_type == "chromium":
args = [
f"--remote-debugging-port={self.config.debugging_port}",
f"--user-data-dir={user_data_dir}",
]
# if self.config.headless:
# args.append("--headless=new")
elif self.config.browser_type == "firefox":
args = [
"--remote-debugging-port",
str(self.config.debugging_port),
"--profile",
user_data_dir,
]
if self.config.headless:
args.append("--headless")
else:
raise NotImplementedError(f"Browser type {self.config.browser_type} not supported")
args = base_args + browser_args + args
# Start browser process
try:
@@ -136,40 +163,6 @@ class CDPBrowserStrategy(BaseBrowserStrategy):
except Exception as e:
await self._cleanup_process()
raise Exception(f"Failed to start browser: {e}")
async def _get_browser_args(self, user_data_dir: str) -> List[str]:
    """Returns browser-specific command line arguments.

    Combines the common flags from the base strategy with per-browser-type
    flags (remote debugging port, profile/user-data directory, headless).

    Args:
        user_data_dir: Path to user data directory

    Returns:
        List of command-line arguments for the browser

    Raises:
        NotImplementedError: If the configured browser type is neither
            chromium nor firefox.
    """
    # Shared stability/performance flags from the base strategy.
    browser_args = super()._build_browser_args()
    # Resolve the executable path for the configured browser type.
    browser_path = await get_browser_executable(self.config.browser_type)
    base_args = [browser_path]
    if self.config.browser_type == "chromium":
        args = [
            f"--remote-debugging-port={self.config.debugging_port}",
            f"--user-data-dir={user_data_dir}",
        ]
        if self.config.headless:
            # Chromium's modern headless mode.
            args.append("--headless=new")
    elif self.config.browser_type == "firefox":
        args = [
            "--remote-debugging-port",
            str(self.config.debugging_port),
            "--profile",
            user_data_dir,
        ]
        if self.config.headless:
            args.append("--headless")
    else:
        raise NotImplementedError(f"Browser type {self.config.browser_type} not supported")
    # Executable first, then common flags, then browser-specific flags.
    return base_args + browser_args + args
async def _cleanup_process(self):
"""Cleanup browser process and temporary directory."""
@@ -204,15 +197,40 @@ class CDPBrowserStrategy(BaseBrowserStrategy):
if self.temp_dir and os.path.exists(self.temp_dir):
try:
shutil.rmtree(self.temp_dir)
self.temp_dir = None
if self.logger:
self.logger.debug("Removed temporary directory", tag="CDP")
except Exception as e:
if self.logger:
self.logger.error(
message="Error removing temporary directory: {error}",
tag="ERROR",
params={"error": str(e)},
tag="CDP",
params={"error": str(e)}
)
self.browser_process = None
async def get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]:
async def _generate_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]:
    """Create or reuse a page in the shared CDP default context.

    Args:
        crawlerRunConfig: Configuration object for the crawler run

    Returns:
        Tuple of (Page, BrowserContext)
    """
    # For CDP, we typically use the shared default_context
    context = self.default_context
    # Snapshot of existing pages, taken before setup_context runs.
    pages = context.pages
    # Register this config's signature against the shared context so later
    # lookups by signature resolve to the same context.
    config_signature = self._make_config_signature(crawlerRunConfig)
    self.contexts_by_config[config_signature] = context
    await self.setup_context(context, crawlerRunConfig)
    # Check if there's already a page with the target URL
    page = next((p for p in pages if p.url == crawlerRunConfig.url), None)
    # If not found, create a new page
    if not page:
        page = await context.new_page()
    return page, context
async def _get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]:
"""Get a page for the given configuration.
Args:
@@ -221,15 +239,8 @@ class CDPBrowserStrategy(BaseBrowserStrategy):
Returns:
Tuple of (Page, BrowserContext)
"""
# Clean up expired sessions using base class method
self._cleanup_expired_sessions()
# If a session_id is provided and we already have it, reuse that page + context
if crawlerRunConfig.session_id and crawlerRunConfig.session_id in self.sessions:
context, page, _ = self.sessions[crawlerRunConfig.session_id]
# Update last-used timestamp
self.sessions[crawlerRunConfig.session_id] = (context, page, time.time())
return page, context
# Call parent method to ensure browser is started
await super().get_page(crawlerRunConfig)
# For CDP, we typically use the shared default_context
context = self.default_context
@@ -266,24 +277,5 @@ class CDPBrowserStrategy(BaseBrowserStrategy):
await super().close()
# Additional CDP-specific cleanup
if self.browser_process:
await asyncio.sleep(0.5)
await self._cleanup_process()
self.browser_process = None
if self.logger:
self.logger.debug("Cleaned up CDP browser process", tag="CDP")
# Clean up temporary directory
if self.temp_dir and os.path.exists(self.temp_dir):
try:
shutil.rmtree(self.temp_dir)
self.temp_dir = None
if self.logger:
self.logger.debug("Removed temporary directory", tag="CDP")
except Exception as e:
if self.logger:
self.logger.error(
message="Error removing temporary directory: {error}",
tag="CDP",
params={"error": str(e)}
)
await asyncio.sleep(0.5)
await self._cleanup_process()

View File

@@ -15,7 +15,7 @@ from ..models import DockerConfig
from ..docker_registry import DockerRegistry
from ..docker_utils import DockerUtils
from .builtin import CDPBrowserStrategy
from .base import BaseBrowserStrategy
class DockerBrowserStrategy(CDPBrowserStrategy):
"""Docker-based browser strategy.
@@ -79,9 +79,7 @@ class DockerBrowserStrategy(CDPBrowserStrategy):
self: For method chaining
"""
# Initialize Playwright
from ..utils import get_playwright
self.playwright = await get_playwright()
await BaseBrowserStrategy.start(self)
if self.logger:
self.logger.info(
@@ -172,121 +170,6 @@ class DockerBrowserStrategy(CDPBrowserStrategy):
# Use the utility method to generate the hash
return self.docker_utils.generate_config_hash(config_dict)
async def _get_or_create_cdp_url1(self) -> str:
    """Get CDP URL by either creating a new container or using an existing one.

    NOTE(review): the trailing "1" suffix and the sibling
    ``_get_or_create_cdp_url`` defined below suggest this is a superseded
    duplicate kept during refactoring — confirm it is unused and remove.

    Returns:
        CDP URL for connecting to the browser

    Raises:
        Exception: If container creation or browser launch fails
    """
    # If CDP URL is explicitly provided, use it
    if self.config.cdp_url:
        return self.config.cdp_url
    # Ensure Docker image exists (will build if needed)
    image_name = await self.docker_utils.ensure_docker_image_exists(
        self.docker_config.image, self.docker_config.mode
    )
    # Generate config hash for container matching
    config_hash = await self._generate_config_hash()
    # Look for existing container with matching config
    container_id = self.registry.find_container_by_config(
        config_hash, self.docker_utils
    )
    if container_id:
        # Use existing container
        self.container_id = container_id
        host_port = self.registry.get_container_host_port(container_id)
        if self.logger:
            self.logger.info(
                f"Using existing Docker container: {container_id[:12]}",
                tag="DOCKER",
            )
    else:
        # Get a port for the new container
        host_port = (
            self.docker_config.host_port
            or self.registry.get_next_available_port(self.docker_utils)
        )
        # Prepare volumes list
        volumes = list(self.docker_config.volumes)
        # Add user data directory if specified
        if self.docker_config.user_data_dir:
            # Ensure user data directory exists
            os.makedirs(self.docker_config.user_data_dir, exist_ok=True)
            # Map host user-data dir into the container.
            volumes.append(
                f"{self.docker_config.user_data_dir}:{self.docker_config.container_user_data_dir}"
            )
            # Update config user_data_dir to point to container path
            self.config.user_data_dir = self.docker_config.container_user_data_dir
        # Create a new container
        container_id = await self.docker_utils.create_container(
            image_name=image_name,
            host_port=host_port,
            container_name=self.container_name,
            volumes=volumes,
            network=self.docker_config.network,
            env_vars=self.docker_config.env_vars,
            extra_args=self.docker_config.extra_args,
        )
        if not container_id:
            raise Exception("Failed to create Docker container")
        self.container_id = container_id
        # Register the container
        self.registry.register_container(container_id, host_port, config_hash)
        # Wait for container to be ready
        await self.docker_utils.wait_for_container_ready(container_id)
        # Handle specific setup based on mode
        if self.docker_config.mode == "launch":
            # In launch mode, we need to start socat and Chrome
            await self.docker_utils.start_socat_in_container(container_id)
            # Build browser arguments
            browser_args = self._build_browser_args()
            # Launch Chrome
            await self.docker_utils.launch_chrome_in_container(
                container_id, browser_args
            )
            # Get PIDs for later cleanup
            self.chrome_process_id = (
                await self.docker_utils.get_process_id_in_container(
                    container_id, "chrome"
                )
            )
            self.socat_process_id = (
                await self.docker_utils.get_process_id_in_container(
                    container_id, "socat"
                )
            )
    # Wait for CDP to be ready
    await self.docker_utils.wait_for_cdp_ready(host_port)
    if self.logger:
        self.logger.success(
            f"Docker container ready: {container_id[:12]} on port {host_port}",
            tag="DOCKER",
        )
    # Return CDP URL
    return f"http://localhost:{host_port}"
async def _get_or_create_cdp_url(self) -> str:
"""Get CDP URL by either creating a new container or using an existing one.
@@ -465,8 +348,7 @@ class DockerBrowserStrategy(CDPBrowserStrategy):
async def close(self):
"""Close the browser and clean up Docker container if needed."""
# Set flag to track if we were the ones initiating shutdown
initiated_shutdown = not getattr(self, "shutting_down", False)
initiated_shutdown = not self.shutting_down
# Storage persistence for Docker needs special handling
# We need to store state before calling super().close() which will close the browser
if (

View File

@@ -80,8 +80,26 @@ class PlaywrightBrowserStrategy(BaseBrowserStrategy):
raise
return self
async def get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]:
async def _generate_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]:
    """Produce a new page, reusing a cached context for this run config.

    Contexts are keyed by a signature derived from the run configuration,
    so runs with identical configs share one BrowserContext while each
    call still gets its own fresh Page.

    Args:
        crawlerRunConfig: Configuration for the crawl operation.

    Returns:
        Tuple of (Page, BrowserContext).
    """
    signature = self._make_config_signature(crawlerRunConfig)

    # Serialize cache access so concurrent calls with the same config
    # don't race to create duplicate contexts.
    async with self._contexts_lock:
        context = self.contexts_by_config.get(signature)
        if context is None:
            # First time we see this config: build, set up, and cache it.
            context = await self.create_browser_context(crawlerRunConfig)
            await self.setup_context(context, crawlerRunConfig)
            self.contexts_by_config[signature] = context

    # Pages are never shared — always open a fresh one on the chosen context.
    page = await context.new_page()
    return page, context
async def _get_page(self, crawlerRunConfig: CrawlerRunConfig) -> Tuple[Page, BrowserContext]:
"""Get a page for the given configuration.
Args:
@@ -90,15 +108,8 @@ class PlaywrightBrowserStrategy(BaseBrowserStrategy):
Returns:
Tuple of (Page, BrowserContext)
"""
# Clean up expired sessions first
self._cleanup_expired_sessions()
# If a session_id is provided and we already have it, reuse that page + context
if crawlerRunConfig.session_id and crawlerRunConfig.session_id in self.sessions:
context, page, _ = self.sessions[crawlerRunConfig.session_id]
# Update last-used timestamp
self.sessions[crawlerRunConfig.session_id] = (context, page, time.time())
return page, context
# Call parent method to ensure browser is started
await super().get_page(crawlerRunConfig)
# Otherwise, check if we have an existing context for this config
config_signature = self._make_config_signature(crawlerRunConfig)
@@ -121,8 +132,3 @@ class PlaywrightBrowserStrategy(BaseBrowserStrategy):
return page, context
async def close(self):
    """Shut down the Playwright browser and release all resources.

    Delegates entirely to the base strategy's close(), which already
    handles storage-state persistence, session teardown, context
    cleanup, and stopping the browser and Playwright driver.
    """
    await super().close()

View File

@@ -1,5 +1,7 @@
from pydantic import BaseModel, HttpUrl, PrivateAttr
from typing import List, Dict, Optional, Callable, Awaitable, Union, Any
from typing import AsyncGenerator
from typing import Generic, TypeVar
from enum import Enum
from dataclasses import dataclass
from .ssl_certificate import SSLCertificate
@@ -34,34 +36,12 @@ class CrawlerTaskResult:
def success(self) -> bool:
return self.result.success
# Lifecycle states a crawl task moves through, from submission to a
# terminal state. Built with the functional Enum API; each member's
# value equals its name (e.g. CrawlStatus.QUEUED.value == "QUEUED").
CrawlStatus = Enum(
    "CrawlStatus",
    {state: state for state in ("QUEUED", "IN_PROGRESS", "COMPLETED", "FAILED")},
)
# @dataclass
# class CrawlStats:
# task_id: str
# url: str
# status: CrawlStatus
# start_time: Optional[datetime] = None
# end_time: Optional[datetime] = None
# memory_usage: float = 0.0
# peak_memory: float = 0.0
# error_message: str = ""
# @property
# def duration(self) -> str:
# if not self.start_time:
# return "0:00"
# end = self.end_time or datetime.now()
# duration = end - self.start_time
# return str(timedelta(seconds=int(duration.total_seconds())))
@dataclass
class CrawlStats:
task_id: str
@@ -95,7 +75,6 @@ class CrawlStats:
duration = end - start
return str(timedelta(seconds=int(duration.total_seconds())))
class DisplayMode(Enum):
    """Display modes for crawl progress/statistics output.

    Member values mirror their names. NOTE(review): presumably DETAILED
    shows per-task output while AGGREGATED shows summary totals — confirm
    against the dispatcher/monitor that consumes this enum.
    """
    DETAILED = "DETAILED"
    AGGREGATED = "AGGREGATED"
@@ -112,12 +91,10 @@ class TokenUsage:
completion_tokens_details: Optional[dict] = None
prompt_tokens_details: Optional[dict] = None
class UrlModel(BaseModel):
    """A validated URL paired with a `forced` flag."""
    # Pydantic validates this is a well-formed HTTP(S) URL.
    url: HttpUrl
    # NOTE(review): presumably True forces a (re-)fetch regardless of
    # cache state — confirm against the callers that construct this model.
    forced: bool = False
class MarkdownGenerationResult(BaseModel):
raw_markdown: str
markdown_with_citations: str
@@ -284,6 +261,40 @@ class StringCompatibleMarkdown(str):
def __getattr__(self, name):
return getattr(self._markdown_result, name)
CrawlResultT = TypeVar('CrawlResultT', bound=CrawlResult)


class CrawlResultContainer(Generic[CrawlResultT]):
    """List-like wrapper around one or more crawl results.

    Normalizes a single result or a list of results into a uniform
    sequence interface (iteration, indexing, len), while delegating
    unknown attribute access to the first result for backward
    compatibility with callers that expect a bare result object.
    """

    def __init__(self, results: Union[CrawlResultT, List[CrawlResultT]]):
        # Normalize to a list so the sequence protocol always works.
        if isinstance(results, list):
            self._results: List[CrawlResultT] = results
        else:
            self._results = [results]

    def __iter__(self):
        return iter(self._results)

    def __getitem__(self, index):
        return self._results[index]

    def __len__(self):
        return len(self._results)

    def __getattr__(self, attr):
        # Read _results via __dict__ rather than self._results: when the
        # attribute is missing (e.g. during unpickling or copy, which
        # bypass __init__), `self._results` would re-enter __getattr__
        # and recurse infinitely. __dict__.get() breaks that cycle.
        results = self.__dict__.get("_results")
        if results:
            # Delegate attribute access to the first element.
            return getattr(results[0], attr)
        raise AttributeError(
            f"{self.__class__.__name__} object has no attribute '{attr}'"
        )

    def __repr__(self):
        return f"{self.__class__.__name__}({self._results!r})"
# Return type for batch crawl entry points: either an eagerly-collected
# CrawlResultContainer of results, or an async generator that yields
# results one at a time.
RunManyReturn = Union[
    CrawlResultContainer[CrawlResultT],
    AsyncGenerator[CrawlResultT, None]
]
# END of backward compatibility code for markdown/markdown_v2.
# When removing this code in the future, make sure to:
# 1. Replace the private attribute and property with a standard field
@@ -304,7 +315,6 @@ class AsyncCrawlResponse(BaseModel):
class Config:
arbitrary_types_allowed = True
###############################
# Scraping Models
###############################

View File

@@ -530,7 +530,7 @@ async def test_docker_registry_reuse():
logger.info("First browser started successfully", tag="TEST")
# Get container ID from the strategy
docker_strategy1 = manager1._strategy
docker_strategy1 = manager1.strategy
container_id1 = docker_strategy1.container_id
logger.info(f"First browser container ID: {container_id1[:12]}", tag="TEST")
@@ -560,7 +560,7 @@ async def test_docker_registry_reuse():
logger.info("Second browser started successfully", tag="TEST")
# Get container ID from the second strategy
docker_strategy2 = manager2._strategy
docker_strategy2 = manager2.strategy
container_id2 = docker_strategy2.container_id
logger.info(f"Second browser container ID: {container_id2[:12]}", tag="TEST")

View File

@@ -56,13 +56,13 @@ async def test_builtin_browser_creation():
# Step 2: Check if we have a BuiltinBrowserStrategy
print(f"\n{INFO}2. Checking if we have a BuiltinBrowserStrategy{RESET}")
if isinstance(manager._strategy, BuiltinBrowserStrategy):
if isinstance(manager.strategy, BuiltinBrowserStrategy):
print(
f"{SUCCESS}Correct strategy type: {manager._strategy.__class__.__name__}{RESET}"
f"{SUCCESS}Correct strategy type: {manager.strategy.__class__.__name__}{RESET}"
)
else:
print(
f"{ERROR}Wrong strategy type: {manager._strategy.__class__.__name__}{RESET}"
f"{ERROR}Wrong strategy type: {manager.strategy.__class__.__name__}{RESET}"
)
return None
@@ -77,7 +77,7 @@ async def test_builtin_browser_creation():
# Step 4: Get browser info from the strategy
print(f"\n{INFO}4. Getting browser information{RESET}")
browser_info = manager._strategy.get_browser_info()
browser_info = manager.strategy.get_browser_info()
if browser_info:
print(f"{SUCCESS}Browser info retrieved:{RESET}")
for key, value in browser_info.items():
@@ -149,7 +149,7 @@ async def test_browser_status_management(manager: BrowserManager):
# Step 1: Get browser status
print(f"\n{INFO}1. Getting browser status{RESET}")
try:
status = await manager._strategy.get_builtin_browser_status()
status = await manager.strategy.get_builtin_browser_status()
print(f"{SUCCESS}Browser status:{RESET}")
print(f" Running: {status['running']}")
print(f" CDP URL: {status['cdp_url']}")
@@ -160,7 +160,7 @@ async def test_browser_status_management(manager: BrowserManager):
# Step 2: Test killing the browser
print(f"\n{INFO}2. Testing killing the browser{RESET}")
try:
result = await manager._strategy.kill_builtin_browser()
result = await manager.strategy.kill_builtin_browser()
if result:
print(f"{SUCCESS}Browser killed successfully{RESET}")
else:
@@ -172,7 +172,7 @@ async def test_browser_status_management(manager: BrowserManager):
# Step 3: Check status after kill
print(f"\n{INFO}3. Checking status after kill{RESET}")
try:
status = await manager._strategy.get_builtin_browser_status()
status = await manager.strategy.get_builtin_browser_status()
if not status["running"]:
print(f"{SUCCESS}Browser is correctly reported as not running{RESET}")
else:
@@ -184,7 +184,7 @@ async def test_browser_status_management(manager: BrowserManager):
# Step 4: Launch a new browser
print(f"\n{INFO}4. Launching a new browser{RESET}")
try:
cdp_url = await manager._strategy.launch_builtin_browser(
cdp_url = await manager.strategy.launch_builtin_browser(
browser_type="chromium", headless=True
)
if cdp_url:
@@ -223,8 +223,8 @@ async def test_multiple_managers():
print(f"{SUCCESS}Second manager started{RESET}")
# Check if they got the same CDP URL
cdp_url1 = manager1._strategy.config.cdp_url
cdp_url2 = manager2._strategy.config.cdp_url
cdp_url1 = manager1.strategy.config.cdp_url
cdp_url2 = manager2.strategy.config.cdp_url
if cdp_url1 == cdp_url2:
print(
@@ -316,7 +316,7 @@ async def test_edge_cases():
# Kill the browser directly
print(f"{INFO}Killing the browser...{RESET}")
await manager._strategy.kill_builtin_browser()
await manager.strategy.kill_builtin_browser()
print(f"{SUCCESS}Browser killed{RESET}")
# Try to get a page (should fail or launch a new browser)
@@ -350,7 +350,7 @@ async def cleanup_browsers():
try:
# No need to start, just access the strategy directly
strategy = manager._strategy
strategy = manager.strategy
if isinstance(strategy, BuiltinBrowserStrategy):
result = await strategy.kill_builtin_browser()
if result:
@@ -420,7 +420,7 @@ async def test_performance_scaling():
user_data_dir=os.path.join(temp_dir, f"browser_profile_{i}"),
)
manager = BrowserManager(browser_config=browser_config, logger=logger)
manager._strategy.shutting_down = True
manager.strategy.shutting_down = True
manager_configs.append((manager, i, port))
# Define async function to start a single manager
@@ -614,7 +614,7 @@ async def test_performance_scaling_lab( num_browsers: int = 10, pages_per_browse
user_data_dir=os.path.join(temp_dir, f"browser_profile_{i}"),
)
manager = BrowserManager(browser_config=browser_config, logger=logger)
manager._strategy.shutting_down = True
manager.strategy.shutting_down = True
manager_configs.append((manager, i, port))
# Define async function to start a single manager