feat(browser-profiler): implement cross-platform keyboard listeners and improve quit handling

This commit is contained in:
AHMET YILMAZ
2025-08-08 11:18:34 +08:00
parent 64f37792a7
commit b61b2ee676

View File

@@ -65,6 +65,213 @@ class BrowserProfiler:
self.builtin_config_file = os.path.join(self.builtin_browser_dir, "browser_config.json")
os.makedirs(self.builtin_browser_dir, exist_ok=True)
def _is_windows(self) -> bool:
"""Check if running on Windows platform."""
return sys.platform.startswith('win') or sys.platform == 'cygwin'
def _is_macos(self) -> bool:
"""Check if running on macOS platform."""
return sys.platform == 'darwin'
def _is_linux(self) -> bool:
"""Check if running on Linux platform."""
return sys.platform.startswith('linux')
def _get_quit_message(self, tag: str) -> str:
"""Get appropriate quit message based on context."""
if tag == "PROFILE":
return "Closing browser and saving profile..."
elif tag == "CDP":
return "Closing browser..."
else:
return "Closing browser..."
async def _listen_windows(self, user_done_event, check_browser_process, tag: str):
"""Windows-specific keyboard listener using msvcrt."""
try:
import msvcrt
except ImportError:
raise ImportError("msvcrt module not available on this platform")
while True:
try:
# Check for keyboard input
if msvcrt.kbhit():
raw = msvcrt.getch()
# Handle Unicode decoding more robustly
key = None
try:
key = raw.decode("utf-8")
except UnicodeDecodeError:
try:
# Try different encodings
key = raw.decode("latin1")
except UnicodeDecodeError:
# Skip if we can't decode
continue
# Validate key
if not key or len(key) != 1:
continue
# Check for printable characters only
if not key.isprintable():
continue
# Check for quit command
if key.lower() == "q":
self.logger.info(
self._get_quit_message(tag),
tag=tag,
base_color=LogColor.GREEN
)
user_done_event.set()
return
# Check if browser process ended
if await check_browser_process():
return
# Small delay to prevent busy waiting
await asyncio.sleep(0.1)
except Exception as e:
self.logger.warning(f"Error in Windows keyboard listener: {e}", tag=tag)
# Continue trying instead of failing completely
await asyncio.sleep(0.1)
continue
async def _listen_unix(self, user_done_event: asyncio.Event, check_browser_process, tag: str):
"""Unix/Linux/macOS keyboard listener using termios and select."""
try:
import termios
import tty
import select
except ImportError:
raise ImportError("termios/tty/select modules not available on this platform")
# Get stdin file descriptor
try:
fd = sys.stdin.fileno()
except (AttributeError, OSError):
raise ImportError("stdin is not a terminal")
# Save original terminal settings
old_settings = None
try:
old_settings = termios.tcgetattr(fd)
except termios.error as e:
raise ImportError(f"Cannot get terminal attributes: {e}")
try:
# Switch to non-canonical mode (cbreak mode)
tty.setcbreak(fd)
while True:
try:
# Use select to check if input is available (non-blocking)
# Timeout of 0.5 seconds to periodically check browser process
readable, _, _ = select.select([sys.stdin], [], [], 0.5)
if readable:
# Read one character
key = sys.stdin.read(1)
if key and key.lower() == "q":
self.logger.info(
self._get_quit_message(tag),
tag=tag,
base_color=LogColor.GREEN
)
user_done_event.set()
return
# Check if browser process ended
if await check_browser_process():
return
# Small delay to prevent busy waiting
await asyncio.sleep(0.1)
except (KeyboardInterrupt, EOFError):
# Handle Ctrl+C or EOF gracefully
self.logger.info("Keyboard interrupt received", tag=tag)
user_done_event.set()
return
except Exception as e:
self.logger.warning(f"Error in Unix keyboard listener: {e}", tag=tag)
await asyncio.sleep(0.1)
continue
finally:
# Always restore terminal settings
if old_settings is not None:
try:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
except Exception as e:
self.logger.error(f"Failed to restore terminal settings: {e}", tag=tag)
async def _listen_fallback(self, user_done_event: asyncio.Event, check_browser_process, tag: str):
"""Fallback keyboard listener using simple input() method."""
self.logger.info("Using fallback input mode. Type 'q' and press Enter to quit.", tag=tag)
# Run input in a separate thread to avoid blocking
import threading
import queue
input_queue = queue.Queue()
def input_thread():
"""Thread function to handle input."""
try:
while not user_done_event.is_set():
try:
# Use input() with a prompt
user_input = input("Press 'q' + Enter to quit: ").strip().lower()
input_queue.put(user_input)
if user_input == 'q':
break
except (EOFError, KeyboardInterrupt):
input_queue.put('q')
break
except Exception as e:
self.logger.warning(f"Error in input thread: {e}", tag=tag)
break
except Exception as e:
self.logger.error(f"Input thread failed: {e}", tag=tag)
# Start input thread
thread = threading.Thread(target=input_thread, daemon=True)
thread.start()
try:
while not user_done_event.is_set():
# Check for user input
try:
user_input = input_queue.get_nowait()
if user_input == 'q':
self.logger.info(
self._get_quit_message(tag),
tag=tag,
base_color=LogColor.GREEN
)
user_done_event.set()
return
except queue.Empty:
pass
# Check if browser process ended
if await check_browser_process():
return
# Small delay
await asyncio.sleep(0.5)
except Exception as e:
self.logger.error(f"Fallback listener failed: {e}", tag=tag)
user_done_event.set()
async def create_profile(self,
profile_name: Optional[str] = None,
browser_config: Optional[BrowserConfig] = None) -> Optional[str]:
@@ -180,8 +387,7 @@ class BrowserProfiler:
# Run keyboard input loop in a separate task
async def listen_for_quit_command():
import sys
"""Cross-platform keyboard listener that waits for 'q' key press."""
# First output the prompt
self.logger.info(
"Press {segment} when you've finished using the browser...",
@@ -191,6 +397,7 @@ class BrowserProfiler:
)
async def check_browser_process():
"""Check if browser process is still running."""
if (
managed_browser.browser_process
and managed_browser.browser_process.poll() is not None
@@ -202,75 +409,16 @@ class BrowserProfiler:
return True
return False
# Platform-specific handling
if sys.platform == "win32":
import msvcrt
while True:
# Try platform-specific implementations with fallback
try:
if msvcrt.kbhit():
raw = msvcrt.getch()
try:
key = raw.decode("utf-8")
except UnicodeDecodeError:
# Arrow/function keys come back as multi-byte sequences
continue
# Skip control/multi-byte keys that decoded but aren't printable
if len(key) != 1 or not key.isprintable():
continue
if key.lower() == "q":
self.logger.info(
"Closing browser and saving profile...",
tag="PROFILE",
base_color=LogColor.GREEN
)
user_done_event.set()
return
if await check_browser_process():
return
await asyncio.sleep(0.1)
if self._is_windows():
await self._listen_windows(user_done_event, check_browser_process, "PROFILE")
else:
await self._listen_unix(user_done_event, check_browser_process, "PROFILE")
except Exception as e:
self.logger.error(f"Error in keyboard listener: {e}", tag="PROFILE")
continue
else: # Unix-like
import termios
import tty
import select
# Save original terminal settings
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
# Switch to non-canonical mode (no line buffering)
tty.setcbreak(fd)
while True:
# Check if input is available (non-blocking)
readable, _, _ = select.select([sys.stdin], [], [], 0.5)
if readable:
key = sys.stdin.read(1)
if key.lower() == "q":
self.logger.info(
"Closing browser and saving profile...",
tag="PROFILE",
base_color=LogColor.GREEN
)
user_done_event.set()
return
if await check_browser_process():
return
await asyncio.sleep(0.1)
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
self.logger.warning(f"Platform-specific keyboard listener failed: {e}", tag="PROFILE")
self.logger.info("Falling back to simple input mode...", tag="PROFILE")
await self._listen_fallback(user_done_event, check_browser_process, "PROFILE")
try:
from playwright.async_api import async_playwright
@@ -737,8 +885,7 @@ class BrowserProfiler:
# Run keyboard input loop in a separate task
async def listen_for_quit_command():
import sys
"""Cross-platform keyboard listener that waits for 'q' key press."""
# First output the prompt
self.logger.info(
"Press {segment} to stop the browser and exit...",
@@ -748,65 +895,23 @@ class BrowserProfiler:
)
async def check_browser_process():
"""Check if browser process is still running."""
if managed_browser.browser_process and managed_browser.browser_process.poll() is not None:
self.logger.info("Browser already closed. Ending input listener.", tag="CDP")
user_done_event.set()
return True
return False
if sys.platform == "win32":
import msvcrt
while True:
# Try platform-specific implementations with fallback
try:
if msvcrt.kbhit():
raw = msvcrt.getch()
try:
key = raw.decode("utf-8")
except UnicodeDecodeError:
# Arrow/function keys come back as multi-byte sequences
continue
# Skip control/multi-byte keys that decoded but aren't printable
if len(key) != 1 or not key.isprintable():
continue
if key.lower() == "q":
self.logger.info("Closing browser...", tag="CDP")
user_done_event.set()
return
if await check_browser_process():
return
await asyncio.sleep(0.1)
except Exception as e:
self.logger.error(f"Error in keyboard listener: {e}", tag="CDP")
continue
if self._is_windows():
await self._listen_windows(user_done_event, check_browser_process, "CDP")
else:
import termios
import tty
import select
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setcbreak(fd)
while True:
readable, _, _ = select.select([sys.stdin], [], [], 0.5)
if readable:
key = sys.stdin.read(1)
if key.lower() == "q":
self.logger.info("Closing browser...", tag="CDP")
user_done_event.set()
return
if await check_browser_process():
return
await asyncio.sleep(0.1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
await self._listen_unix(user_done_event, check_browser_process, "CDP")
except Exception as e:
self.logger.warning(f"Platform-specific keyboard listener failed: {e}", tag="CDP")
self.logger.info("Falling back to simple input mode...", tag="CDP")
await self._listen_fallback(user_done_event, check_browser_process, "CDP")
# Function to retrieve and display CDP JSON config
async def get_cdp_json(port):