Implement new async crawler features and stability updates

- Introduced new async crawl strategy with session management.
  - Added BrowserManager for improved browser management.
  - Enhanced documentation, focusing on storage state and usage examples.
  - Improved error handling and logging for sessions.
  - Added JavaScript snippets for customizing navigator properties.
This commit is contained in:
UncleCode
2024-12-10 17:55:29 +08:00
parent 2d31915f0a
commit e130fd8db9
16 changed files with 2750 additions and 749 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -5,7 +5,7 @@ from abc import ABC, abstractmethod
from typing import Callable, Dict, Any, List, Optional, Awaitable from typing import Callable, Dict, Any, List, Optional, Awaitable
import os, sys, shutil import os, sys, shutil
import tempfile, subprocess import tempfile, subprocess
from playwright.async_api import async_playwright, Page, Browser, Error from playwright.async_api import async_playwright, Page, Browser, Error, BrowserContext
from playwright.async_api import TimeoutError as PlaywrightTimeoutError from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from io import BytesIO from io import BytesIO
from PIL import Image, ImageDraw, ImageFont from PIL import Image, ImageDraw, ImageFont
@@ -15,6 +15,7 @@ from pydantic import BaseModel
import hashlib import hashlib
import json import json
import uuid import uuid
from .js_snippet import load_js_script
from .models import AsyncCrawlResponse from .models import AsyncCrawlResponse
from .utils import create_box_message from .utils import create_box_message
from .user_agent_generator import UserAgentGenerator from .user_agent_generator import UserAgentGenerator
@@ -35,6 +36,28 @@ stealth_config = StealthConfig(
media_codecs=True, media_codecs=True,
) )
BROWSER_DISABLE_OPTIONS = [
"--disable-background-networking",
"--disable-background-timer-throttling",
"--disable-backgrounding-occluded-windows",
"--disable-breakpad",
"--disable-client-side-phishing-detection",
"--disable-component-extensions-with-background-pages",
"--disable-default-apps",
"--disable-extensions",
"--disable-features=TranslateUI",
"--disable-hang-monitor",
"--disable-ipc-flooding-protection",
"--disable-popup-blocking",
"--disable-prompt-on-repost",
"--disable-sync",
"--force-color-profile=srgb",
"--metrics-recording-only",
"--no-first-run",
"--password-store=basic",
"--use-mock-keychain"
]
class ManagedBrowser: class ManagedBrowser:
def __init__(self, browser_type: str = "chromium", user_data_dir: Optional[str] = None, headless: bool = False, logger = None, host: str = "localhost", debugging_port: int = 9222): def __init__(self, browser_type: str = "chromium", user_data_dir: Optional[str] = None, headless: bool = False, logger = None, host: str = "localhost", debugging_port: int = 9222):
@@ -197,10 +220,222 @@ class ManagedBrowser:
) )
class BrowserManager:
def __init__(self, use_managed_browser: bool, user_data_dir: Optional[str], headless: bool, logger, browser_type: str, proxy, proxy_config, chrome_channel: str, viewport_width: int, viewport_height: int, accept_downloads: bool, storage_state, ignore_https_errors: bool, java_script_enabled: bool, cookies: List[dict], headers: dict, extra_args: List[str], text_only: bool, light_mode: bool, user_agent: str, browser_hint: str, downloads_path: Optional[str]):
self.use_managed_browser = use_managed_browser
self.user_data_dir = user_data_dir
self.headless = headless
self.logger = logger
self.browser_type = browser_type
self.proxy = proxy
self.proxy_config = proxy_config
self.chrome_channel = chrome_channel
self.viewport_width = viewport_width
self.viewport_height = viewport_height
self.accept_downloads = accept_downloads
self.storage_state = storage_state
self.ignore_https_errors = ignore_https_errors
self.java_script_enabled = java_script_enabled
self.cookies = cookies or []
self.headers = headers or {}
self.extra_args = extra_args or []
self.text_only = text_only
self.light_mode = light_mode
self.browser = None
self.default_context : BrowserContext = None
self.managed_browser = None
self.sessions = {}
self.session_ttl = 1800
self.playwright = None
self.user_agent = user_agent
self.browser_hint = browser_hint
self.downloads_path = downloads_path
async def start(self):
if self.playwright is None:
from playwright.async_api import async_playwright
self.playwright = await async_playwright().start()
if self.use_managed_browser:
self.managed_browser = ManagedBrowser(
browser_type=self.browser_type,
user_data_dir=self.user_data_dir,
headless=self.headless,
logger=self.logger
)
cdp_url = await self.managed_browser.start()
self.browser = await self.playwright.chromium.connect_over_cdp(cdp_url)
contexts = self.browser.contexts
if contexts:
self.default_context = contexts[0]
else:
self.default_context = await self.browser.new_context(
viewport={"width": self.viewport_width, "height": self.viewport_height},
storage_state=self.storage_state,
user_agent=self.headers.get("User-Agent"),
accept_downloads=self.accept_downloads,
ignore_https_errors=self.ignore_https_errors,
java_script_enabled=self.java_script_enabled
)
await self.setup_context(self.default_context)
else:
browser_args = {
"headless": self.headless,
"args": [
"--no-sandbox",
"--disable-dev-shm-usage",
"--no-first-run",
"--no-default-browser-check",
"--disable-infobars",
"--window-position=0,0",
"--ignore-certificate-errors",
"--ignore-certificate-errors-spki-list",
"--disable-blink-features=AutomationControlled",
"--window-position=400,0",
f"--window-size={self.viewport_width},{self.viewport_height}",
]
}
if self.light_mode:
browser_args["args"].extend(BROWSER_DISABLE_OPTIONS)
if self.text_only:
browser_args["args"].extend(['--blink-settings=imagesEnabled=false','--disable-remote-fonts'])
if self.chrome_channel:
browser_args["channel"] = self.chrome_channel
if self.extra_args:
browser_args["args"].extend(self.extra_args)
if self.accept_downloads:
browser_args["downloads_path"] = os.path.join(os.getcwd(), "downloads")
os.makedirs(browser_args["downloads_path"], exist_ok=True)
if self.proxy:
from playwright.async_api import ProxySettings
proxy_settings = ProxySettings(server=self.proxy)
browser_args["proxy"] = proxy_settings
elif self.proxy_config:
from playwright.async_api import ProxySettings
proxy_settings = ProxySettings(
server=self.proxy_config.get("server"),
username=self.proxy_config.get("username"),
password=self.proxy_config.get("password")
)
browser_args["proxy"] = proxy_settings
if self.browser_type == "firefox":
self.browser = await self.playwright.firefox.launch(**browser_args)
elif self.browser_type == "webkit":
self.browser = await self.playwright.webkit.launch(**browser_args)
else:
self.browser = await self.playwright.chromium.launch(**browser_args)
self.default_context = self.browser
# Since default_context in non-managed mode is the browser, no setup needed here.
async def setup_context(self, context : BrowserContext, is_default=False):
# Set extra headers
if self.headers:
await context.set_extra_http_headers(self.headers)
# Add cookies if any
if self.cookies:
await context.add_cookies(self.cookies)
# Ensure storage_state if provided
if self.storage_state:
# If storage_state is a dictionary or file path, Playwright will handle it.
await context.storage_state(path=None)
# If accept_downloads, set timeouts and ensure properties
if self.accept_downloads:
await context.set_default_timeout(60000)
await context.set_default_navigation_timeout(60000)
if self.downloads_path:
context._impl_obj._options["accept_downloads"] = True
context._impl_obj._options["downloads_path"] = self.downloads_path
# If we have a user_agent, override it along with sec-ch-ua
if self.user_agent:
# Merge headers if needed
combined_headers = {"User-Agent": self.user_agent, "sec-ch-ua": self.browser_hint}
combined_headers.update(self.headers)
await context.set_extra_http_headers(combined_headers)
async def close(self):
# Close all active sessions
session_ids = list(self.sessions.keys())
for session_id in session_ids:
await self.kill_session(session_id)
if self.browser:
await self.browser.close()
self.browser = None
if self.managed_browser:
await asyncio.sleep(0.5)
await self.managed_browser.cleanup()
self.managed_browser = None
if self.playwright:
await self.playwright.stop()
self.playwright = None
async def get_page(self, session_id: Optional[str], user_agent: str):
# Cleanup expired sessions
self._cleanup_expired_sessions()
if session_id:
context, page, _ = self.sessions.get(session_id, (None, None, None))
if context and page:
self.sessions[session_id] = (context, page, time.time())
return page, context
# Create a new context/page pair
if self.use_managed_browser:
context = self.default_context
page = await context.new_page()
else:
context = await self.browser.new_context(
user_agent=user_agent,
viewport={"width": self.viewport_width, "height": self.viewport_height},
proxy={"server": self.proxy} if self.proxy else None,
accept_downloads=self.accept_downloads,
storage_state=self.storage_state,
ignore_https_errors=self.ignore_https_errors
)
await self.setup_context(context)
page = await context.new_page()
if session_id:
self.sessions[session_id] = (context, page, time.time())
return page, context
async def kill_session(self, session_id: str):
if session_id in self.sessions:
context, page, _ = self.sessions[session_id]
await page.close()
if not self.use_managed_browser:
await context.close()
del self.sessions[session_id]
def _cleanup_expired_sessions(self):
current_time = time.time()
expired_sessions = [
sid for sid, (_, _, last_used) in self.sessions.items()
if current_time - last_used > self.session_ttl
]
for sid in expired_sessions:
asyncio.create_task(self.kill_session(sid))
class AsyncCrawlerStrategy(ABC): class AsyncCrawlerStrategy(ABC):
@abstractmethod @abstractmethod
async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse: async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
pass pass # 4 + 3
@abstractmethod @abstractmethod
async def crawl_many(self, urls: List[str], **kwargs) -> List[AsyncCrawlResponse]: async def crawl_many(self, urls: List[str], **kwargs) -> List[AsyncCrawlResponse]:
@@ -265,6 +500,8 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
self.use_managed_browser = kwargs.get("use_managed_browser", False) self.use_managed_browser = kwargs.get("use_managed_browser", False)
self.user_data_dir = kwargs.get("user_data_dir", None) self.user_data_dir = kwargs.get("user_data_dir", None)
self.use_persistent_context = kwargs.get("use_persistent_context", False) self.use_persistent_context = kwargs.get("use_persistent_context", False)
if self.use_persistent_context:
self.use_managed_browser = True
self.chrome_channel = kwargs.get("chrome_channel", "chrome") self.chrome_channel = kwargs.get("chrome_channel", "chrome")
self.managed_browser = None self.managed_browser = None
self.default_context = None self.default_context = None
@@ -278,13 +515,39 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
'before_retrieve_html': None 'before_retrieve_html': None
} }
self.extra_args = kwargs.get("extra_args", []) self.extra_args = kwargs.get("extra_args", [])
self.ignore_https_errors = kwargs.get("ignore_https_errors", True)
self.java_script_enabled = kwargs.get("java_script_enabled", True)
self.accept_downloads = kwargs.get("accept_downloads", False) self.accept_downloads = kwargs.get("accept_downloads", False)
self.downloads_path = kwargs.get("downloads_path") self.downloads_path = kwargs.get("downloads_path")
self._downloaded_files = [] # Track downloaded files for current crawl self._downloaded_files = [] # Track downloaded files for current crawl
if self.accept_downloads and not self.downloads_path: if self.accept_downloads and not self.downloads_path:
self.downloads_path = os.path.join(os.getcwd(), "downloads") self.downloads_path = os.path.join(os.getcwd(), "downloads")
os.makedirs(self.downloads_path, exist_ok=True) os.makedirs(self.downloads_path, exist_ok=True)
self.browser_manager = BrowserManager(
use_managed_browser=self.use_managed_browser,
user_data_dir=self.user_data_dir,
headless=self.headless,
logger=self.logger,
browser_type=self.browser_type,
proxy=self.proxy,
proxy_config=self.proxy_config,
chrome_channel=self.chrome_channel,
viewport_width=self.viewport_width,
viewport_height=self.viewport_height,
accept_downloads=self.accept_downloads,
storage_state=self.storage_state,
ignore_https_errors=self.ignore_https_errors,
java_script_enabled=self.java_script_enabled,
cookies=self.cookies,
headers=self.headers,
extra_args=self.extra_args,
text_only=self.text_only,
light_mode=self.light_mode,
user_agent=self.user_agent,
browser_hint=self.browser_hint,
downloads_path=self.downloads_path
)
async def __aenter__(self): async def __aenter__(self):
await self.start() await self.start()
@@ -294,183 +557,14 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
await self.close() await self.close()
async def start(self): async def start(self):
if self.playwright is None: await self.browser_manager.start()
self.playwright = await async_playwright().start() await self.execute_hook('on_browser_created', self.browser_manager.browser, context = self.browser_manager.default_context)
if self.browser is None:
if self.use_managed_browser:
# Use managed browser approach
self.managed_browser = ManagedBrowser(
browser_type=self.browser_type,
user_data_dir=self.user_data_dir,
headless=self.headless,
logger=self.logger
)
cdp_url = await self.managed_browser.start()
self.browser = await self.playwright.chromium.connect_over_cdp(cdp_url)
# Get the default context that maintains the user profile
contexts = self.browser.contexts
if contexts:
self.default_context = contexts[0]
else:
# If no default context exists, create one
self.default_context = await self.browser.new_context(
# viewport={"width": 1920, "height": 1080}
viewport={"width": self.viewport_width, "height": self.viewport_height},
storage_state=self.storage_state,
)
# Set up the default context
if self.default_context:
await self.default_context.set_extra_http_headers(self.headers)
if self.cookies:
await self.default_context.add_cookies(self.cookies)
if self.storage_state:
# If storage_state is a dictionary or file path, Playwright will handle it.
await self.default_context.storage_state(path=None) # Just ensuring default_context is ready
if self.accept_downloads:
await self.default_context.set_default_timeout(60000)
await self.default_context.set_default_navigation_timeout(60000)
self.default_context._impl_obj._options["accept_downloads"] = True
self.default_context._impl_obj._options["downloads_path"] = self.downloads_path
if self.user_agent:
await self.default_context.set_extra_http_headers({
"User-Agent": self.user_agent,
"sec-ch-ua": self.browser_hint,
# **self.headers
})
else:
# Base browser arguments
browser_args = {
"headless": self.headless,
"args": [
"--no-sandbox",
"--disable-dev-shm-usage",
"--no-first-run",
"--no-default-browser-check",
"--disable-infobars",
"--window-position=0,0",
"--ignore-certificate-errors",
"--ignore-certificate-errors-spki-list",
"--disable-blink-features=AutomationControlled",
"--window-position=400,0",
f"--window-size={self.viewport_width},{self.viewport_height}",
]
}
if self.light_mode:
browser_args["args"].extend([
# "--disable-background-networking",
"--disable-background-timer-throttling",
"--disable-backgrounding-occluded-windows",
"--disable-breakpad",
"--disable-client-side-phishing-detection",
"--disable-component-extensions-with-background-pages",
"--disable-default-apps",
"--disable-extensions",
"--disable-features=TranslateUI",
"--disable-hang-monitor",
"--disable-ipc-flooding-protection",
"--disable-popup-blocking",
"--disable-prompt-on-repost",
"--disable-sync",
"--force-color-profile=srgb",
"--metrics-recording-only",
"--no-first-run",
"--password-store=basic",
"--use-mock-keychain"
])
if self.text_only:
browser_args["args"].extend([
'--blink-settings=imagesEnabled=false',
'--disable-remote-fonts'
])
# Add channel if specified (try Chrome first)
if self.chrome_channel:
browser_args["channel"] = self.chrome_channel
# Add extra args if provided
if self.extra_args:
browser_args["args"].extend(self.extra_args)
# Add downloads path if downloads are enabled
if self.accept_downloads:
browser_args["downloads_path"] = self.downloads_path
# Add proxy settings if a proxy is specified
if self.proxy:
proxy_settings = ProxySettings(server=self.proxy)
browser_args["proxy"] = proxy_settings
elif self.proxy_config:
proxy_settings = ProxySettings(
server=self.proxy_config.get("server"),
username=self.proxy_config.get("username"),
password=self.proxy_config.get("password")
)
browser_args["proxy"] = proxy_settings
try:
# Select the appropriate browser based on the browser_type
if self.browser_type == "firefox":
self.browser = await self.playwright.firefox.launch(**browser_args)
elif self.browser_type == "webkit":
if "viewport" not in browser_args:
browser_args["viewport"] = {"width": self.viewport_width, "height": self.viewport_height}
self.browser = await self.playwright.webkit.launch(**browser_args)
else:
if self.use_persistent_context and self.user_data_dir:
self.browser = await self.playwright.chromium.launch_persistent_context(
user_data_dir=self.user_data_dir,
accept_downloads=self.accept_downloads,
downloads_path=self.downloads_path if self.accept_downloads else None,
**browser_args
)
self.default_context = self.browser
else:
self.browser = await self.playwright.chromium.launch(**browser_args)
self.default_context = self.browser
except Exception as e:
# Fallback to chromium if Chrome channel fails
if "chrome" in str(e) and browser_args.get("channel") == "chrome":
browser_args["channel"] = "chromium"
if self.use_persistent_context and self.user_data_dir:
self.browser = await self.playwright.chromium.launch_persistent_context(
user_data_dir=self.user_data_dir,
**browser_args
)
self.default_context = self.browser
else:
self.browser = await self.playwright.chromium.launch(**browser_args)
else:
raise
await self.execute_hook('on_browser_created', self.browser)
async def close(self): async def close(self):
if self.sleep_on_close: if self.sleep_on_close:
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
# Close all active sessions await self.browser_manager.close()
session_ids = list(self.sessions.keys())
for session_id in session_ids:
await self.kill_session(session_id)
if self.browser:
await self.browser.close()
self.browser = None
if self.managed_browser:
await asyncio.sleep(0.5)
await self.managed_browser.cleanup()
self.managed_browser = None
if self.playwright:
await self.playwright.stop()
self.playwright = None
# Issue #256: Remove __del__ method to avoid potential issues with async cleanup # Issue #256: Remove __del__ method to avoid potential issues with async cleanup
# def __del__(self): # def __del__(self):
@@ -631,35 +725,13 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
async def create_session(self, **kwargs) -> str: async def create_session(self, **kwargs) -> str:
"""Creates a new browser session and returns its ID.""" """Creates a new browser session and returns its ID."""
if not self.browser: await self.start()
await self.start()
session_id = kwargs.get('session_id') or str(uuid.uuid4()) session_id = kwargs.get('session_id') or str(uuid.uuid4())
if self.use_managed_browser: user_agent = kwargs.get("user_agent", self.user_agent)
page = await self.default_context.new_page() # Use browser_manager to get a fresh page & context assigned to this session_id
self.sessions[session_id] = (self.default_context, page, time.time()) page, context = await self.browser_manager.get_page(session_id, user_agent)
else:
if self.use_persistent_context and self.browser_type in ["chrome", "chromium"]:
context = self.browser
page = await context.new_page()
else:
context = await self.browser.new_context(
user_agent=kwargs.get("user_agent", self.user_agent),
viewport={"width": self.viewport_width, "height": self.viewport_height},
proxy={"server": self.proxy} if self.proxy else None,
accept_downloads=self.accept_downloads,
storage_state=self.storage_state,
ignore_https_errors=True
)
if self.cookies:
await context.add_cookies(self.cookies)
await context.set_extra_http_headers(self.headers)
page = await context.new_page()
self.sessions[session_id] = (context, page, time.time())
return session_id return session_id
async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse: async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
@@ -720,18 +792,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
else: else:
raise ValueError("URL must start with 'http://', 'https://', 'file://', or 'raw:'") raise ValueError("URL must start with 'http://', 'https://', 'file://', or 'raw:'")
async def _crawl_web(self, url: str, **kwargs) -> AsyncCrawlResponse: async def _crawl_web(self, url: str, **kwargs) -> AsyncCrawlResponse:
"""
Existing web crawling logic remains unchanged.
Args:
url (str): The web URL to crawl.
**kwargs: Additional parameters.
Returns:
AsyncCrawlResponse: The response containing HTML, headers, status code, and optional screenshot.
"""
response_headers = {} response_headers = {}
status_code = None status_code = None
@@ -751,97 +812,13 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
) )
# Handle page creation differently for managed browser # Handle page creation differently for managed browser
context = None page, context = await self.browser_manager.get_page(session_id, user_agent)
if self.use_managed_browser: await context.add_cookies([{"name": "cookiesEnabled", "value": "true", "url": url}])
if session_id:
# Reuse existing session if available if kwargs.get("override_navigator", False) or kwargs.get("simulate_user", False) or kwargs.get("magic", False):
context, page, _ = self.sessions.get(session_id, (None, None, None)) # Inject scripts to override navigator properties
if not page: await context.add_init_script(load_js_script("navigator_overrider"))
# Create new page in default context if session doesn't exist
page = await self.default_context.new_page()
self.sessions[session_id] = (self.default_context, page, time.time())
else:
# Create new page in default context for non-session requests
page = await self.default_context.new_page()
else:
if session_id:
context, page, _ = self.sessions.get(session_id, (None, None, None))
if not context:
if self.use_persistent_context and self.browser_type in ["chrome", "chromium"]:
# In persistent context, browser is the context
context = self.browser
else:
# Normal context creation for non-persistent or non-Chrome browsers
context = await self.browser.new_context(
user_agent=user_agent,
viewport={"width": self.viewport_width, "height": self.viewport_height},
proxy={"server": self.proxy} if self.proxy else None,
java_script_enabled=True,
accept_downloads=self.accept_downloads,
storage_state=self.storage_state,
# downloads_path=self.downloads_path if self.accept_downloads else None
)
await context.add_cookies([{"name": "cookiesEnabled", "value": "true", "url": url}])
if self.cookies:
await context.add_cookies(self.cookies)
await context.set_extra_http_headers(self.headers)
page = await context.new_page()
self.sessions[session_id] = (context, page, time.time())
else:
if self.use_persistent_context and self.browser_type in ["chrome", "chromium"]:
# In persistent context, browser is the context
context = self.browser
else:
# Normal context creation
context = await self.browser.new_context(
user_agent=user_agent,
# viewport={"width": 1920, "height": 1080},
viewport={"width": self.viewport_width, "height": self.viewport_height},
proxy={"server": self.proxy} if self.proxy else None,
accept_downloads=self.accept_downloads,
storage_state=self.storage_state,
ignore_https_errors=True # Add this line
)
if self.cookies:
await context.add_cookies(self.cookies)
await context.set_extra_http_headers(self.headers)
if kwargs.get("override_navigator", False) or kwargs.get("simulate_user", False) or kwargs.get("magic", False):
# Inject scripts to override navigator properties
await context.add_init_script("""
// Pass the Permissions Test.
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications' ?
Promise.resolve({ state: Notification.permission }) :
originalQuery(parameters)
);
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
window.navigator.chrome = {
runtime: {},
// Add other properties if necessary
};
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5],
});
Object.defineProperty(navigator, 'languages', {
get: () => ['en-US', 'en'],
});
Object.defineProperty(document, 'hidden', {
get: () => false
});
Object.defineProperty(document, 'visibilityState', {
get: () => 'visible'
});
""")
page = await context.new_page()
if kwargs.get("magic", False):
await stealth_async(page, stealth_config)
# Add console message and error logging # Add console message and error logging
if kwargs.get("log_console", False): if kwargs.get("log_console", False):
page.on("console", lambda msg: print(f"Console: {msg.text}")) page.on("console", lambda msg: print(f"Console: {msg.text}"))
@@ -1052,62 +1029,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# Update image dimensions # Update image dimensions
if not self.text_only: if not self.text_only:
update_image_dimensions_js = """ update_image_dimensions_js = load_js_script("update_image_dimensions")
() => {
return new Promise((resolve) => {
const filterImage = (img) => {
// Filter out images that are too small
if (img.width < 100 && img.height < 100) return false;
// Filter out images that are not visible
const rect = img.getBoundingClientRect();
if (rect.width === 0 || rect.height === 0) return false;
// Filter out images with certain class names (e.g., icons, thumbnails)
if (img.classList.contains('icon') || img.classList.contains('thumbnail')) return false;
// Filter out images with certain patterns in their src (e.g., placeholder images)
if (img.src.includes('placeholder') || img.src.includes('icon')) return false;
return true;
};
const images = Array.from(document.querySelectorAll('img')).filter(filterImage);
let imagesLeft = images.length;
if (imagesLeft === 0) {
resolve();
return;
}
const checkImage = (img) => {
if (img.complete && img.naturalWidth !== 0) {
img.setAttribute('width', img.naturalWidth);
img.setAttribute('height', img.naturalHeight);
imagesLeft--;
if (imagesLeft === 0) resolve();
}
};
images.forEach(img => {
checkImage(img);
if (!img.complete) {
img.onload = () => {
checkImage(img);
};
img.onerror = () => {
imagesLeft--;
if (imagesLeft === 0) resolve();
};
}
});
// Fallback timeout of 5 seconds
// setTimeout(() => resolve(), 5000);
resolve();
});
}
"""
try: try:
try: try:
@@ -1245,124 +1167,8 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
Args: Args:
page (Page): The Playwright page instance page (Page): The Playwright page instance
""" """
remove_overlays_js = """ remove_overlays_js = load_js_script("remove_overlays")
async () => {
// Function to check if element is visible
const isVisible = (elem) => {
const style = window.getComputedStyle(elem);
return style.display !== 'none' &&
style.visibility !== 'hidden' &&
style.opacity !== '0';
};
// Common selectors for popups and overlays
const commonSelectors = [
// Close buttons first
'button[class*="close" i]', 'button[class*="dismiss" i]',
'button[aria-label*="close" i]', 'button[title*="close" i]',
'a[class*="close" i]', 'span[class*="close" i]',
// Cookie notices
'[class*="cookie-banner" i]', '[id*="cookie-banner" i]',
'[class*="cookie-consent" i]', '[id*="cookie-consent" i]',
// Newsletter/subscription dialogs
'[class*="newsletter" i]', '[class*="subscribe" i]',
// Generic popups/modals
'[class*="popup" i]', '[class*="modal" i]',
'[class*="overlay" i]', '[class*="dialog" i]',
'[role="dialog"]', '[role="alertdialog"]'
];
// Try to click close buttons first
for (const selector of commonSelectors.slice(0, 6)) {
const closeButtons = document.querySelectorAll(selector);
for (const button of closeButtons) {
if (isVisible(button)) {
try {
button.click();
await new Promise(resolve => setTimeout(resolve, 100));
} catch (e) {
console.log('Error clicking button:', e);
}
}
}
}
// Remove remaining overlay elements
const removeOverlays = () => {
// Find elements with high z-index
const allElements = document.querySelectorAll('*');
for (const elem of allElements) {
const style = window.getComputedStyle(elem);
const zIndex = parseInt(style.zIndex);
const position = style.position;
if (
isVisible(elem) &&
(zIndex > 999 || position === 'fixed' || position === 'absolute') &&
(
elem.offsetWidth > window.innerWidth * 0.5 ||
elem.offsetHeight > window.innerHeight * 0.5 ||
style.backgroundColor.includes('rgba') ||
parseFloat(style.opacity) < 1
)
) {
elem.remove();
}
}
// Remove elements matching common selectors
for (const selector of commonSelectors) {
const elements = document.querySelectorAll(selector);
elements.forEach(elem => {
if (isVisible(elem)) {
elem.remove();
}
});
}
};
// Remove overlay elements
removeOverlays();
// Remove any fixed/sticky position elements at the top/bottom
const removeFixedElements = () => {
const elements = document.querySelectorAll('*');
elements.forEach(elem => {
const style = window.getComputedStyle(elem);
if (
(style.position === 'fixed' || style.position === 'sticky') &&
isVisible(elem)
) {
elem.remove();
}
});
};
removeFixedElements();
// Remove empty block elements as: div, p, span, etc.
const removeEmptyBlockElements = () => {
const blockElements = document.querySelectorAll('div, p, span, section, article, header, footer, aside, nav, main, ul, ol, li, dl, dt, dd, h1, h2, h3, h4, h5, h6');
blockElements.forEach(elem => {
if (elem.innerText.trim() === '') {
elem.remove();
}
});
};
// Remove margin-right and padding-right from body (often added by modal scripts)
document.body.style.marginRight = '0px';
document.body.style.paddingRight = '0px';
document.body.style.overflow = 'auto';
// Wait a bit for any animations to complete
await new Promise(resolve => setTimeout(resolve, 100));
}
"""
try: try:
await page.evaluate(remove_overlays_js) await page.evaluate(remove_overlays_js)
await page.wait_for_timeout(500) # Wait for any animations to complete await page.wait_for_timeout(500) # Wait for any animations to complete
@@ -1440,9 +1246,10 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
Optional[str]: Base64-encoded screenshot image or an error image if failed. Optional[str]: Base64-encoded screenshot image or an error image if failed.
""" """
try: try:
if not self.browser: await self.start()
await self.start() # Create a temporary page without a session_id
page = await self.browser.new_page() page, context = await self.browser_manager.get_page(None, self.user_agent)
await page.set_content(html, wait_until='networkidle') await page.set_content(html, wait_until='networkidle')
screenshot = await page.screenshot(full_page=True) screenshot = await page.screenshot(full_page=True)
await page.close() await page.close()

