diff --git a/crawl4ai/browser_profiler.py b/crawl4ai/browser_profiler.py index bc902f61..1a961e03 100644 --- a/crawl4ai/browser_profiler.py +++ b/crawl4ai/browser_profiler.py @@ -65,6 +65,213 @@ class BrowserProfiler: self.builtin_config_file = os.path.join(self.builtin_browser_dir, "browser_config.json") os.makedirs(self.builtin_browser_dir, exist_ok=True) + def _is_windows(self) -> bool: + """Check if running on Windows platform.""" + return sys.platform.startswith('win') or sys.platform == 'cygwin' + + def _is_macos(self) -> bool: + """Check if running on macOS platform.""" + return sys.platform == 'darwin' + + def _is_linux(self) -> bool: + """Check if running on Linux platform.""" + return sys.platform.startswith('linux') + + def _get_quit_message(self, tag: str) -> str: + """Get appropriate quit message based on context.""" + if tag == "PROFILE": + return "Closing browser and saving profile..." + elif tag == "CDP": + return "Closing browser..." + else: + return "Closing browser..." + + async def _listen_windows(self, user_done_event, check_browser_process, tag: str): + """Windows-specific keyboard listener using msvcrt.""" + try: + import msvcrt + except ImportError: + raise ImportError("msvcrt module not available on this platform") + + while True: + try: + # Check for keyboard input + if msvcrt.kbhit(): + raw = msvcrt.getch() + + # Handle Unicode decoding more robustly + key = None + try: + key = raw.decode("utf-8") + except UnicodeDecodeError: + try: + # Try different encodings + key = raw.decode("latin1") + except UnicodeDecodeError: + # Skip if we can't decode + continue + + # Validate key + if not key or len(key) != 1: + continue + + # Check for printable characters only + if not key.isprintable(): + continue + + # Check for quit command + if key.lower() == "q": + self.logger.info( + self._get_quit_message(tag), + tag=tag, + base_color=LogColor.GREEN + ) + user_done_event.set() + return + + # Check if browser process ended + if await check_browser_process(): + return + + # Small delay to prevent busy waiting + await asyncio.sleep(0.1) + + except Exception as e: + self.logger.warning(f"Error in Windows keyboard listener: {e}", tag=tag) + # Continue trying instead of failing completely + await asyncio.sleep(0.1) + continue + + async def _listen_unix(self, user_done_event: asyncio.Event, check_browser_process, tag: str): + """Unix/Linux/macOS keyboard listener using termios and select.""" + try: + import termios + import tty + import select + except ImportError: + raise ImportError("termios/tty/select modules not available on this platform") + + # Get stdin file descriptor + try: + fd = sys.stdin.fileno() + except (AttributeError, OSError): + raise ImportError("stdin is not a terminal") + + # Save original terminal settings + old_settings = None + try: + old_settings = termios.tcgetattr(fd) + except termios.error as e: + raise ImportError(f"Cannot get terminal attributes: {e}") + + try: + # Switch to non-canonical mode (cbreak mode) + tty.setcbreak(fd) + + while True: + try: + # Use select to check if input is available (non-blocking) + # Timeout of 0.5 seconds to periodically check browser process + readable, _, _ = select.select([sys.stdin], [], [], 0.5) + + if readable: + # Read one character + key = sys.stdin.read(1) + + if key and key.lower() == "q": + self.logger.info( + self._get_quit_message(tag), + tag=tag, + base_color=LogColor.GREEN + ) + user_done_event.set() + return + + # Check if browser process ended + if await check_browser_process(): + return + + # Small delay to prevent busy waiting + await asyncio.sleep(0.1) + + except (KeyboardInterrupt, EOFError): + # Handle Ctrl+C or EOF gracefully + self.logger.info("Keyboard interrupt received", tag=tag) + user_done_event.set() + return + except Exception as e: + self.logger.warning(f"Error in Unix keyboard listener: {e}", tag=tag) + await asyncio.sleep(0.1) + continue + + finally: + # Always restore terminal settings + if old_settings is not None: + try: + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + except Exception as e: + self.logger.error(f"Failed to restore terminal settings: {e}", tag=tag) + + async def _listen_fallback(self, user_done_event: asyncio.Event, check_browser_process, tag: str): + """Fallback keyboard listener using simple input() method.""" + self.logger.info("Using fallback input mode. Type 'q' and press Enter to quit.", tag=tag) + + # Run input in a separate thread to avoid blocking + import threading + import queue + + input_queue = queue.Queue() + + def input_thread(): + """Thread function to handle input.""" + try: + while not user_done_event.is_set(): + try: + # Use input() with a prompt + user_input = input("Press 'q' + Enter to quit: ").strip().lower() + input_queue.put(user_input) + if user_input == 'q': + break + except (EOFError, KeyboardInterrupt): + input_queue.put('q') + break + except Exception as e: + self.logger.warning(f"Error in input thread: {e}", tag=tag) + break + except Exception as e: + self.logger.error(f"Input thread failed: {e}", tag=tag) + + # Start input thread + thread = threading.Thread(target=input_thread, daemon=True) + thread.start() + + try: + while not user_done_event.is_set(): + # Check for user input + try: + user_input = input_queue.get_nowait() + if user_input == 'q': + self.logger.info( + self._get_quit_message(tag), + tag=tag, + base_color=LogColor.GREEN + ) + user_done_event.set() + return + except queue.Empty: + pass + + # Check if browser process ended + if await check_browser_process(): + return + + # Small delay + await asyncio.sleep(0.5) + + except Exception as e: + self.logger.error(f"Fallback listener failed: {e}", tag=tag) + user_done_event.set() + async def create_profile(self, profile_name: Optional[str] = None, browser_config: Optional[BrowserConfig] = None) -> Optional[str]: @@ -180,42 +387,38 @@ class BrowserProfiler: # Run keyboard input loop in a separate task async def listen_for_quit_command(): - import termios - import tty - import select - + """Cross-platform keyboard listener that waits for 'q' key press.""" # First output the prompt - self.logger.info("Press 'q' when you've finished using the browser...", tag="PROFILE") - - # Save original terminal settings - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - + self.logger.info( + "Press {segment} when you've finished using the browser...", + tag="PROFILE", + params={"segment": "'q'"}, colors={"segment": LogColor.YELLOW}, + base_color=LogColor.CYAN + ) + + async def check_browser_process(): + """Check if browser process is still running.""" + if ( + managed_browser.browser_process + and managed_browser.browser_process.poll() is not None + ): + self.logger.info( + "Browser already closed. Ending input listener.", tag="PROFILE" + ) + user_done_event.set() + return True + return False + + # Try platform-specific implementations with fallback try: - # Switch to non-canonical mode (no line buffering) - tty.setcbreak(fd) - - while True: - # Check if input is available (non-blocking) - readable, _, _ = select.select([sys.stdin], [], [], 0.5) - if readable: - key = sys.stdin.read(1) - if key.lower() == 'q': - self.logger.info("Closing browser and saving profile...", tag="PROFILE", base_color=LogColor.GREEN) - user_done_event.set() - return - - # Check if the browser process has already exited - if managed_browser.browser_process and managed_browser.browser_process.poll() is not None: - self.logger.info("Browser already closed. Ending input listener.", tag="PROFILE") - user_done_event.set() - return - - await asyncio.sleep(0.1) - - finally: - # Restore terminal settings - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + if self._is_windows(): + await self._listen_windows(user_done_event, check_browser_process, "PROFILE") + else: + await self._listen_unix(user_done_event, check_browser_process, "PROFILE") + except Exception as e: + self.logger.warning(f"Platform-specific keyboard listener failed: {e}", tag="PROFILE") + self.logger.info("Falling back to simple input mode...", tag="PROFILE") + await self._listen_fallback(user_done_event, check_browser_process, "PROFILE") try: from playwright.async_api import async_playwright @@ -682,42 +885,33 @@ class BrowserProfiler: # Run keyboard input loop in a separate task async def listen_for_quit_command(): - import termios - import tty - import select - + """Cross-platform keyboard listener that waits for 'q' key press.""" # First output the prompt - self.logger.info("Press 'q' to stop the browser and exit...", tag="CDP") - - # Save original terminal settings - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - + self.logger.info( + "Press {segment} to stop the browser and exit...", + tag="CDP", + params={"segment": "'q'"}, colors={"segment": LogColor.YELLOW}, + base_color=LogColor.CYAN + ) + + async def check_browser_process(): + """Check if browser process is still running.""" + if managed_browser.browser_process and managed_browser.browser_process.poll() is not None: + self.logger.info("Browser already closed. Ending input listener.", tag="CDP") + user_done_event.set() + return True + return False + + # Try platform-specific implementations with fallback try: - # Switch to non-canonical mode (no line buffering) - tty.setcbreak(fd) - - while True: - # Check if input is available (non-blocking) - readable, _, _ = select.select([sys.stdin], [], [], 0.5) - if readable: - key = sys.stdin.read(1) - if key.lower() == 'q': - self.logger.info("Closing browser...", tag="CDP") - user_done_event.set() - return - - # Check if the browser process has already exited - if managed_browser.browser_process and managed_browser.browser_process.poll() is not None: - self.logger.info("Browser already closed. Ending input listener.", tag="CDP") - user_done_event.set() - return - - await asyncio.sleep(0.1) - - finally: - # Restore terminal settings - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + if self._is_windows(): + await self._listen_windows(user_done_event, check_browser_process, "CDP") + else: + await self._listen_unix(user_done_event, check_browser_process, "CDP") + except Exception as e: + self.logger.warning(f"Platform-specific keyboard listener failed: {e}", tag="CDP") + self.logger.info("Falling back to simple input mode...", tag="CDP") + await self._listen_fallback(user_done_event, check_browser_process, "CDP") # Function to retrieve and display CDP JSON config async def get_cdp_json(port): diff --git a/tests/browser/test_profiles.py b/tests/browser/test_profiles.py index 8325b561..e49a2506 100644 --- a/tests/browser/test_profiles.py +++ b/tests/browser/test_profiles.py @@ -10,11 +10,13 @@ import sys import uuid import shutil +from crawl4ai import BrowserProfiler +from crawl4ai.browser_manager import BrowserManager + # Add the project root to Python path if running directly if __name__ == "__main__": sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))) -from crawl4ai.browser import BrowserManager, BrowserProfileManager from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig from crawl4ai.async_logger import AsyncLogger @@ -25,7 +27,7 @@ async def test_profile_creation(): """Test creating and managing browser profiles.""" logger.info("Testing profile creation and management", tag="TEST") - profile_manager = BrowserProfileManager(logger=logger) + profile_manager = BrowserProfiler(logger=logger) try: # List existing profiles @@ -83,7 +85,7 @@ async def test_profile_with_browser(): """Test using a profile with a browser.""" logger.info("Testing using a profile with a browser", tag="TEST") - profile_manager = BrowserProfileManager(logger=logger) + profile_manager = BrowserProfiler(logger=logger) test_profile_name = f"test-browser-profile-{uuid.uuid4().hex[:8]}" profile_path = None @@ -101,6 +103,8 @@ async def test_profile_with_browser(): # Now use this profile with a browser browser_config = BrowserConfig( user_data_dir=profile_path, + use_managed_browser=True, + use_persistent_context=True, headless=True ) diff --git a/tests/profiler/test_crteate_profile.py b/tests/profiler/test_create_profile.py similarity index 100% rename from tests/profiler/test_crteate_profile.py rename to tests/profiler/test_create_profile.py diff --git a/tests/profiler/test_keyboard_handle.py b/tests/profiler/test_keyboard_handle.py new file mode 100644 index 00000000..8845c105 --- /dev/null +++ b/tests/profiler/test_keyboard_handle.py @@ -0,0 +1,55 @@ +import sys +import pytest +import asyncio +from unittest.mock import patch, MagicMock +from crawl4ai.browser_profiler import BrowserProfiler + +@pytest.mark.asyncio +@pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific msvcrt test") +async def test_keyboard_input_handling(): + # Mock sequence of keystrokes: arrow key followed by 'q' + mock_keys = [b'\x00K', b'q'] + mock_kbhit = MagicMock(side_effect=[True, True, False]) + mock_getch = MagicMock(side_effect=mock_keys) + + with patch('msvcrt.kbhit', mock_kbhit), patch('msvcrt.getch', mock_getch): + # profiler = BrowserProfiler() + user_done_event = asyncio.Event() + + # Create a local async function to simulate the keyboard input handling + async def test_listen_for_quit_command(): + if sys.platform == "win32": + while True: + try: + if mock_kbhit(): + raw = mock_getch() + try: + key = raw.decode("utf-8") + except UnicodeDecodeError: + continue + + if len(key) != 1 or not key.isprintable(): + continue + + if key.lower() == "q": + user_done_event.set() + return + + await asyncio.sleep(0.1) + except Exception as e: + continue + + # Run the listener + listener_task = asyncio.create_task(test_listen_for_quit_command()) + + # Wait for the event to be set + try: + await asyncio.wait_for(user_done_event.wait(), timeout=1.0) + assert user_done_event.is_set() + finally: + if not listener_task.done(): + listener_task.cancel() + try: + await listener_task + except asyncio.CancelledError: + pass \ No newline at end of file