Merge pull request #1170 from prokopis3/fix/create-profile

fix(browser_profiler): cross-platform 'q' to quit - create profile
This commit is contained in:
Nasrin
2025-08-06 16:29:14 +08:00
committed by GitHub
4 changed files with 217 additions and 69 deletions

View File

@@ -180,42 +180,97 @@ class BrowserProfiler:
# Run keyboard input loop in a separate task
async def listen_for_quit_command():
import termios
import tty
import select
import sys
# First output the prompt
self.logger.info("Press 'q' when you've finished using the browser...", tag="PROFILE")
# Save original terminal settings
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
# Switch to non-canonical mode (no line buffering)
tty.setcbreak(fd)
self.logger.info(
"Press {segment} when you've finished using the browser...",
tag="PROFILE",
params={"segment": "'q'"}, colors={"segment": LogColor.YELLOW},
base_color=LogColor.CYAN
)
async def check_browser_process():
if (
managed_browser.browser_process
and managed_browser.browser_process.poll() is not None
):
self.logger.info(
"Browser already closed. Ending input listener.", tag="PROFILE"
)
user_done_event.set()
return True
return False
# Platform-specific handling
if sys.platform == "win32":
import msvcrt
while True:
# Check if input is available (non-blocking)
readable, _, _ = select.select([sys.stdin], [], [], 0.5)
if readable:
key = sys.stdin.read(1)
if key.lower() == 'q':
self.logger.info("Closing browser and saving profile...", tag="PROFILE", base_color=LogColor.GREEN)
user_done_event.set()
try:
if msvcrt.kbhit():
raw = msvcrt.getch()
try:
key = raw.decode("utf-8")
except UnicodeDecodeError:
# Arrow/function keys come back as multi-byte sequences
continue
# Skip control/multi-byte keys that decoded but aren't printable
if len(key) != 1 or not key.isprintable():
continue
if key.lower() == "q":
self.logger.info(
"Closing browser and saving profile...",
tag="PROFILE",
base_color=LogColor.GREEN
)
user_done_event.set()
return
if await check_browser_process():
return
# Check if the browser process has already exited
if managed_browser.browser_process and managed_browser.browser_process.poll() is not None:
self.logger.info("Browser already closed. Ending input listener.", tag="PROFILE")
user_done_event.set()
return
await asyncio.sleep(0.1)
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
await asyncio.sleep(0.1)
except Exception as e:
self.logger.error(f"Error in keyboard listener: {e}", tag="PROFILE")
continue
else: # Unix-like
import termios
import tty
import select
# Save original terminal settings
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
# Switch to non-canonical mode (no line buffering)
tty.setcbreak(fd)
while True:
# Check if input is available (non-blocking)
readable, _, _ = select.select([sys.stdin], [], [], 0.5)
if readable:
key = sys.stdin.read(1)
if key.lower() == "q":
self.logger.info(
"Closing browser and saving profile...",
tag="PROFILE",
base_color=LogColor.GREEN
)
user_done_event.set()
return
if await check_browser_process():
return
await asyncio.sleep(0.1)
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
try:
from playwright.async_api import async_playwright
@@ -682,42 +737,76 @@ class BrowserProfiler:
# Run keyboard input loop in a separate task
async def listen_for_quit_command():
import termios
import tty
import select
import sys
# First output the prompt
self.logger.info("Press 'q' to stop the browser and exit...", tag="CDP")
# Save original terminal settings
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
# Switch to non-canonical mode (no line buffering)
tty.setcbreak(fd)
self.logger.info(
"Press {segment} to stop the browser and exit...",
tag="CDP",
params={"segment": "'q'"}, colors={"segment": LogColor.YELLOW},
base_color=LogColor.CYAN
)
async def check_browser_process():
if managed_browser.browser_process and managed_browser.browser_process.poll() is not None:
self.logger.info("Browser already closed. Ending input listener.", tag="CDP")
user_done_event.set()
return True
return False
if sys.platform == "win32":
import msvcrt
while True:
# Check if input is available (non-blocking)
readable, _, _ = select.select([sys.stdin], [], [], 0.5)
if readable:
key = sys.stdin.read(1)
if key.lower() == 'q':
self.logger.info("Closing browser...", tag="CDP")
user_done_event.set()
try:
if msvcrt.kbhit():
raw = msvcrt.getch()
try:
key = raw.decode("utf-8")
except UnicodeDecodeError:
# Arrow/function keys come back as multi-byte sequences
continue
# Skip control/multi-byte keys that decoded but aren't printable
if len(key) != 1 or not key.isprintable():
continue
if key.lower() == "q":
self.logger.info("Closing browser...", tag="CDP")
user_done_event.set()
return
if await check_browser_process():
return
# Check if the browser process has already exited
if managed_browser.browser_process and managed_browser.browser_process.poll() is not None:
self.logger.info("Browser already closed. Ending input listener.", tag="CDP")
user_done_event.set()
return
await asyncio.sleep(0.1)
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
await asyncio.sleep(0.1)
except Exception as e:
self.logger.error(f"Error in keyboard listener: {e}", tag="CDP")
continue
else:
import termios
import tty
import select
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setcbreak(fd)
while True:
readable, _, _ = select.select([sys.stdin], [], [], 0.5)
if readable:
key = sys.stdin.read(1)
if key.lower() == "q":
self.logger.info("Closing browser...", tag="CDP")
user_done_event.set()
return
if await check_browser_process():
return
await asyncio.sleep(0.1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
# Function to retrieve and display CDP JSON config
async def get_cdp_json(port):

View File

@@ -10,11 +10,13 @@ import sys
import uuid
import shutil
from crawl4ai import BrowserProfiler
from crawl4ai.browser_manager import BrowserManager
# Add the project root to Python path if running directly
if __name__ == "__main__":
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
from crawl4ai.browser import BrowserManager, BrowserProfileManager
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
@@ -25,7 +27,7 @@ async def test_profile_creation():
"""Test creating and managing browser profiles."""
logger.info("Testing profile creation and management", tag="TEST")
profile_manager = BrowserProfileManager(logger=logger)
profile_manager = BrowserProfiler(logger=logger)
try:
# List existing profiles
@@ -83,7 +85,7 @@ async def test_profile_with_browser():
"""Test using a profile with a browser."""
logger.info("Testing using a profile with a browser", tag="TEST")
profile_manager = BrowserProfileManager(logger=logger)
profile_manager = BrowserProfiler(logger=logger)
test_profile_name = f"test-browser-profile-{uuid.uuid4().hex[:8]}"
profile_path = None
@@ -101,6 +103,8 @@ async def test_profile_with_browser():
# Now use this profile with a browser
browser_config = BrowserConfig(
user_data_dir=profile_path,
use_managed_browser=True,
use_persistent_context=True,
headless=True
)

View File

@@ -0,0 +1,55 @@
import sys
import pytest
import asyncio
from unittest.mock import patch, MagicMock
from crawl4ai.browser_profiler import BrowserProfiler
@pytest.mark.asyncio
@pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific msvcrt test")
async def test_keyboard_input_handling():
# Mock sequence of keystrokes: arrow key followed by 'q'
mock_keys = [b'\x00K', b'q']
mock_kbhit = MagicMock(side_effect=[True, True, False])
mock_getch = MagicMock(side_effect=mock_keys)
with patch('msvcrt.kbhit', mock_kbhit), patch('msvcrt.getch', mock_getch):
# profiler = BrowserProfiler()
user_done_event = asyncio.Event()
# Create a local async function to simulate the keyboard input handling
async def test_listen_for_quit_command():
if sys.platform == "win32":
while True:
try:
if mock_kbhit():
raw = mock_getch()
try:
key = raw.decode("utf-8")
except UnicodeDecodeError:
continue
if len(key) != 1 or not key.isprintable():
continue
if key.lower() == "q":
user_done_event.set()
return
await asyncio.sleep(0.1)
except Exception as e:
continue
# Run the listener
listener_task = asyncio.create_task(test_listen_for_quit_command())
# Wait for the event to be set
try:
await asyncio.wait_for(user_done_event.wait(), timeout=1.0)
assert user_done_event.is_set()
finally:
if not listener_task.done():
listener_task.cancel()
try:
await listener_task
except asyncio.CancelledError:
pass