183
crawl4ai/async_tools.py Normal file
View File

@@ -0,0 +1,183 @@
import asyncio
import base64
import time
from abc import ABC, abstractmethod
from typing import Callable, Dict, Any, List, Optional, Awaitable
import os, sys, shutil
import tempfile, subprocess
from playwright.async_api import async_playwright, Page, Browser, Error
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
from pathlib import Path
from playwright.async_api import ProxySettings
from pydantic import BaseModel
import hashlib
import json
import uuid
from .models import AsyncCrawlResponse
from .utils import create_box_message
from .user_agent_generator import UserAgentGenerator
from playwright_stealth import StealthConfig, stealth_async
class ManagedBrowser:
def __init__(self, browser_type: str = "chromium", user_data_dir: Optional[str] = None, headless: bool = False, logger = None, host: str = "localhost", debugging_port: int = 9222):
self.browser_type = browser_type
self.user_data_dir = user_data_dir
self.headless = headless
self.browser_process = None
self.temp_dir = None
self.debugging_port = debugging_port
self.host = host
self.logger = logger
self.shutting_down = False
async def start(self) -> str:
"""
Starts the browser process and returns the CDP endpoint URL.
If user_data_dir is not provided, creates a temporary directory.
"""
# Create temp dir if needed
if not self.user_data_dir:
self.temp_dir = tempfile.mkdtemp(prefix="browser-profile-")
self.user_data_dir = self.temp_dir
# Get browser path and args based on OS and browser type
browser_path = self._get_browser_path()
args = self._get_browser_args()
# Start browser process
try:
self.browser_process = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# Monitor browser process output for errors
asyncio.create_task(self._monitor_browser_process())
await asyncio.sleep(2) # Give browser time to start
return f"http://{self.host}:{self.debugging_port}"
except Exception as e:
await self.cleanup()
raise Exception(f"Failed to start browser: {e}")
async def _monitor_browser_process(self):
"""Monitor the browser process for unexpected termination."""
if self.browser_process:
try:
stdout, stderr = await asyncio.gather(
asyncio.to_thread(self.browser_process.stdout.read),
asyncio.to_thread(self.browser_process.stderr.read)
)
# Check shutting_down flag BEFORE logging anything
if self.browser_process.poll() is not None:
if not self.shutting_down:
self.logger.error(
message="Browser process terminated unexpectedly | Code: {code} | STDOUT: {stdout} | STDERR: {stderr}",
tag="ERROR",
params={
"code": self.browser_process.returncode,
"stdout": stdout.decode(),
"stderr": stderr.decode()
}
)
await self.cleanup()
else:
self.logger.info(
message="Browser process terminated normally | Code: {code}",
tag="INFO",
params={"code": self.browser_process.returncode}
)
except Exception as e:
if not self.shutting_down:
self.logger.error(
message="Error monitoring browser process: {error}",
tag="ERROR",
params={"error": str(e)}
)
def _get_browser_path(self) -> str:
"""Returns the browser executable path based on OS and browser type"""
if sys.platform == "darwin": # macOS
paths = {
"chromium": "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"firefox": "/Applications/Firefox.app/Contents/MacOS/firefox",
"webkit": "/Applications/Safari.app/Contents/MacOS/Safari"
}
elif sys.platform == "win32": # Windows
paths = {
"chromium": "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe",
"firefox": "C:\\Program Files\\Mozilla Firefox\\firefox.exe",
"webkit": None # WebKit not supported on Windows
}
else: # Linux
paths = {
"chromium": "google-chrome",
"firefox": "firefox",
"webkit": None # WebKit not supported on Linux
}
return paths.get(self.browser_type)
def _get_browser_args(self) -> List[str]:
"""Returns browser-specific command line arguments"""
base_args = [self._get_browser_path()]
if self.browser_type == "chromium":
args = [
f"--remote-debugging-port={self.debugging_port}",
f"--user-data-dir={self.user_data_dir}",
]
if self.headless:
args.append("--headless=new")
elif self.browser_type == "firefox":
args = [
"--remote-debugging-port", str(self.debugging_port),
"--profile", self.user_data_dir,
]
if self.headless:
args.append("--headless")
else:
raise NotImplementedError(f"Browser type {self.browser_type} not supported")
return base_args + args
async def cleanup(self):
"""Cleanup browser process and temporary directory"""
# Set shutting_down flag BEFORE any termination actions
self.shutting_down = True
if self.browser_process:
try:
self.browser_process.terminate()
# Wait for process to end gracefully
for _ in range(10): # 10 attempts, 100ms each
if self.browser_process.poll() is not None:
break
await asyncio.sleep(0.1)
# Force kill if still running
if self.browser_process.poll() is None:
self.browser_process.kill()
await asyncio.sleep(0.1) # Brief wait for kill to take effect
except Exception as e:
self.logger.error(
message="Error terminating browser: {error}",
tag="ERROR",
params={"error": str(e)}
)
if self.temp_dir and os.path.exists(self.temp_dir):
try:
shutil.rmtree(self.temp_dir)
except Exception as e:
self.logger.error(
message="Error removing temporary directory: {error}",
tag="ERROR",
params={"error": str(e)}
)

