refactor(user-agent): improve user agent generation system

Redesign user agent generation to be more modular and reliable:
- Add abstract base class UAGen for user agent generation
- Implement ValidUAGenerator using fake-useragent library
- Add OnlineUAGenerator for fetching real-world user agents
- Update browser configurations to use new UA generation system
- Improve client hints generation

This change makes the user agent system more maintainable and provides better real-world user agent coverage.
This commit is contained in:
UncleCode
2025-01-25 21:16:39 +08:00
parent 69a77222ef
commit 4d7f91b378
3 changed files with 196 additions and 43 deletions

View File

@@ -7,7 +7,7 @@ from .config import (
SOCIAL_MEDIA_DOMAINS, SOCIAL_MEDIA_DOMAINS,
) )
from .user_agent_generator import UserAgentGenerator from .user_agent_generator import UserAgentGenerator, UAGen, ValidUAGenerator, OnlineUAGenerator
from .extraction_strategy import ExtractionStrategy from .extraction_strategy import ExtractionStrategy
from .chunking_strategy import ChunkingStrategy, RegexChunking from .chunking_strategy import ChunkingStrategy, RegexChunking
from .markdown_generation_strategy import MarkdownGenerationStrategy from .markdown_generation_strategy import MarkdownGenerationStrategy
@@ -100,11 +100,13 @@ class BrowserConfig:
cookies: list = None, cookies: list = None,
headers: dict = None, headers: dict = None,
user_agent: str = ( user_agent: str = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) AppleWebKit/537.36 " # "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/116.0.5845.187 Safari/604.1 Edg/117.0.2045.47" # "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
# "(KHTML, like Gecko) Chrome/116.0.5845.187 Safari/604.1 Edg/117.0.2045.47"
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/116.0.0.0 Safari/537.36"
), ),
user_agent_mode: str = None, user_agent_mode: str = "",
user_agent_generator_config: dict = None, user_agent_generator_config: dict = {},
text_mode: bool = False, text_mode: bool = False,
light_mode: bool = False, light_mode: bool = False,
extra_args: list = None, extra_args: list = None,
@@ -143,17 +145,15 @@ class BrowserConfig:
self.verbose = verbose self.verbose = verbose
self.debugging_port = debugging_port self.debugging_port = debugging_port
user_agenr_generator = UserAgentGenerator() fa_user_agenr_generator = ValidUAGenerator()
if self.user_agent_mode != "random" and self.user_agent_generator_config: if self.user_agent_mode == "random":
self.user_agent = user_agenr_generator.generate( self.user_agent = fa_user_agenr_generator.generate(
**(self.user_agent_generator_config or {}) **(self.user_agent_generator_config or {})
) )
elif self.user_agent_mode == "random":
self.user_agent = user_agenr_generator.generate()
else: else:
pass pass
self.browser_hint = user_agenr_generator.generate_client_hints(self.user_agent) self.browser_hint = UAGen.generate_client_hints(self.user_agent)
self.headers.setdefault("sec-ch-ua", self.browser_hint) self.headers.setdefault("sec-ch-ua", self.browser_hint)
# If persistent context is requested, ensure managed browser is enabled # If persistent context is requested, ensure managed browser is enabled
@@ -382,6 +382,11 @@ class CrawlerRunConfig:
stream (bool): If True, stream the page content as it is being loaded. stream (bool): If True, stream the page content as it is being loaded.
url: str = None # This is not a compulsory parameter url: str = None # This is not a compulsory parameter
check_robots_txt (bool): Whether to check robots.txt rules before crawling. Default: False check_robots_txt (bool): Whether to check robots.txt rules before crawling. Default: False
user_agent (str): Custom User-Agent string to use. Default: None
user_agent_mode (str or None): Mode for generating the user agent (e.g., "random"). If None, use the provided
user_agent as-is. Default: None.
user_agent_generator_config (dict or None): Configuration for user agent generation if user_agent_mode is set.
Default: None.
""" """
def __init__( def __init__(
@@ -453,6 +458,9 @@ class CrawlerRunConfig:
stream: bool = False, stream: bool = False,
url: str = None, url: str = None,
check_robots_txt: bool = False, check_robots_txt: bool = False,
user_agent: str = None,
user_agent_mode: str = None,
user_agent_generator_config: dict = {},
): ):
self.url = url self.url = url
@@ -535,6 +543,11 @@ class CrawlerRunConfig:
# Robots.txt Handling Parameters # Robots.txt Handling Parameters
self.check_robots_txt = check_robots_txt self.check_robots_txt = check_robots_txt
# User Agent Parameters
self.user_agent = user_agent
self.user_agent_mode = user_agent_mode
self.user_agent_generator_config = user_agent_generator_config
# Validate type of extraction strategy and chunking strategy if they are provided # Validate type of extraction strategy and chunking strategy if they are provided
if self.extraction_strategy is not None and not isinstance( if self.extraction_strategy is not None and not isinstance(
self.extraction_strategy, ExtractionStrategy self.extraction_strategy, ExtractionStrategy
@@ -632,6 +645,9 @@ class CrawlerRunConfig:
stream=kwargs.get("stream", False), stream=kwargs.get("stream", False),
url=kwargs.get("url"), url=kwargs.get("url"),
check_robots_txt=kwargs.get("check_robots_txt", False), check_robots_txt=kwargs.get("check_robots_txt", False),
user_agent=kwargs.get("user_agent"),
user_agent_mode=kwargs.get("user_agent_mode"),
user_agent_generator_config=kwargs.get("user_agent_generator_config", {}),
) )
# Create a function that returns a dict of the object # Create a function that returns a dict of the object
@@ -695,6 +711,9 @@ class CrawlerRunConfig:
"stream": self.stream, "stream": self.stream,
"url": self.url, "url": self.url,
"check_robots_txt": self.check_robots_txt, "check_robots_txt": self.check_robots_txt,
"user_agent": self.user_agent,
"user_agent_mode": self.user_agent_mode,
"user_agent_generator_config": self.user_agent_generator_config,
} }
def clone(self, **kwargs): def clone(self, **kwargs):

