fix(browser_profiler): improve keyboard input handling
- fix handling of special keys in Windows msvcrt implementation - Guard against UnicodeDecodeError from multi-byte key sequences - Filter out non-printable characters and control sequences - Add error handling to prevent coroutine crashes - Add unit test to verify keyboard input handling Key changes: - Safe UTF-8 decoding with try/except for special keys - Skip non-printable and multi-byte character sequences - Add broad exception handling in keyboard listener Test runs on Windows only due to msvcrt dependency.
This commit is contained in:
@@ -207,21 +207,35 @@ class BrowserProfiler:
|
||||
import msvcrt
|
||||
|
||||
while True:
|
||||
if msvcrt.kbhit():
|
||||
key = msvcrt.getch().decode("utf-8")
|
||||
if key.lower() == "q":
|
||||
self.logger.info(
|
||||
"Closing browser and saving profile...",
|
||||
tag="PROFILE",
|
||||
base_color=LogColor.GREEN
|
||||
)
|
||||
user_done_event.set()
|
||||
try:
|
||||
if msvcrt.kbhit():
|
||||
raw = msvcrt.getch()
|
||||
try:
|
||||
key = raw.decode("utf-8")
|
||||
except UnicodeDecodeError:
|
||||
# Arrow/function keys come back as multi-byte sequences
|
||||
continue
|
||||
|
||||
# Skip control/multi-byte keys that decoded but aren't printable
|
||||
if len(key) != 1 or not key.isprintable():
|
||||
continue
|
||||
|
||||
if key.lower() == "q":
|
||||
self.logger.info(
|
||||
"Closing browser and saving profile...",
|
||||
tag="PROFILE",
|
||||
base_color=LogColor.GREEN
|
||||
)
|
||||
user_done_event.set()
|
||||
return
|
||||
|
||||
if await check_browser_process():
|
||||
return
|
||||
|
||||
if await check_browser_process():
|
||||
return
|
||||
|
||||
await asyncio.sleep(0.1)
|
||||
await asyncio.sleep(0.1)
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error in keyboard listener: {e}", tag="PROFILE")
|
||||
continue
|
||||
|
||||
else: # Unix-like
|
||||
import termios
|
||||
@@ -713,17 +727,31 @@ class BrowserProfiler:
|
||||
import msvcrt
|
||||
|
||||
while True:
|
||||
if msvcrt.kbhit():
|
||||
key = msvcrt.getch().decode("utf-8")
|
||||
if key.lower() == "q":
|
||||
self.logger.info("Closing browser...", tag="CDP")
|
||||
user_done_event.set()
|
||||
try:
|
||||
if msvcrt.kbhit():
|
||||
raw = msvcrt.getch()
|
||||
try:
|
||||
key = raw.decode("utf-8")
|
||||
except UnicodeDecodeError:
|
||||
# Arrow/function keys come back as multi-byte sequences
|
||||
continue
|
||||
|
||||
# Skip control/multi-byte keys that decoded but aren't printable
|
||||
if len(key) != 1 or not key.isprintable():
|
||||
continue
|
||||
|
||||
if key.lower() == "q":
|
||||
self.logger.info("Closing browser...", tag="CDP")
|
||||
user_done_event.set()
|
||||
return
|
||||
|
||||
if await check_browser_process():
|
||||
return
|
||||
|
||||
if await check_browser_process():
|
||||
return
|
||||
|
||||
await asyncio.sleep(0.1)
|
||||
await asyncio.sleep(0.1)
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error in keyboard listener: {e}", tag="CDP")
|
||||
continue
|
||||
else:
|
||||
import termios
|
||||
import tty
|
||||
|
||||
@@ -10,11 +10,13 @@ import sys
|
||||
import uuid
|
||||
import shutil
|
||||
|
||||
from crawl4ai import BrowserProfiler
|
||||
from crawl4ai.browser_manager import BrowserManager
|
||||
|
||||
# Add the project root to Python path if running directly
|
||||
if __name__ == "__main__":
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
|
||||
|
||||
from crawl4ai.browser import BrowserManager, BrowserProfileManager
|
||||
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
|
||||
from crawl4ai.async_logger import AsyncLogger
|
||||
|
||||
@@ -25,7 +27,7 @@ async def test_profile_creation():
|
||||
"""Test creating and managing browser profiles."""
|
||||
logger.info("Testing profile creation and management", tag="TEST")
|
||||
|
||||
profile_manager = BrowserProfileManager(logger=logger)
|
||||
profile_manager = BrowserProfiler(logger=logger)
|
||||
|
||||
try:
|
||||
# List existing profiles
|
||||
@@ -83,7 +85,7 @@ async def test_profile_with_browser():
|
||||
"""Test using a profile with a browser."""
|
||||
logger.info("Testing using a profile with a browser", tag="TEST")
|
||||
|
||||
profile_manager = BrowserProfileManager(logger=logger)
|
||||
profile_manager = BrowserProfiler(logger=logger)
|
||||
test_profile_name = f"test-browser-profile-{uuid.uuid4().hex[:8]}"
|
||||
profile_path = None
|
||||
|
||||
@@ -101,6 +103,8 @@ async def test_profile_with_browser():
|
||||
# Now use this profile with a browser
|
||||
browser_config = BrowserConfig(
|
||||
user_data_dir=profile_path,
|
||||
use_managed_browser=True,
|
||||
use_persistent_context=True,
|
||||
headless=True
|
||||
)
|
||||
|
||||
|
||||
55
tests/profiler/test_keyboard_handle.py
Normal file
55
tests/profiler/test_keyboard_handle.py
Normal file
@@ -0,0 +1,55 @@
|
||||
import sys
|
||||
import pytest
|
||||
import asyncio
|
||||
from unittest.mock import patch, MagicMock
|
||||
from crawl4ai.browser_profiler import BrowserProfiler
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific msvcrt test")
|
||||
async def test_keyboard_input_handling():
|
||||
# Mock sequence of keystrokes: arrow key followed by 'q'
|
||||
mock_keys = [b'\x00K', b'q']
|
||||
mock_kbhit = MagicMock(side_effect=[True, True, False])
|
||||
mock_getch = MagicMock(side_effect=mock_keys)
|
||||
|
||||
with patch('msvcrt.kbhit', mock_kbhit), patch('msvcrt.getch', mock_getch):
|
||||
# profiler = BrowserProfiler()
|
||||
user_done_event = asyncio.Event()
|
||||
|
||||
# Create a local async function to simulate the keyboard input handling
|
||||
async def test_listen_for_quit_command():
|
||||
if sys.platform == "win32":
|
||||
while True:
|
||||
try:
|
||||
if mock_kbhit():
|
||||
raw = mock_getch()
|
||||
try:
|
||||
key = raw.decode("utf-8")
|
||||
except UnicodeDecodeError:
|
||||
continue
|
||||
|
||||
if len(key) != 1 or not key.isprintable():
|
||||
continue
|
||||
|
||||
if key.lower() == "q":
|
||||
user_done_event.set()
|
||||
return
|
||||
|
||||
await asyncio.sleep(0.1)
|
||||
except Exception as e:
|
||||
continue
|
||||
|
||||
# Run the listener
|
||||
listener_task = asyncio.create_task(test_listen_for_quit_command())
|
||||
|
||||
# Wait for the event to be set
|
||||
try:
|
||||
await asyncio.wait_for(user_done_event.wait(), timeout=1.0)
|
||||
assert user_done_event.is_set()
|
||||
finally:
|
||||
if not listener_task.done():
|
||||
listener_task.cancel()
|
||||
try:
|
||||
await listener_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
Reference in New Issue
Block a user