View File

@@ -7,7 +7,7 @@ from pathlib import Path
from typing import Optional, List, Union from typing import Optional, List, Union
import json import json
import asyncio import asyncio
from contextlib import nullcontext from contextlib import nullcontext, asynccontextmanager
from .models import CrawlResult, MarkdownGenerationResult from .models import CrawlResult, MarkdownGenerationResult
from .async_database import async_db_manager from .async_database import async_db_manager
from .chunking_strategy import * from .chunking_strategy import *
@@ -122,15 +122,14 @@ class AsyncWebCrawler:
async def __aexit__(self, exc_type, exc_val, exc_tb): async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.crawler_strategy.__aexit__(exc_type, exc_val, exc_tb) await self.crawler_strategy.__aexit__(exc_type, exc_val, exc_tb)
@asynccontextmanager
async def nullcontext(self):
yield
async def awarmup(self): async def awarmup(self):
"""Initialize the crawler with warm-up sequence.""" """Initialize the crawler with warm-up sequence."""
self.logger.info(f"Crawl4AI {crawl4ai_version}", tag="INIT") self.logger.info(f"Crawl4AI {crawl4ai_version}", tag="INIT")
# if self.verbose:
# print(f"{Fore.CYAN}{self.tag_format('INIT')} {self.log_icons['INIT']} Crawl4AI {crawl4ai_version}{Style.RESET_ALL}")
# print(f"{Fore.CYAN}{self.tag_format('INIT')} {self.log_icons['INIT']} Warming up AsyncWebCrawler{Style.RESET_ALL}")
self.ready = True self.ready = True
# if self.verbose:
# print(f"{Fore.GREEN}{self.tag_format('READY')} {self.log_icons['READY']} AsyncWebCrawler initialized{Style.RESET_ALL}")
async def arun( async def arun(
self, self,
@@ -186,7 +185,7 @@ class AsyncWebCrawler:
if not isinstance(url, str) or not url: if not isinstance(url, str) or not url:
raise ValueError("Invalid URL, make sure the URL is a non-empty string") raise ValueError("Invalid URL, make sure the URL is a non-empty string")
async with self._lock or nullcontext(): async with self._lock or self.nullcontext(): # Lock for thread safety previously -> nullcontext():
try: try:
# Handle deprecated parameters # Handle deprecated parameters
if any([bypass_cache, disable_cache, no_cache_read, no_cache_write]): if any([bypass_cache, disable_cache, no_cache_read, no_cache_write]):

View File

@@ -14,15 +14,11 @@ from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter#,
from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator
from .models import MarkdownGenerationResult from .models import MarkdownGenerationResult
from .utils import ( from .utils import (
sanitize_input_encode,
sanitize_html,
extract_metadata, extract_metadata,
InvalidCSSSelectorError,
CustomHTML2Text,
normalize_url, normalize_url,
is_external_url is_external_url
) )
from .tools import profile_and_time
# Pre-compile regular expressions for Open Graph and Twitter metadata # Pre-compile regular expressions for Open Graph and Twitter metadata
OG_REGEX = re.compile(r'^og:') OG_REGEX = re.compile(r'^og:')
@@ -76,10 +72,10 @@ class WebScrapingStrategy(ContentScrapingStrategy):
log_method(message=message, tag=tag, **kwargs) log_method(message=message, tag=tag, **kwargs)
def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]: def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
return self._get_content_of_website_optimized(url, html, is_async=False, **kwargs) return self._scrap(url, html, is_async=False, **kwargs)
async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]: async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
return await asyncio.to_thread(self._get_content_of_website_optimized, url, html, **kwargs) return await asyncio.to_thread(self._scrap, url, html, **kwargs)
def _generate_markdown_content(self, def _generate_markdown_content(self,
cleaned_html: str, cleaned_html: str,
@@ -103,8 +99,6 @@ class WebScrapingStrategy(ContentScrapingStrategy):
html2text_options=kwargs.get('html2text', {}) html2text_options=kwargs.get('html2text', {})
) )
help_message = """"""
return { return {
'markdown': markdown_result.raw_markdown, 'markdown': markdown_result.raw_markdown,
'fit_markdown': markdown_result.fit_markdown, 'fit_markdown': markdown_result.fit_markdown,
@@ -126,38 +120,40 @@ class WebScrapingStrategy(ContentScrapingStrategy):
} }
# Legacy method # Legacy method
h = CustomHTML2Text() """
h.update_params(**kwargs.get('html2text', {})) # h = CustomHTML2Text()
markdown = h.handle(cleaned_html) # h.update_params(**kwargs.get('html2text', {}))
markdown = markdown.replace(' ```', '```') # markdown = h.handle(cleaned_html)
# markdown = markdown.replace(' ```', '```')
fit_markdown = "Set flag 'fit_markdown' to True to get cleaned HTML content." # fit_markdown = "Set flag 'fit_markdown' to True to get cleaned HTML content."
fit_html = "Set flag 'fit_markdown' to True to get cleaned HTML content." # fit_html = "Set flag 'fit_markdown' to True to get cleaned HTML content."
if kwargs.get('content_filter', None) or kwargs.get('fit_markdown', False): # if kwargs.get('content_filter', None) or kwargs.get('fit_markdown', False):
content_filter = kwargs.get('content_filter', None) # content_filter = kwargs.get('content_filter', None)
if not content_filter: # if not content_filter:
content_filter = BM25ContentFilter( # content_filter = BM25ContentFilter(
user_query=kwargs.get('fit_markdown_user_query', None), # user_query=kwargs.get('fit_markdown_user_query', None),
bm25_threshold=kwargs.get('fit_markdown_bm25_threshold', 1.0) # bm25_threshold=kwargs.get('fit_markdown_bm25_threshold', 1.0)
) # )
fit_html = content_filter.filter_content(html) # fit_html = content_filter.filter_content(html)
fit_html = '\n'.join('<div>{}</div>'.format(s) for s in fit_html) # fit_html = '\n'.join('<div>{}</div>'.format(s) for s in fit_html)
fit_markdown = h.handle(fit_html) # fit_markdown = h.handle(fit_html)
markdown_v2 = MarkdownGenerationResult( # markdown_v2 = MarkdownGenerationResult(
raw_markdown=markdown, # raw_markdown=markdown,
markdown_with_citations=markdown, # markdown_with_citations=markdown,
references_markdown=markdown, # references_markdown=markdown,
fit_markdown=fit_markdown # fit_markdown=fit_markdown
) # )
return { # return {
'markdown': markdown, # 'markdown': markdown,
'fit_markdown': fit_markdown, # 'fit_markdown': fit_markdown,
'fit_html': fit_html, # 'fit_html': fit_html,
'markdown_v2' : markdown_v2 # 'markdown_v2' : markdown_v2
} # }
"""
def flatten_nested_elements(self, node): def flatten_nested_elements(self, node):
if isinstance(node, NavigableString): if isinstance(node, NavigableString):
@@ -483,7 +479,7 @@ class WebScrapingStrategy(ContentScrapingStrategy):
) )
return False return False
def _get_content_of_website_optimized(self, url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]: def _scrap(self, url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]:
success = True success = True
if not html: if not html:
return None return None