View File

@@ -23,6 +23,7 @@ from .async_logger import AsyncLogger
from playwright_stealth import StealthConfig from playwright_stealth import StealthConfig
from .ssl_certificate import SSLCertificate from .ssl_certificate import SSLCertificate
from .utils import get_home_folder, get_chromium_path from .utils import get_home_folder, get_chromium_path
from .user_agent_generator import ValidUAGenerator, OnlineUAGenerator
stealth_config = StealthConfig( stealth_config = StealthConfig(
webdriver=True, webdriver=True,
@@ -128,6 +129,7 @@ class ManagedBrowser:
self.host = host self.host = host
self.logger = logger self.logger = logger
self.shutting_down = False self.shutting_down = False
self.cdp_url = cdp_url
async def start(self) -> str: async def start(self) -> str:
""" """
@@ -563,7 +565,7 @@ class BrowserManager:
Context: Browser context object with the specified configurations Context: Browser context object with the specified configurations
""" """
# Base settings # Base settings
user_agent = self.config.headers.get("User-Agent", self.config.user_agent) user_agent = self.config.headers.get("User-Agent", self.config.user_agent)
viewport_settings = { viewport_settings = {
"width": self.config.viewport_width, "width": self.config.viewport_width,
"height": self.config.viewport_height, "height": self.config.viewport_height,
@@ -1269,10 +1271,12 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
self._downloaded_files = [] self._downloaded_files = []
# Handle user agent with magic mode # Handle user agent with magic mode
user_agent = self.browser_config.user_agent user_agent_to_override = config.user_agent
if config.magic and self.browser_config.user_agent_mode != "random": if user_agent_to_override:
self.browser_config.user_agent = UserAgentGenerator().generate( self.browser_config.user_agent = user_agent_to_override
**(self.browser_config.user_agent_generator_config or {}) elif config.magic or config.user_agent_mode == "random":
self.browser_config.user_agent = ValidUAGenerator().generate(
**(config.user_agent_generator_config or {})
) )
# Get page for session # Get page for session

View File

@@ -2,8 +2,146 @@ import random
from typing import Optional, Literal, List, Dict, Tuple from typing import Optional, Literal, List, Dict, Tuple
import re import re
from abc import ABC, abstractmethod
import random
from fake_useragent import UserAgent
import requests
from lxml import html
import json
from typing import Optional, List, Union, Dict
class UAGen(ABC):
    """Abstract interface for user agent generators.

    Subclasses implement :meth:`generate`. The static helper
    :meth:`generate_client_hints` derives a ``Sec-CH-UA`` header value from a
    user agent string and can be called without an instance.
    """

    @abstractmethod
    def generate(self,
                 browsers: Optional[List[str]] = None,
                 os: Optional[Union[str, List[str]]] = None,
                 min_version: float = 0.0,
                 platforms: Optional[Union[str, List[str]]] = None,
                 pct_threshold: Optional[float] = None,
                 fallback: str = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/116.0.0.0 Safari/537.36") -> Union[str, Dict]:
        """Produce a user agent matching the given constraints.

        Args:
            browsers: Browser names to restrict the selection to.
            os: Operating system name(s) to restrict the selection to.
            min_version: Minimum browser version to accept.
            platforms: Platform name(s) to restrict the selection to.
            pct_threshold: Minimum usage share an agent must have.
            fallback: User agent string to use when nothing matches.

        Returns:
            Either a user agent string or a dict describing one; the exact
            shape is decided by the concrete subclass.
        """
        pass

    @staticmethod
    def generate_client_hints(user_agent: str) -> str:
        """Generate a Sec-CH-UA header value based on a user agent string.

        Args:
            user_agent: The raw User-Agent header value. ``None`` or an empty
                string is tolerated and yields an empty result.

        Returns:
            A comma-separated brand list for Chromium-based and Safari user
            agents, the literal ``'""'`` for Firefox, and ``""`` when no
            known browser token is found.
        """
        # Guard against a missing user agent: re.search() would otherwise
        # raise TypeError on None instead of producing "no hints".
        if not user_agent:
            return ""

        # Browser name -> pattern capturing the major version number.
        version_patterns = {
            "chrome": r"Chrome/(\d+)",
            "edge": r"Edg/(\d+)",
            "safari": r"Version/(\d+)",
            "firefox": r"Firefox/(\d+)",
        }
        detected: Dict[str, str] = {}
        for browser, pattern in version_patterns.items():
            match = re.search(pattern, user_agent)
            if match:
                detected[browser] = match.group(1)

        hints: List[str] = []
        if "chrome" in detected:
            # Chromium-based: Edge user agents also carry a Chrome token, so
            # the Chrome check must come first and Edge picks the final brand.
            hints.append(f'"Chromium";v="{detected["chrome"]}"')
            hints.append('"Not_A Brand";v="8"')
            if "edge" in detected:
                hints.append(f'"Microsoft Edge";v="{detected["edge"]}"')
            else:
                hints.append(f'"Google Chrome";v="{detected["chrome"]}"')
        elif "firefox" in detected:
            # Firefox doesn't typically send Sec-CH-UA
            return '""'
        elif "safari" in detected:
            # Safari's format for client hints
            hints.append(f'"Safari";v="{detected["safari"]}"')
            hints.append('"Not_A Brand";v="8"')

        return ", ".join(hints)
class ValidUAGenerator(UAGen):
    """User agent generator backed by ``fake_useragent.UserAgent``."""

    def __init__(self):
        # Start with an unconstrained UserAgent; generate() replaces it.
        self.ua = UserAgent()

    def generate(self,
                 browsers: Optional[List[str]] = None,
                 os: Optional[Union[str, List[str]]] = None,
                 min_version: float = 0.0,
                 platforms: Optional[Union[str, List[str]]] = None,
                 pct_threshold: Optional[float] = None,
                 fallback: str = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/116.0.0.0 Safari/537.36") -> str:
        """Return a random user agent string satisfying the constraints.

        Args:
            browsers: Browser names; defaults to Chrome, Firefox and Edge.
            os: Operating system name(s); defaults to Windows and Mac OS X.
            min_version: Minimum browser version, forwarded to ``UserAgent``.
            platforms: Platform name(s); defaults to desktop.
            pct_threshold: Accepted for interface parity; not used here.
            fallback: User agent string forwarded to ``UserAgent`` as its
                fallback value.

        Returns:
            The value of ``UserAgent(...).random`` for the chosen options.
        """
        selection = {
            "browsers": browsers or ['Chrome', 'Firefox', 'Edge'],
            "os": os or ['Windows', 'Mac OS X'],
            "min_version": min_version,
            "platforms": platforms or ['desktop'],
            "fallback": fallback,
        }
        # A fresh UserAgent is built on every call and kept on the instance.
        self.ua = UserAgent(**selection)
        return self.ua.random
class OnlineUAGenerator(UAGen):
    """User agent generator fed by the list published on useragents.me.

    The agent list is fetched once at construction time and retried lazily
    from generate() whenever the cached list is empty.
    """

    def __init__(self):
        self.agents = []
        self._fetch_agents()

    def _fetch_agents(self):
        """Download the agent list and store it in ``self.agents``.

        Any failure (network, HTTP status, missing element, invalid JSON) is
        printed and otherwise ignored, leaving ``self.agents`` unchanged.
        """
        try:
            page = requests.get(
                'https://www.useragents.me/',
                timeout=5,
                headers={'Accept': 'text/html,application/xhtml+xml'},
            )
            page.raise_for_status()
            document = html.fromstring(page.content)
            # The JSON payload sits in a <textarea> inside the desktop section.
            textareas = document.cssselect('#most-common-desktop-useragents-json-csv > div:nth-child(1) > textarea')
            self.agents = json.loads(textareas[0].text)
        except Exception as e:
            print(f"Error fetching agents: {e}")

    def generate(self,
                 browsers: Optional[List[str]] = None,
                 os: Optional[Union[str, List[str]]] = None,
                 min_version: float = 0.0,
                 platforms: Optional[Union[str, List[str]]] = None,
                 pct_threshold: Optional[float] = None,
                 fallback: str = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/116.0.0.0 Safari/537.36") -> Dict:
        """Return the first fetched agent entry that passes every filter.

        Args:
            browsers: Substrings of which at least one must occur in the UA.
            os: Substring(s) of which at least one must occur in the UA.
            min_version: Accepted for interface parity; not used here.
            platforms: Substring(s) of which at least one must occur in the UA.
            pct_threshold: Minimum ``'pct'`` value an entry must have; a
                falsy value disables this filter.
            fallback: UA string used when no entry survives the filters.

        Returns:
            The first surviving entry from ``self.agents``, or
            ``{'ua': fallback, 'pct': 0}`` when none survives.
        """
        if not self.agents:
            self._fetch_agents()

        candidates = self.agents
        if pct_threshold:
            candidates = [entry for entry in candidates if entry['pct'] >= pct_threshold]

        # Collect the active substring filters in their original order.
        # NOTE(review): each filter is a case-insensitive substring test on
        # the raw UA text, so a value such as 'desktop' only matches when
        # that text appears in the UA string -- confirm this is intended.
        substring_filters = []
        if browsers:
            substring_filters.append(browsers)
        if os:
            substring_filters.append([os] if isinstance(os, str) else os)
        if platforms:
            substring_filters.append([platforms] if isinstance(platforms, str) else platforms)

        for needles in substring_filters:
            candidates = [entry for entry in candidates
                          if any(needle.lower() in entry['ua'].lower() for needle in needles)]

        if candidates:
            return candidates[0]
        return {'ua': fallback, 'pct': 0}
class UserAgentGenerator():
""" """
Generate random user agents with specified constraints. Generate random user agents with specified constraints.
@@ -187,9 +325,15 @@ class UserAgentGenerator:
browser_stack = self.get_browser_stack(num_browsers) browser_stack = self.get_browser_stack(num_browsers)
# Add appropriate legacy token based on browser stack # Add appropriate legacy token based on browser stack
if "Firefox" in str(browser_stack): if "Firefox" in str(browser_stack) or browser_type == "firefox":
components.append(random.choice(self.rendering_engines["gecko"])) components.append(random.choice(self.rendering_engines["gecko"]))
elif "Chrome" in str(browser_stack) or "Safari" in str(browser_stack): elif "Chrome" in str(browser_stack) or "Safari" in str(browser_stack) or browser_type == "chrome":
components.append(self.rendering_engines["chrome_webkit"])
components.append("(KHTML, like Gecko)")
elif "Edge" in str(browser_stack) or browser_type == "edge":
components.append(self.rendering_engines["safari_webkit"])
components.append("(KHTML, like Gecko)")
elif "Safari" in str(browser_stack) or browser_type == "safari":
components.append(self.rendering_engines["chrome_webkit"]) components.append(self.rendering_engines["chrome_webkit"])
components.append("(KHTML, like Gecko)") components.append("(KHTML, like Gecko)")
@@ -273,27 +417,13 @@ class UserAgentGenerator:
# Example usage: # Example usage:
if __name__ == "__main__": if __name__ == "__main__":
generator = UserAgentGenerator()
print(generator.generate()) # Usage example:
generator = ValidUAGenerator()
ua = generator.generate()
print(ua)
generator = OnlineUAGenerator()
ua = generator.generate()
print(ua)
print("\nSingle browser (Chrome):")
print(generator.generate(num_browsers=1, browser_type="chrome"))
print("\nTwo browsers (Gecko/Firefox):")
print(generator.generate(num_browsers=2))
print("\nThree browsers (Chrome/Safari/Edge):")
print(generator.generate(num_browsers=3))
print("\nFirefox on Linux:")
print(
generator.generate(
device_type="desktop",
os_type="linux",
browser_type="firefox",
num_browsers=2,
)
)
print("\nChrome/Safari/Edge on Windows:")
print(generator.generate(device_type="desktop", os_type="windows", num_browsers=3))