diff --git a/crawl4ai/browser_profiler.py b/crawl4ai/browser_profiler.py index bc902f61..f09fa989 100644 --- a/crawl4ai/browser_profiler.py +++ b/crawl4ai/browser_profiler.py @@ -180,42 +180,97 @@ class BrowserProfiler: # Run keyboard input loop in a separate task async def listen_for_quit_command(): - import termios - import tty - import select - + import sys + # First output the prompt - self.logger.info("Press 'q' when you've finished using the browser...", tag="PROFILE") - - # Save original terminal settings - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - - try: - # Switch to non-canonical mode (no line buffering) - tty.setcbreak(fd) - + self.logger.info( + "Press {segment} when you've finished using the browser...", + tag="PROFILE", + params={"segment": "'q'"}, colors={"segment": LogColor.YELLOW}, + base_color=LogColor.CYAN + ) + + async def check_browser_process(): + if ( + managed_browser.browser_process + and managed_browser.browser_process.poll() is not None + ): + self.logger.info( + "Browser already closed. Ending input listener.", tag="PROFILE" + ) + user_done_event.set() + return True + return False + + # Platform-specific handling + if sys.platform == "win32": + import msvcrt + while True: - # Check if input is available (non-blocking) - readable, _, _ = select.select([sys.stdin], [], [], 0.5) - if readable: - key = sys.stdin.read(1) - if key.lower() == 'q': - self.logger.info("Closing browser and saving profile...", tag="PROFILE", base_color=LogColor.GREEN) - user_done_event.set() + try: + if msvcrt.kbhit(): + raw = msvcrt.getch() + try: + key = raw.decode("utf-8") + except UnicodeDecodeError: + # Arrow/function keys come back as multi-byte sequences + continue + + # Skip control/multi-byte keys that decoded but aren't printable + if len(key) != 1 or not key.isprintable(): + continue + + if key.lower() == "q": + self.logger.info( + "Closing browser and saving profile...", + tag="PROFILE", + base_color=LogColor.GREEN + ) + user_done_event.set() + return + + if await check_browser_process(): return - - # Check if the browser process has already exited - if managed_browser.browser_process and managed_browser.browser_process.poll() is not None: - self.logger.info("Browser already closed. Ending input listener.", tag="PROFILE") - user_done_event.set() - return - - await asyncio.sleep(0.1) - - finally: - # Restore terminal settings - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + + await asyncio.sleep(0.1) + except Exception as e: + self.logger.error(f"Error in keyboard listener: {e}", tag="PROFILE") + continue + + else: # Unix-like + import termios + import tty + import select + + # Save original terminal settings + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + + try: + # Switch to non-canonical mode (no line buffering) + tty.setcbreak(fd) + + while True: + # Check if input is available (non-blocking) + readable, _, _ = select.select([sys.stdin], [], [], 0.5) + if readable: + key = sys.stdin.read(1) + if key.lower() == "q": + self.logger.info( + "Closing browser and saving profile...", + tag="PROFILE", + base_color=LogColor.GREEN + ) + user_done_event.set() + return + + if await check_browser_process(): + return + + await asyncio.sleep(0.1) + finally: + # Restore terminal settings + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) try: from playwright.async_api import async_playwright @@ -682,42 +737,76 @@ class BrowserProfiler: # Run keyboard input loop in a separate task async def listen_for_quit_command(): - import termios - import tty - import select - + import sys + # First output the prompt - self.logger.info("Press 'q' to stop the browser and exit...", tag="CDP") - - # Save original terminal settings - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - - try: - # Switch to non-canonical mode (no line buffering) - tty.setcbreak(fd) - + self.logger.info( + "Press {segment} to stop the browser and exit...", + tag="CDP", + params={"segment": "'q'"}, colors={"segment": LogColor.YELLOW}, + base_color=LogColor.CYAN + ) + + async def check_browser_process(): + if managed_browser.browser_process and managed_browser.browser_process.poll() is not None: + self.logger.info("Browser already closed. Ending input listener.", tag="CDP") + user_done_event.set() + return True + return False + + if sys.platform == "win32": + import msvcrt + while True: - # Check if input is available (non-blocking) - readable, _, _ = select.select([sys.stdin], [], [], 0.5) - if readable: - key = sys.stdin.read(1) - if key.lower() == 'q': - self.logger.info("Closing browser...", tag="CDP") - user_done_event.set() + try: + if msvcrt.kbhit(): + raw = msvcrt.getch() + try: + key = raw.decode("utf-8") + except UnicodeDecodeError: + # Arrow/function keys come back as multi-byte sequences + continue + + # Skip control/multi-byte keys that decoded but aren't printable + if len(key) != 1 or not key.isprintable(): + continue + + if key.lower() == "q": + self.logger.info("Closing browser...", tag="CDP") + user_done_event.set() + return + + if await check_browser_process(): return - - # Check if the browser process has already exited - if managed_browser.browser_process and managed_browser.browser_process.poll() is not None: - self.logger.info("Browser already closed. Ending input listener.", tag="CDP") - user_done_event.set() - return - - await asyncio.sleep(0.1) - - finally: - # Restore terminal settings - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + + await asyncio.sleep(0.1) + except Exception as e: + self.logger.error(f"Error in keyboard listener: {e}", tag="CDP") + continue + else: + import termios + import tty + import select + + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + + try: + tty.setcbreak(fd) + while True: + readable, _, _ = select.select([sys.stdin], [], [], 0.5) + if readable: + key = sys.stdin.read(1) + if key.lower() == "q": + self.logger.info("Closing browser...", tag="CDP") + user_done_event.set() + return + + if await check_browser_process(): + return + await asyncio.sleep(0.1) + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) # Function to retrieve and display CDP JSON config async def get_cdp_json(port): diff --git a/tests/browser/test_profiles.py b/tests/browser/test_profiles.py index 8325b561..e49a2506 100644 --- a/tests/browser/test_profiles.py +++ b/tests/browser/test_profiles.py @@ -10,11 +10,13 @@ import sys import uuid import shutil +from crawl4ai import BrowserProfiler +from crawl4ai.browser_manager import BrowserManager + # Add the project root to Python path if running directly if __name__ == "__main__": sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))) -from crawl4ai.browser import BrowserManager, BrowserProfileManager from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig from crawl4ai.async_logger import AsyncLogger @@ -25,7 +27,7 @@ async def test_profile_creation(): """Test creating and managing browser profiles.""" logger.info("Testing profile creation and management", tag="TEST") - profile_manager = BrowserProfileManager(logger=logger) + profile_manager = BrowserProfiler(logger=logger) try: # List existing profiles @@ -83,7 +85,7 @@ async def test_profile_with_browser(): """Test using a profile with a browser.""" logger.info("Testing using a profile with a browser", tag="TEST") - profile_manager = BrowserProfileManager(logger=logger) + profile_manager = BrowserProfiler(logger=logger) test_profile_name = f"test-browser-profile-{uuid.uuid4().hex[:8]}" profile_path = None @@ -101,6 +103,8 @@ async def test_profile_with_browser(): # Now use this profile with a browser browser_config = BrowserConfig( user_data_dir=profile_path, + use_managed_browser=True, + use_persistent_context=True, headless=True ) diff --git a/tests/profiler/test_crteate_profile.py b/tests/profiler/test_create_profile.py similarity index 100% rename from tests/profiler/test_crteate_profile.py rename to tests/profiler/test_create_profile.py diff --git a/tests/profiler/test_keyboard_handle.py b/tests/profiler/test_keyboard_handle.py new file mode 100644 index 00000000..8845c105 --- /dev/null +++ b/tests/profiler/test_keyboard_handle.py @@ -0,0 +1,55 @@ +import sys +import pytest +import asyncio +from unittest.mock import patch, MagicMock +from crawl4ai.browser_profiler import BrowserProfiler + +@pytest.mark.asyncio +@pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific msvcrt test") +async def test_keyboard_input_handling(): + # Mock sequence of keystrokes: arrow key followed by 'q' + mock_keys = [b'\x00K', b'q'] + mock_kbhit = MagicMock(side_effect=[True, True, False]) + mock_getch = MagicMock(side_effect=mock_keys) + + with patch('msvcrt.kbhit', mock_kbhit), patch('msvcrt.getch', mock_getch): + # profiler = BrowserProfiler() + user_done_event = asyncio.Event() + + # Create a local async function to simulate the keyboard input handling + async def test_listen_for_quit_command(): + if sys.platform == "win32": + while True: + try: + if mock_kbhit(): + raw = mock_getch() + try: + key = raw.decode("utf-8") + except UnicodeDecodeError: + continue + + if len(key) != 1 or not key.isprintable(): + continue + + if key.lower() == "q": + user_done_event.set() + return + + await asyncio.sleep(0.1) + except Exception as e: + continue + + # Run the listener + listener_task = asyncio.create_task(test_listen_for_quit_command()) + + # Wait for the event to be set + try: + await asyncio.wait_for(user_done_event.wait(), timeout=1.0) + assert user_done_event.is_set() + finally: + if not listener_task.done(): + listener_task.cancel() + try: + await listener_task + except asyncio.CancelledError: + pass \ No newline at end of file