View File

@@ -1006,10 +1006,136 @@ class HTML2Text(html.parser.HTMLParser):
newlines += 1 newlines += 1
return result return result
def html2text(html: str, baseurl: str = "", bodywidth: Optional[int] = None) -> str: def html2text(html: str, baseurl: str = "", bodywidth: Optional[int] = None) -> str:
if bodywidth is None: if bodywidth is None:
bodywidth = config.BODY_WIDTH bodywidth = config.BODY_WIDTH
h = HTML2Text(baseurl=baseurl, bodywidth=bodywidth) h = HTML2Text(baseurl=baseurl, bodywidth=bodywidth)
return h.handle(html) return h.handle(html)
class CustomHTML2Text(HTML2Text):
def __init__(self, *args, handle_code_in_pre=False, **kwargs):
super().__init__(*args, **kwargs)
self.inside_pre = False
self.inside_code = False
self.preserve_tags = set() # Set of tags to preserve
self.current_preserved_tag = None
self.preserved_content = []
self.preserve_depth = 0
self.handle_code_in_pre = handle_code_in_pre
# Configuration options
self.skip_internal_links = False
self.single_line_break = False
self.mark_code = False
self.include_sup_sub = False
self.body_width = 0
self.ignore_mailto_links = True
self.ignore_links = False
self.escape_backslash = False
self.escape_dot = False
self.escape_plus = False
self.escape_dash = False
self.escape_snob = False
def update_params(self, **kwargs):
"""Update parameters and set preserved tags."""
for key, value in kwargs.items():
if key == 'preserve_tags':
self.preserve_tags = set(value)
elif key == 'handle_code_in_pre':
self.handle_code_in_pre = value
else:
setattr(self, key, value)
def handle_tag(self, tag, attrs, start):
# Handle preserved tags
if tag in self.preserve_tags:
if start:
if self.preserve_depth == 0:
self.current_preserved_tag = tag
self.preserved_content = []
# Format opening tag with attributes
attr_str = ''.join(f' {k}="{v}"' for k, v in attrs.items() if v is not None)
self.preserved_content.append(f'<{tag}{attr_str}>')
self.preserve_depth += 1
return
else:
self.preserve_depth -= 1
if self.preserve_depth == 0:
self.preserved_content.append(f'</{tag}>')
# Output the preserved HTML block with proper spacing
preserved_html = ''.join(self.preserved_content)
self.o('\n' + preserved_html + '\n')
self.current_preserved_tag = None
return
# If we're inside a preserved tag, collect all content
if self.preserve_depth > 0:
if start:
# Format nested tags with attributes
attr_str = ''.join(f' {k}="{v}"' for k, v in attrs.items() if v is not None)
self.preserved_content.append(f'<{tag}{attr_str}>')
else:
self.preserved_content.append(f'</{tag}>')
return
# Handle pre tags
if tag == 'pre':
if start:
self.o('```\n') # Markdown code block start
self.inside_pre = True
else:
self.o('\n```\n') # Markdown code block end
self.inside_pre = False
elif tag == 'code':
if self.inside_pre and not self.handle_code_in_pre:
# Ignore code tags inside pre blocks if handle_code_in_pre is False
return
if start:
self.o('`') # Markdown inline code start
self.inside_code = True
else:
self.o('`') # Markdown inline code end
self.inside_code = False
else:
super().handle_tag(tag, attrs, start)
def handle_data(self, data, entity_char=False):
"""Override handle_data to capture content within preserved tags."""
if self.preserve_depth > 0:
self.preserved_content.append(data)
return
if self.inside_pre:
# Output the raw content for pre blocks, including content inside code tags
self.o(data) # Directly output the data as-is (preserve newlines)
return
if self.inside_code:
# Inline code: no newlines allowed
self.o(data.replace('\n', ' '))
return
# Default behavior for other tags
super().handle_data(data, entity_char)
# # Handle pre tags
# if tag == 'pre':
# if start:
# self.o('```\n')
# self.inside_pre = True
# else:
# self.o('\n```')
# self.inside_pre = False
# # elif tag in ["h1", "h2", "h3", "h4", "h5", "h6"]:
# # pass
# else:
# super().handle_tag(tag, attrs, start)
# def handle_data(self, data, entity_char=False):
# """Override handle_data to capture content within preserved tags."""
# if self.preserve_depth > 0:
# self.preserved_content.append(data)
# return
# super().handle_data(data, entity_char)

View File

@@ -0,0 +1,15 @@
import os, sys
# Create a function get name of a js script, then load from the CURRENT folder of this script and return its content as string, make sure its error free
def load_js_script(script_name):
# Get the path of the current script
current_script_path = os.path.dirname(os.path.realpath(__file__))
# Get the path of the script to load
script_path = os.path.join(current_script_path, script_name + '.js')
# Check if the script exists
if not os.path.exists(script_path):
raise ValueError(f"Script {script_name} not found in the folder {current_script_path}")
# Load the content of the script
with open(script_path, 'r') as f:
script_content = f.read()
return script_content

View File

@@ -0,0 +1,25 @@
// Pass the Permissions Test.
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) =>
parameters.name === "notifications"
? Promise.resolve({ state: Notification.permission })
: originalQuery(parameters);
Object.defineProperty(navigator, "webdriver", {
get: () => undefined,
});
window.navigator.chrome = {
runtime: {},
// Add other properties if necessary
};
Object.defineProperty(navigator, "plugins", {
get: () => [1, 2, 3, 4, 5],
});
Object.defineProperty(navigator, "languages", {
get: () => ["en-US", "en"],
});
Object.defineProperty(document, "hidden", {
get: () => false,
});
Object.defineProperty(document, "visibilityState", {
get: () => "visible",
});

View File

@@ -0,0 +1,119 @@
async () => {
// Function to check if element is visible
const isVisible = (elem) => {
const style = window.getComputedStyle(elem);
return style.display !== "none" && style.visibility !== "hidden" && style.opacity !== "0";
};
// Common selectors for popups and overlays
const commonSelectors = [
// Close buttons first
'button[class*="close" i]',
'button[class*="dismiss" i]',
'button[aria-label*="close" i]',
'button[title*="close" i]',
'a[class*="close" i]',
'span[class*="close" i]',
// Cookie notices
'[class*="cookie-banner" i]',
'[id*="cookie-banner" i]',
'[class*="cookie-consent" i]',
'[id*="cookie-consent" i]',
// Newsletter/subscription dialogs
'[class*="newsletter" i]',
'[class*="subscribe" i]',
// Generic popups/modals
'[class*="popup" i]',
'[class*="modal" i]',
'[class*="overlay" i]',
'[class*="dialog" i]',
'[role="dialog"]',
'[role="alertdialog"]',
];
// Try to click close buttons first
for (const selector of commonSelectors.slice(0, 6)) {
const closeButtons = document.querySelectorAll(selector);
for (const button of closeButtons) {
if (isVisible(button)) {
try {
button.click();
await new Promise((resolve) => setTimeout(resolve, 100));
} catch (e) {
console.log("Error clicking button:", e);
}
}
}
}
// Remove remaining overlay elements
const removeOverlays = () => {
// Find elements with high z-index
const allElements = document.querySelectorAll("*");
for (const elem of allElements) {
const style = window.getComputedStyle(elem);
const zIndex = parseInt(style.zIndex);
const position = style.position;
if (
isVisible(elem) &&
(zIndex > 999 || position === "fixed" || position === "absolute") &&
(elem.offsetWidth > window.innerWidth * 0.5 ||
elem.offsetHeight > window.innerHeight * 0.5 ||
style.backgroundColor.includes("rgba") ||
parseFloat(style.opacity) < 1)
) {
elem.remove();
}
}
// Remove elements matching common selectors
for (const selector of commonSelectors) {
const elements = document.querySelectorAll(selector);
elements.forEach((elem) => {
if (isVisible(elem)) {
elem.remove();
}
});
}
};
// Remove overlay elements
removeOverlays();
// Remove any fixed/sticky position elements at the top/bottom
const removeFixedElements = () => {
const elements = document.querySelectorAll("*");
elements.forEach((elem) => {
const style = window.getComputedStyle(elem);
if ((style.position === "fixed" || style.position === "sticky") && isVisible(elem)) {
elem.remove();
}
});
};
removeFixedElements();
// Remove empty block elements as: div, p, span, etc.
const removeEmptyBlockElements = () => {
const blockElements = document.querySelectorAll(
"div, p, span, section, article, header, footer, aside, nav, main, ul, ol, li, dl, dt, dd, h1, h2, h3, h4, h5, h6"
);
blockElements.forEach((elem) => {
if (elem.innerText.trim() === "") {
elem.remove();
}
});
};
// Remove margin-right and padding-right from body (often added by modal scripts)
document.body.style.marginRight = "0px";
document.body.style.paddingRight = "0px";
document.body.style.overflow = "auto";
// Wait a bit for any animations to complete
await new Promise((resolve) => setTimeout(resolve, 100));
};

View File

@@ -0,0 +1,54 @@
() => {
return new Promise((resolve) => {
const filterImage = (img) => {
// Filter out images that are too small
if (img.width < 100 && img.height < 100) return false;
// Filter out images that are not visible
const rect = img.getBoundingClientRect();
if (rect.width === 0 || rect.height === 0) return false;
// Filter out images with certain class names (e.g., icons, thumbnails)
if (img.classList.contains("icon") || img.classList.contains("thumbnail")) return false;
// Filter out images with certain patterns in their src (e.g., placeholder images)
if (img.src.includes("placeholder") || img.src.includes("icon")) return false;
return true;
};
const images = Array.from(document.querySelectorAll("img")).filter(filterImage);
let imagesLeft = images.length;
if (imagesLeft === 0) {
resolve();
return;
}
const checkImage = (img) => {
if (img.complete && img.naturalWidth !== 0) {
img.setAttribute("width", img.naturalWidth);
img.setAttribute("height", img.naturalHeight);
imagesLeft--;
if (imagesLeft === 0) resolve();
}
};
images.forEach((img) => {
checkImage(img);
if (!img.complete) {
img.onload = () => {
checkImage(img);
};
img.onerror = () => {
imagesLeft--;
if (imagesLeft === 0) resolve();
};
}
});
// Fallback timeout of 5 seconds
// setTimeout(() => resolve(), 5000);
resolve();
});
};

View File

@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, Tuple from typing import Optional, Dict, Any, Tuple
from .models import MarkdownGenerationResult from .models import MarkdownGenerationResult
from .utils import CustomHTML2Text from .html2text import CustomHTML2Text
from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter
import re import re
from urllib.parse import urljoin from urllib.parse import urljoin
@@ -9,6 +9,17 @@ from urllib.parse import urljoin
# Pre-compile the regex pattern # Pre-compile the regex pattern
LINK_PATTERN = re.compile(r'!?\[([^\]]+)\]\(([^)]+?)(?:\s+"([^"]*)")?\)') LINK_PATTERN = re.compile(r'!?\[([^\]]+)\]\(([^)]+?)(?:\s+"([^"]*)")?\)')
def fast_urljoin(base: str, url: str) -> str:
"""Fast URL joining for common cases."""
if url.startswith(('http://', 'https://', 'mailto:', '//')):
return url
if url.startswith('/'):
# Handle absolute paths
if base.endswith('/'):
return base[:-1] + url
return base + url
return urljoin(base, url)
class MarkdownGenerationStrategy(ABC): class MarkdownGenerationStrategy(ABC):
"""Abstract base class for markdown generation strategies.""" """Abstract base class for markdown generation strategies."""
def __init__(self, content_filter: Optional[RelevantContentFilter] = None, options: Optional[Dict[str, Any]] = None): def __init__(self, content_filter: Optional[RelevantContentFilter] = None, options: Optional[Dict[str, Any]] = None):
@@ -118,13 +129,3 @@ class DefaultMarkdownGenerator(MarkdownGenerationStrategy):
fit_html=filtered_html, fit_html=filtered_html,
) )
def fast_urljoin(base: str, url: str) -> str:
"""Fast URL joining for common cases."""
if url.startswith(('http://', 'https://', 'mailto:', '//')):
return url
if url.startswith('/'):
# Handle absolute paths
if base.endswith('/'):
return base[:-1] + url
return base + url
return urljoin(base, url)

View File

@@ -1,34 +0,0 @@
import time
import cProfile
import pstats
from functools import wraps
def profile_and_time(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
# Start timer
start_time = time.perf_counter()
# Setup profiler
profiler = cProfile.Profile()
profiler.enable()
# Run function
result = func(self, *args, **kwargs)
# Stop profiler
profiler.disable()
# Calculate elapsed time
elapsed_time = time.perf_counter() - start_time
# Print timing
print(f"[PROFILER] Scraping completed in {elapsed_time:.2f} seconds")
# Print profiling stats
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative') # Sort by cumulative time
stats.print_stats(20) # Print top 20 time-consuming functions
return result
return wrapper

View File

@@ -19,139 +19,13 @@ from typing import Optional, Tuple, Dict, Any
import xxhash import xxhash
from colorama import Fore, Style, init from colorama import Fore, Style, init
import textwrap import textwrap
import cProfile
from .html2text import HTML2Text import pstats
class CustomHTML2Text(HTML2Text): from functools import wraps
def __init__(self, *args, handle_code_in_pre=False, **kwargs):
super().__init__(*args, **kwargs)
self.inside_pre = False
self.inside_code = False
self.preserve_tags = set() # Set of tags to preserve
self.current_preserved_tag = None
self.preserved_content = []
self.preserve_depth = 0
self.handle_code_in_pre = handle_code_in_pre
# Configuration options
self.skip_internal_links = False
self.single_line_break = False
self.mark_code = False
self.include_sup_sub = False
self.body_width = 0
self.ignore_mailto_links = True
self.ignore_links = False
self.escape_backslash = False
self.escape_dot = False
self.escape_plus = False
self.escape_dash = False
self.escape_snob = False
def update_params(self, **kwargs):
"""Update parameters and set preserved tags."""
for key, value in kwargs.items():
if key == 'preserve_tags':
self.preserve_tags = set(value)
elif key == 'handle_code_in_pre':
self.handle_code_in_pre = value
else:
setattr(self, key, value)
def handle_tag(self, tag, attrs, start):
# Handle preserved tags
if tag in self.preserve_tags:
if start:
if self.preserve_depth == 0:
self.current_preserved_tag = tag
self.preserved_content = []
# Format opening tag with attributes
attr_str = ''.join(f' {k}="{v}"' for k, v in attrs.items() if v is not None)
self.preserved_content.append(f'<{tag}{attr_str}>')
self.preserve_depth += 1
return
else:
self.preserve_depth -= 1
if self.preserve_depth == 0:
self.preserved_content.append(f'</{tag}>')
# Output the preserved HTML block with proper spacing
preserved_html = ''.join(self.preserved_content)
self.o('\n' + preserved_html + '\n')
self.current_preserved_tag = None
return
# If we're inside a preserved tag, collect all content
if self.preserve_depth > 0:
if start:
# Format nested tags with attributes
attr_str = ''.join(f' {k}="{v}"' for k, v in attrs.items() if v is not None)
self.preserved_content.append(f'<{tag}{attr_str}>')
else:
self.preserved_content.append(f'</{tag}>')
return
# Handle pre tags
if tag == 'pre':
if start:
self.o('```\n') # Markdown code block start
self.inside_pre = True
else:
self.o('\n```\n') # Markdown code block end
self.inside_pre = False
elif tag == 'code':
if self.inside_pre and not self.handle_code_in_pre:
# Ignore code tags inside pre blocks if handle_code_in_pre is False
return
if start:
self.o('`') # Markdown inline code start
self.inside_code = True
else:
self.o('`') # Markdown inline code end
self.inside_code = False
else:
super().handle_tag(tag, attrs, start)
def handle_data(self, data, entity_char=False):
"""Override handle_data to capture content within preserved tags."""
if self.preserve_depth > 0:
self.preserved_content.append(data)
return
if self.inside_pre:
# Output the raw content for pre blocks, including content inside code tags
self.o(data) # Directly output the data as-is (preserve newlines)
return
if self.inside_code:
# Inline code: no newlines allowed
self.o(data.replace('\n', ' '))
return
# Default behavior for other tags
super().handle_data(data, entity_char)
# # Handle pre tags
# if tag == 'pre':
# if start:
# self.o('```\n')
# self.inside_pre = True
# else:
# self.o('\n```')
# self.inside_pre = False
# # elif tag in ["h1", "h2", "h3", "h4", "h5", "h6"]:
# # pass
# else:
# super().handle_tag(tag, attrs, start)
# def handle_data(self, data, entity_char=False):
# """Override handle_data to capture content within preserved tags."""
# if self.preserve_depth > 0:
# self.preserved_content.append(data)
# return
# super().handle_data(data, entity_char)
class InvalidCSSSelectorError(Exception): class InvalidCSSSelectorError(Exception):
pass pass
def create_box_message( def create_box_message(
message: str, message: str,
type: str = "info", type: str = "info",
@@ -374,50 +248,6 @@ def escape_json_string(s):
return s return s
class CustomHTML2Text_v0(HTML2Text):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.inside_pre = False
self.inside_code = False
self.skip_internal_links = False
self.single_line_break = False
self.mark_code = False
self.include_sup_sub = False
self.body_width = 0
self.ignore_mailto_links = True
self.ignore_links = False
self.escape_backslash = False
self.escape_dot = False
self.escape_plus = False
self.escape_dash = False
self.escape_snob = False
def handle_tag(self, tag, attrs, start):
if tag == 'pre':
if start:
self.o('```\n')
self.inside_pre = True
else:
self.o('\n```')
self.inside_pre = False
elif tag in ["h1", "h2", "h3", "h4", "h5", "h6"]:
pass
# elif tag == 'code' and not self.inside_pre:
# if start:
# if not self.inside_pre:
# self.o('`')
# self.inside_code = True
# else:
# if not self.inside_pre:
# self.o('`')
# self.inside_code = False
super().handle_tag(tag, attrs, start)
def replace_inline_tags(soup, tags, only_text=False): def replace_inline_tags(soup, tags, only_text=False):
tag_replacements = { tag_replacements = {
'b': lambda tag: f"**{tag.text}**", 'b': lambda tag: f"**{tag.text}**",
@@ -979,7 +809,6 @@ def extract_metadata(html, soup=None):
return metadata return metadata
def extract_xml_tags(string): def extract_xml_tags(string):
tags = re.findall(r'<(\w+)>', string) tags = re.findall(r'<(\w+)>', string)
return list(set(tags)) return list(set(tags))
@@ -997,7 +826,6 @@ def extract_xml_data(tags, string):
return data return data
# Function to perform the completion with exponential backoff
def perform_completion_with_backoff( def perform_completion_with_backoff(
provider, provider,
prompt_with_variables, prompt_with_variables,
@@ -1351,6 +1179,35 @@ def clean_tokens(tokens: list[str]) -> list[str]:
and not token.startswith('') and not token.startswith('')
and not token.startswith('')] and not token.startswith('')]
def profile_and_time(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
# Start timer
start_time = time.perf_counter()
# Setup profiler
profiler = cProfile.Profile()
profiler.enable()
# Run function
result = func(self, *args, **kwargs)
# Stop profiler
profiler.disable()
# Calculate elapsed time
elapsed_time = time.perf_counter() - start_time
# Print timing
print(f"[PROFILER] Scraping completed in {elapsed_time:.2f} seconds")
# Print profiling stats
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative') # Sort by cumulative time
stats.print_stats(20) # Print top 20 time-consuming functions
return result
return wrapper
def generate_content_hash(content: str) -> str: def generate_content_hash(content: str) -> str:
"""Generate a unique hash for content""" """Generate a unique hash for content"""

View File

@@ -0,0 +1,225 @@
### Using `storage_state` to Pre-Load Cookies and LocalStorage
Crawl4ais `AsyncWebCrawler` lets you preserve and reuse session data, including cookies and localStorage, across multiple runs. By providing a `storage_state`, you can start your crawls already “logged in” or with any other necessary session data—no need to repeat the login flow every time.
#### What is `storage_state`?
`storage_state` can be:
- A dictionary containing cookies and localStorage data.
- A path to a JSON file that holds this information.
When you pass `storage_state` to the crawler, it applies these cookies and localStorage entries before loading any pages. This means your crawler effectively starts in a known authenticated or pre-configured state.
#### Example Structure
Heres an example storage state:
```json
{
"cookies": [
{
"name": "session",
"value": "abcd1234",
"domain": "example.com",
"path": "/",
"expires": 1675363572.037711,
"httpOnly": false,
"secure": false,
"sameSite": "None"
}
],
"origins": [
{
"origin": "https://example.com",
"localStorage": [
{ "name": "token", "value": "my_auth_token" },
{ "name": "refreshToken", "value": "my_refresh_token" }
]
}
]
}
```
This JSON sets a `session` cookie and two localStorage entries (`token` and `refreshToken`) for `https://example.com`.
---
### Passing `storage_state` as a Dictionary
You can directly provide the data as a dictionary:
```python
import asyncio
from crawl4ai import AsyncWebCrawler
async def main():
storage_dict = {
"cookies": [
{
"name": "session",
"value": "abcd1234",
"domain": "example.com",
"path": "/",
"expires": 1675363572.037711,
"httpOnly": False,
"secure": False,
"sameSite": "None"
}
],
"origins": [
{
"origin": "https://example.com",
"localStorage": [
{"name": "token", "value": "my_auth_token"},
{"name": "refreshToken", "value": "my_refresh_token"}
]
}
]
}
async with AsyncWebCrawler(
headless=True,
storage_state=storage_dict
) as crawler:
result = await crawler.arun(url='https://example.com/protected')
if result.success:
print("Crawl succeeded with pre-loaded session data!")
print("Page HTML length:", len(result.html))
if __name__ == "__main__":
asyncio.run(main())
```
---
### Passing `storage_state` as a File
If you prefer a file-based approach, save the JSON above to `mystate.json` and reference it:
```python
import asyncio
from crawl4ai import AsyncWebCrawler
async def main():
async with AsyncWebCrawler(
headless=True,
storage_state="mystate.json" # Uses a JSON file instead of a dictionary
) as crawler:
result = await crawler.arun(url='https://example.com/protected')
if result.success:
print("Crawl succeeded with pre-loaded session data!")
print("Page HTML length:", len(result.html))
if __name__ == "__main__":
asyncio.run(main())
```
---
### Using `storage_state` to Avoid Repeated Logins (Sign In Once, Use Later)
A common scenario is when you need to log in to a site (entering username/password, etc.) to access protected pages. Doing so every crawl is cumbersome. Instead, you can:
1. Perform the login once in a hook.
2. After login completes, export the resulting `storage_state` to a file.
3. On subsequent runs, provide that `storage_state` to skip the login step.
**Step-by-Step Example:**
**First Run (Perform Login and Save State):**
```python
import asyncio
from crawl4ai import AsyncWebCrawler, CacheMode
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
async def on_browser_created_hook(browser):
# Access the default context and create a page
context = browser.contexts[0]
page = await context.new_page()
# Navigate to the login page
await page.goto("https://example.com/login", wait_until="domcontentloaded")
# Fill in credentials and submit
await page.fill("input[name='username']", "myuser")
await page.fill("input[name='password']", "mypassword")
await page.click("button[type='submit']")
await page.wait_for_load_state("networkidle")
# Now the site sets tokens in localStorage and cookies
# Export this state to a file so we can reuse it
await context.storage_state(path="my_storage_state.json")
await page.close()
async def main():
# First run: perform login and export the storage_state
async with AsyncWebCrawler(
headless=True,
verbose=True,
hooks={"on_browser_created": on_browser_created_hook},
use_persistent_context=True,
user_data_dir="./my_user_data"
) as crawler:
# After on_browser_created_hook runs, we have storage_state saved to my_storage_state.json
result = await crawler.arun(
url='https://example.com/protected-page',
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True}),
)
print("First run result success:", result.success)
if result.success:
print("Protected page HTML length:", len(result.html))
if __name__ == "__main__":
asyncio.run(main())
```
**Second Run (Reuse Saved State, No Login Needed):**
```python
import asyncio
from crawl4ai import AsyncWebCrawler, CacheMode
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
async def main():
# Second run: no need to hook on_browser_created this time.
# Just provide the previously saved storage state.
async with AsyncWebCrawler(
headless=True,
verbose=True,
use_persistent_context=True,
user_data_dir="./my_user_data",
storage_state="my_storage_state.json" # Reuse previously exported state
) as crawler:
# Now the crawler starts already logged in
result = await crawler.arun(
url='https://example.com/protected-page',
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True}),
)
print("Second run result success:", result.success)
if result.success:
print("Protected page HTML length:", len(result.html))
if __name__ == "__main__":
asyncio.run(main())
```
**Whats Happening Here?**
- During the first run, the `on_browser_created_hook` logs into the site.
- After logging in, the crawler exports the current session (cookies, localStorage, etc.) to `my_storage_state.json`.
- On subsequent runs, passing `storage_state="my_storage_state.json"` starts the browser context with these tokens already in place, skipping the login steps.
**Sign Out Scenario:**
If the website allows you to sign out by clearing tokens or by navigating to a sign-out URL, you can also run a script that uses `on_browser_created_hook` or `arun` to simulate signing out, then export the resulting `storage_state` again. That would give you a baseline “logged out” state to start fresh from next time.
---
### Conclusion
By using `storage_state`, you can skip repetitive actions, like logging in, and jump straight into crawling protected content. Whether you provide a file path or a dictionary, this powerful feature helps maintain state between crawls, simplifying your data extraction pipelines.

View File

@@ -8,7 +8,7 @@ First, let's import the necessary modules and create an instance of `AsyncWebCra
```python ```python
import asyncio import asyncio
from crawl4ai import AsyncWebCrawler, CasheMode from crawl4ai import AsyncWebCrawler, CacheMode
async def main(): async def main():
async with AsyncWebCrawler(verbose=True) as crawler: async with AsyncWebCrawler(verbose=True) as crawler:

View File

@@ -0,0 +1,153 @@
import os, sys
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
__location__ = os.path.realpath( os.path.join(os.getcwd(), os.path.dirname(__file__)))
import os, sys
import asyncio
from crawl4ai import AsyncWebCrawler, CacheMode
from crawl4ai.content_filter_strategy import PruningContentFilter
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
# Assuming that the changes made allow different configurations
# for managed browser, persistent context, and so forth.
async def test_default_headless():
async with AsyncWebCrawler(
headless=True,
verbose=True,
user_agent_mode="random",
user_agent_generator_config={"device_type": "mobile", "os_type": "android"},
use_managed_browser=False,
use_persistent_context=False,
ignore_https_errors=True,
# Testing normal ephemeral context
) as crawler:
result = await crawler.arun(
url='https://www.kidocode.com/degrees/technology',
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True}),
)
print("[test_default_headless] success:", result.success)
print("HTML length:", len(result.html if result.html else ""))
async def test_managed_browser_persistent():
# Treating use_persistent_context=True as managed_browser scenario.
async with AsyncWebCrawler(
headless=False,
verbose=True,
user_agent_mode="random",
user_agent_generator_config={"device_type": "desktop", "os_type": "mac"},
use_managed_browser=True,
use_persistent_context=True, # now should behave same as managed browser
user_data_dir="./outpu/test_profile",
# This should store and reuse profile data across runs
) as crawler:
result = await crawler.arun(
url='https://www.google.com',
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
)
print("[test_managed_browser_persistent] success:", result.success)
print("HTML length:", len(result.html if result.html else ""))
async def test_session_reuse():
# Test creating a session, using it for multiple calls
session_id = "my_session"
async with AsyncWebCrawler(
headless=False,
verbose=True,
user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
# Fixed user-agent for consistency
use_managed_browser=False,
use_persistent_context=False,
) as crawler:
# First call: create session
result1 = await crawler.arun(
url='https://www.example.com',
cache_mode=CacheMode.BYPASS,
session_id=session_id,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
)
print("[test_session_reuse first call] success:", result1.success)
# Second call: same session, possibly cookie retained
result2 = await crawler.arun(
url='https://www.example.com/about',
cache_mode=CacheMode.BYPASS,
session_id=session_id,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
)
print("[test_session_reuse second call] success:", result2.success)
async def test_magic_mode():
# Test magic mode with override_navigator and simulate_user
async with AsyncWebCrawler(
headless=False,
verbose=True,
user_agent_mode="random",
user_agent_generator_config={"device_type": "desktop", "os_type": "windows"},
use_managed_browser=False,
use_persistent_context=False,
magic=True,
override_navigator=True,
simulate_user=True,
) as crawler:
result = await crawler.arun(
url='https://www.kidocode.com/degrees/business',
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
)
print("[test_magic_mode] success:", result.success)
print("HTML length:", len(result.html if result.html else ""))
async def test_proxy_settings():
# Test with a proxy (if available) to ensure code runs with proxy
async with AsyncWebCrawler(
headless=True,
verbose=False,
user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
proxy="http://127.0.0.1:8080", # Assuming local proxy server for test
use_managed_browser=False,
use_persistent_context=False,
) as crawler:
result = await crawler.arun(
url='https://httpbin.org/ip',
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
)
print("[test_proxy_settings] success:", result.success)
if result.success:
print("HTML preview:", result.html[:200] if result.html else "")
async def test_ignore_https_errors():
# Test ignore HTTPS errors with a self-signed or invalid cert domain
# This is just conceptual, the domain should be one that triggers SSL error.
# Using a hypothetical URL that fails SSL:
async with AsyncWebCrawler(
headless=True,
verbose=True,
user_agent="Mozilla/5.0",
ignore_https_errors=True,
use_managed_browser=False,
use_persistent_context=False,
) as crawler:
result = await crawler.arun(
url='https://self-signed.badssl.com/',
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
)
print("[test_ignore_https_errors] success:", result.success)
async def main():
print("Running tests...")
# await test_default_headless()
# await test_managed_browser_persistent()
# await test_session_reuse()
# await test_magic_mode()
# await test_proxy_settings()
await test_ignore_https_errors()
if __name__ == "__main__":
asyncio.run(main())