# Change summary (0.3.71):
#   - Update version number to 0.3.71
#   - Add sleep_on_close option to AsyncPlaywrightCrawlerStrategy
#   - Enhance context creation with additional options
#   - Improve error message formatting and visibility
#   - Update quickstart documentation
# Original listing metadata: 581 lines, 24 KiB, Python
import asyncio
|
|
import base64
|
|
import time
|
|
from abc import ABC, abstractmethod
|
|
from typing import Callable, Dict, Any, List, Optional, Awaitable
|
|
import os
|
|
from playwright.async_api import async_playwright, Page, Browser, Error
|
|
from io import BytesIO
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
from pathlib import Path
|
|
from playwright.async_api import ProxySettings
|
|
from pydantic import BaseModel
|
|
import hashlib
|
|
import json
|
|
import uuid
|
|
from playwright_stealth import StealthConfig, stealth_async
|
|
|
|
# Module-level StealthConfig with every option named below switched on.
# Within this file it is only referenced from a commented-out
# ``stealth_async(page)`` call inside ``crawl``.
stealth_config = StealthConfig(
    **dict.fromkeys(
        (
            "webdriver",
            "chrome_app",
            "chrome_csi",
            "chrome_load_times",
            "chrome_runtime",
            "navigator_languages",
            "navigator_plugins",
            "navigator_permissions",
            "webgl_vendor",
            "outerdimensions",
            "navigator_hardware_concurrency",
            "media_codecs",
        ),
        True,
    )
)
|
|
|
|
|
|
class AsyncCrawlResponse(BaseModel):
    """Value object returned by a crawler strategy for a single URL."""

    # Markup of the page as captured by the strategy.
    html: str

    # Headers of the navigation response (empty when none were recorded).
    response_headers: Dict[str, str]

    # HTTP status of the navigation response.
    status_code: int

    # Base64-encoded screenshot, present only when one was requested.
    screenshot: Optional[str] = None

    # Optional coroutine function: waits the given number of seconds and
    # then returns the page content again.
    get_delayed_content: Optional[Callable[[Optional[float]], Awaitable[str]]] = None

    class Config:
        # Needed so pydantic accepts the callable-typed field above.
        arbitrary_types_allowed = True
|
|
|
|
class AsyncCrawlerStrategy(ABC):
    """Abstract interface every asynchronous crawler strategy implements."""

    @abstractmethod
    async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
        """Crawl a single ``url`` and return its response object."""

    @abstractmethod
    async def crawl_many(self, urls: List[str], **kwargs) -> List[AsyncCrawlResponse]:
        """Crawl every URL in ``urls`` and return one result per URL."""

    @abstractmethod
    async def take_screenshot(self, url: str) -> str:
        """Return a screenshot of ``url`` as a string."""

    @abstractmethod
    def update_user_agent(self, user_agent: str):
        """Replace the user agent used by the strategy."""

    @abstractmethod
    def set_hook(self, hook_type: str, hook: Callable):
        """Register ``hook`` under the name ``hook_type``."""
|
|
|
|
class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
    """Crawler strategy that drives a Playwright browser.

    The strategy lazily launches one browser (``start``), creates a browser
    context per crawl (or one per ``session_id`` that is kept in
    ``self.sessions``), and exposes named hooks that are invoked at fixed
    points of the crawl.  It can be used as an async context manager.

    Keyword options read from ``kwargs`` in ``__init__``: ``user_agent``,
    ``proxy``, ``headless``, ``browser_type``, ``headers``, ``verbose`` and
    ``sleep_on_close``.
    """

    # Init script installed when ``override_navigator``, ``simulate_user`` or
    # ``magic`` is requested; it overrides several navigator/document
    # properties before any page script runs.
    _NAVIGATOR_OVERRIDE_JS = """
    // Pass the Permissions Test.
    const originalQuery = window.navigator.permissions.query;
    window.navigator.permissions.query = (parameters) => (
        parameters.name === 'notifications' ?
            Promise.resolve({ state: Notification.permission }) :
            originalQuery(parameters)
    );
    Object.defineProperty(navigator, 'webdriver', {
        get: () => undefined
    });
    window.navigator.chrome = {
        runtime: {},
        // Add other properties if necessary
    };
    Object.defineProperty(navigator, 'plugins', {
        get: () => [1, 2, 3, 4, 5],
    });
    Object.defineProperty(navigator, 'languages', {
        get: () => ['en-US', 'en'],
    });
    Object.defineProperty(document, 'hidden', {
        get: () => false
    });
    Object.defineProperty(document, 'visibilityState', {
        get: () => 'visible'
    });
    """

    # Page script that writes naturalWidth/naturalHeight into the width and
    # height attributes of the images that pass ``filterImage``; it resolves
    # once all of them loaded/failed or after a 5 second fallback.
    _UPDATE_IMAGE_DIMENSIONS_JS = """
    () => {
        return new Promise((resolve) => {
            const filterImage = (img) => {
                // Filter out images that are too small
                if (img.width < 100 && img.height < 100) return false;

                // Filter out images that are not visible
                const rect = img.getBoundingClientRect();
                if (rect.width === 0 || rect.height === 0) return false;

                // Filter out images with certain class names (e.g., icons, thumbnails)
                if (img.classList.contains('icon') || img.classList.contains('thumbnail')) return false;

                // Filter out images with certain patterns in their src (e.g., placeholder images)
                if (img.src.includes('placeholder') || img.src.includes('icon')) return false;

                return true;
            };

            const images = Array.from(document.querySelectorAll('img')).filter(filterImage);
            let imagesLeft = images.length;

            if (imagesLeft === 0) {
                resolve();
                return;
            }

            const checkImage = (img) => {
                if (img.complete && img.naturalWidth !== 0) {
                    img.setAttribute('width', img.naturalWidth);
                    img.setAttribute('height', img.naturalHeight);
                    imagesLeft--;
                    if (imagesLeft === 0) resolve();
                }
            };

            images.forEach(img => {
                checkImage(img);
                if (!img.complete) {
                    img.onload = () => {
                        checkImage(img);
                    };
                    img.onerror = () => {
                        imagesLeft--;
                        if (imagesLeft === 0) resolve();
                    };
                }
            });

            // Fallback timeout of 5 seconds
            setTimeout(() => resolve(), 5000);
        });
    }
    """

    def __init__(self, use_cached_html=False, js_code=None, **kwargs):
        """Store configuration; no browser is launched until ``start()``.

        Args:
            use_cached_html: When true, ``crawl`` serves/stores HTML from the
                on-disk cache under ``~/.crawl4ai/cache``.
            js_code: Default JavaScript (string or list of strings) evaluated
                on each crawled page unless overridden per call.
            **kwargs: Optional settings, see the class docstring.
        """
        self.use_cached_html = use_cached_html
        self.user_agent = kwargs.get(
            "user_agent",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        )
        self.proxy = kwargs.get("proxy")
        self.headless = kwargs.get("headless", True)
        self.browser_type = kwargs.get("browser_type", "chromium")
        self.headers = kwargs.get("headers", {})
        # session_id -> (context, page, last_used_timestamp)
        self.sessions = {}
        # Seconds of inactivity after which a session is closed.
        self.session_ttl = 1800
        self.js_code = js_code
        self.verbose = kwargs.get("verbose", False)
        self.playwright = None
        self.browser = None
        self.sleep_on_close = kwargs.get("sleep_on_close", False)
        self.hooks = {
            'on_browser_created': None,
            'on_user_agent_updated': None,
            'on_execution_started': None,
            'before_goto': None,
            'after_goto': None,
            'before_return_html': None,
            'before_retrieve_html': None
        }

    async def __aenter__(self):
        """Start the browser and return the strategy itself."""
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the browser and stop Playwright on context exit."""
        await self.close()

    async def start(self):
        """Start Playwright and launch the configured browser if needed.

        The ``on_browser_created`` hook is invoked with the browser at the
        end of every call, including calls that reuse an existing browser.
        """
        if self.playwright is None:
            self.playwright = await async_playwright().start()

        if self.browser is None:
            browser_args = {
                "headless": self.headless,
                "args": [
                    "--disable-gpu",
                    "--no-sandbox",
                    "--disable-dev-shm-usage",
                    "--disable-blink-features=AutomationControlled",
                    "--disable-infobars",
                    "--window-position=0,0",
                    "--ignore-certificate-errors",
                    "--ignore-certificate-errors-spki-list",
                    # "--headless=new",  # Use the new headless mode
                ]
            }

            # Add proxy settings if a proxy is specified
            if self.proxy:
                browser_args["proxy"] = ProxySettings(server=self.proxy)

            # Select the launcher for the requested engine; anything other
            # than "firefox"/"webkit" falls back to chromium.
            if self.browser_type == "firefox":
                launcher = self.playwright.firefox
            elif self.browser_type == "webkit":
                launcher = self.playwright.webkit
            else:
                launcher = self.playwright.chromium
            self.browser = await launcher.launch(**browser_args)

        await self.execute_hook('on_browser_created', self.browser)

    async def close(self):
        """Close the browser and stop Playwright, resetting both to ``None``."""
        if self.sleep_on_close:
            # asyncio.sleep() takes seconds.  The previous argument of 500
            # stalled shutdown for more than eight minutes; a short pause is
            # what a "sleep before closing" option needs.
            await asyncio.sleep(0.5)
        if self.browser:
            await self.browser.close()
            self.browser = None
        if self.playwright:
            await self.playwright.stop()
            self.playwright = None

    def __del__(self):
        """Best-effort cleanup when the object is garbage collected.

        Cleanup is only attempted when an idle, open event loop is available;
        otherwise callers are expected to have used ``close()`` or
        ``async with``.
        """
        # getattr: __init__ may have failed before these attributes existed.
        if not (getattr(self, "browser", None) or getattr(self, "playwright", None)):
            return
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No event loop in this thread; nothing can be awaited here.
            return
        if loop.is_closed() or loop.is_running():
            # run_until_complete() would raise, leaving an un-awaited
            # coroutine behind; skip instead.
            return
        loop.run_until_complete(self.close())

    def set_hook(self, hook_type: str, hook: Callable):
        """Register ``hook`` for ``hook_type``.

        Raises:
            ValueError: If ``hook_type`` is not one of the known hook names.
        """
        if hook_type not in self.hooks:
            raise ValueError(f"Invalid hook type: {hook_type}")
        self.hooks[hook_type] = hook

    async def execute_hook(self, hook_type: str, *args):
        """Run the hook registered for ``hook_type`` and return its result.

        Coroutine functions are awaited, plain callables are called.  When no
        hook is registered the first positional argument (or ``None``) is
        returned unchanged.
        """
        hook = self.hooks.get(hook_type)
        if not hook:
            return args[0] if args else None
        if asyncio.iscoroutinefunction(hook):
            return await hook(*args)
        return hook(*args)

    def update_user_agent(self, user_agent: str):
        """Set the user agent used for contexts created from now on."""
        self.user_agent = user_agent

    def set_custom_headers(self, headers: Dict[str, str]):
        """Set the extra HTTP headers applied to newly created contexts."""
        self.headers = headers

    async def kill_session(self, session_id: str):
        """Close and forget the page/context stored under ``session_id``.

        Unknown session ids are ignored.
        """
        if session_id not in self.sessions:
            return
        context, page, _ = self.sessions[session_id]
        await page.close()
        await context.close()
        del self.sessions[session_id]

    def _cleanup_expired_sessions(self):
        """Schedule ``kill_session`` for sessions idle longer than the TTL."""
        now = time.time()
        expired_sessions = [
            sid for sid, (_, _, last_used) in self.sessions.items()
            if now - last_used > self.session_ttl
        ]
        for sid in expired_sessions:
            asyncio.create_task(self.kill_session(sid))

    @staticmethod
    def _cache_file_path(url: str) -> str:
        """Return the cache file path for ``url`` (MD5 of the URL as name)."""
        return os.path.join(
            Path.home(), ".crawl4ai", "cache", hashlib.md5(url.encode()).hexdigest()
        )

    async def smart_wait(self, page: Page, wait_for: str, timeout: float = 30000):
        """Wait on ``page`` for a CSS selector or a JavaScript condition.

        ``wait_for`` may be prefixed with ``js:`` or ``css:``.  Without a
        prefix, values starting with ``()`` or ``function`` are treated as
        JavaScript; anything else is tried as a CSS selector and, if the
        selector is rejected, as a JavaScript function body.

        Args:
            page: Page to wait on.
            wait_for: Selector or JavaScript function source.
            timeout: Timeout in milliseconds.

        Raises:
            TimeoutError: The condition was not met within ``timeout``.
            ValueError: ``wait_for`` is neither a usable selector nor usable
                JavaScript.
            RuntimeError: An explicit JavaScript condition failed to run.
        """
        wait_for = wait_for.strip()

        if wait_for.startswith('js:'):
            # Explicitly specified JavaScript
            js_code = wait_for[3:].strip()
            return await self.csp_compliant_wait(page, js_code, timeout)

        if wait_for.startswith('css:'):
            # Explicitly specified CSS selector
            css_selector = wait_for[4:].strip()
            try:
                await page.wait_for_selector(css_selector, timeout=timeout)
            except Error as e:
                if 'Timeout' in str(e):
                    raise TimeoutError(f"Timeout after {timeout}ms waiting for selector '{css_selector}'") from e
                raise ValueError(f"Invalid CSS selector: '{css_selector}'") from e
            return None

        # Auto-detect based on content
        if wait_for.startswith(('()', 'function')):
            # It's likely a JavaScript function
            return await self.csp_compliant_wait(page, wait_for, timeout)

        # Assume it's a CSS selector first
        try:
            await page.wait_for_selector(wait_for, timeout=timeout)
        except Error as e:
            if 'Timeout' in str(e):
                raise TimeoutError(f"Timeout after {timeout}ms waiting for selector '{wait_for}'") from e
            # Not a timeout, so the selector itself was probably rejected:
            # fall back to evaluating the text as a JavaScript function body.
            try:
                return await self.csp_compliant_wait(page, f"() => {{{wait_for}}}", timeout)
            except (Error, RuntimeError) as js_error:
                # csp_compliant_wait reports evaluation failures as
                # RuntimeError, so that type must be caught here as well.
                raise ValueError(f"Invalid wait_for parameter: '{wait_for}'. "
                                 "It should be either a valid CSS selector, a JavaScript function, "
                                 "or explicitly prefixed with 'js:' or 'css:'.") from js_error
        return None

    async def csp_compliant_wait(self, page: Page, user_wait_function: str, timeout: float = 30000):
        """Poll a user-supplied JavaScript function until it returns truthy.

        The function source is embedded in a wrapper that calls it every
        100 ms and throws once ``timeout`` milliseconds have elapsed.

        Raises:
            TimeoutError: The wrapper gave up after ``timeout`` milliseconds.
            RuntimeError: Evaluating the wrapper or the user function failed.
        """
        wrapper_js = f"""
        async () => {{
            const userFunction = {user_wait_function};
            const startTime = Date.now();
            while (true) {{
                if (await userFunction()) {{
                    return true;
                }}
                if (Date.now() - startTime > {timeout}) {{
                    throw new Error('Timeout waiting for condition');
                }}
                await new Promise(resolve => setTimeout(resolve, 100));
            }}
        }}
        """

        try:
            await page.evaluate(wrapper_js)
        except TimeoutError:
            raise TimeoutError(f"Timeout after {timeout}ms waiting for condition")
        except Exception as e:
            # The wrapper signals its timeout by throwing inside the page,
            # which reaches Python as a generic evaluation error; map it to
            # TimeoutError so timeouts are reported consistently.
            if 'Timeout waiting for condition' in str(e):
                raise TimeoutError(f"Timeout after {timeout}ms waiting for condition") from e
            raise RuntimeError(f"Error in wait condition: {str(e)}") from e

    async def process_iframes(self, page):
        """Replace each ``<iframe>`` with a ``<div>`` holding its body HTML.

        Every iframe gets the id ``iframe-<index>``; when its content frame
        is accessible, the frame's ``document.body.innerHTML`` is copied into
        a div with class ``extracted-iframe-content-<index>`` that replaces
        the iframe.  Failures are printed and do not stop processing.

        Returns:
            The same ``page`` object.
        """
        # Find all iframes
        iframes = await page.query_selector_all('iframe')

        for i, iframe in enumerate(iframes):
            try:
                # Add a unique identifier to the iframe
                await iframe.evaluate(f'(element) => element.id = "iframe-{i}"')

                # Get the frame associated with this iframe
                frame = await iframe.content_frame()
                if not frame:
                    print(f"Warning: Could not access content frame for iframe {i}")
                    continue

                # Wait for the frame to load
                await frame.wait_for_load_state('load', timeout=30000)  # 30 seconds timeout

                # Extract the content of the iframe's body
                iframe_content = await frame.evaluate('() => document.body.innerHTML')

                # Generate a unique class name for this iframe
                class_name = f'extracted-iframe-content-{i}'

                # Hand the HTML over as an argument instead of splicing it
                # into a template literal: "${...}" sequences or backslashes
                # in the markup would otherwise be interpreted as JavaScript.
                await page.evaluate(
                    """
                    ([iframeId, html, className]) => {
                        const iframe = document.getElementById(iframeId);
                        const div = document.createElement('div');
                        div.innerHTML = html;
                        div.className = className;
                        iframe.replaceWith(div);
                    }
                    """,
                    [f"iframe-{i}", iframe_content, class_name],
                )
            except Exception as e:
                print(f"Error processing iframe {i}: {str(e)}")

        # Return the page object
        return page

    async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
        """Crawl ``url`` and return its HTML, headers, status and extras.

        Recognised ``kwargs``: ``session_id``, ``override_navigator``,
        ``simulate_user``, ``magic``, ``log_console``, ``js_only``,
        ``page_timeout`` (ms), ``js_code``/``js``, ``wait_for``,
        ``process_iframes``, ``delay_before_return_html`` (seconds) and
        ``screenshot``.

        Raises:
            Error: A Playwright error occurred; it is re-raised with the URL
                in the message.
            RuntimeError: The ``wait_for`` condition failed.
        """
        response_headers = {}
        status_code = None

        self._cleanup_expired_sessions()
        session_id = kwargs.get("session_id")
        if session_id:
            context, page, _ = self.sessions.get(session_id, (None, None, None))
            if not context:
                context = await self.browser.new_context(
                    user_agent=self.user_agent,
                    viewport={"width": 1920, "height": 1080},
                    proxy={"server": self.proxy} if self.proxy else None,
                    accept_downloads=True,
                    java_script_enabled=True
                )
                await context.add_cookies([{"name": "cookiesEnabled", "value": "true", "url": url}])
                await context.set_extra_http_headers(self.headers)
                page = await context.new_page()
            # Stamp the session on every use so the TTL measures idle time
            # rather than age since creation.
            self.sessions[session_id] = (context, page, time.time())
        else:
            context = await self.browser.new_context(
                user_agent=self.user_agent,
                viewport={"width": 1920, "height": 1080},
                proxy={"server": self.proxy} if self.proxy else None
            )
            await context.set_extra_http_headers(self.headers)

            if kwargs.get("override_navigator", False) or kwargs.get("simulate_user", False) or kwargs.get("magic", False):
                # Inject scripts to override navigator properties
                await context.add_init_script(self._NAVIGATOR_OVERRIDE_JS)

            page = await context.new_page()
            # await stealth_async(page)  #, stealth_config)

        # Add console message and error logging
        if kwargs.get("log_console", False):
            page.on("console", lambda msg: print(f"Console: {msg.text}"))
            page.on("pageerror", lambda exc: print(f"Page Error: {exc}"))

        try:
            if self.verbose:
                print(f"[LOG] 🕸️ Crawling {url} using AsyncPlaywrightCrawlerStrategy...")

            if self.use_cached_html:
                cache_file_path = self._cache_file_path(url)
                if os.path.exists(cache_file_path):
                    # Read with the same encoding the cache is written in.
                    with open(cache_file_path, "r", encoding="utf-8") as f:
                        html = f.read()
                    # retrieve response headers and status code from cache
                    with open(cache_file_path + ".meta", "r", encoding="utf-8") as f:
                        meta = json.load(f)
                    response_headers = meta.get("response_headers", {})
                    status_code = meta.get("status_code")
                    if not session_id:
                        # A cached response carries no get_delayed_content,
                        # so the throw-away page/context can be released.
                        await page.close()
                        await context.close()
                    return AsyncCrawlResponse(
                        html=html, response_headers=response_headers, status_code=status_code
                    )

            if not kwargs.get("js_only", False):
                await self.execute_hook('before_goto', page)

                response = await page.goto(
                    url, wait_until="domcontentloaded", timeout=kwargs.get("page_timeout", 60000)
                )

                await self.execute_hook('after_goto', page)

                # Get status code and headers
                # NOTE(review): page.goto() can return None for some
                # navigations; that case is not handled here - confirm
                # whether a default status is wanted.
                status_code = response.status
                response_headers = response.headers
            else:
                # js_only: operate on the page as it is, without navigating.
                status_code = 200
                response_headers = {}

            await page.wait_for_selector('body')
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")

            js_code = kwargs.get("js_code", kwargs.get("js", self.js_code))
            if js_code:
                if isinstance(js_code, str):
                    await page.evaluate(js_code)
                elif isinstance(js_code, list):
                    for js in js_code:
                        await page.evaluate(js)

                await page.wait_for_load_state('networkidle')
                # Check for on execution event
                await self.execute_hook('on_execution_started', page)

            if kwargs.get("simulate_user", False) or kwargs.get("magic", False):
                # Simulate user interactions
                await page.mouse.move(100, 100)
                await page.mouse.down()
                await page.mouse.up()
                await page.keyboard.press('ArrowDown')

            # Handle the wait_for parameter
            wait_for = kwargs.get("wait_for")
            if wait_for:
                try:
                    await self.smart_wait(page, wait_for, timeout=kwargs.get("page_timeout", 60000))
                except Exception as e:
                    raise RuntimeError(f"Wait condition failed: {str(e)}")

            # Update image dimensions
            await page.evaluate(self._UPDATE_IMAGE_DIMENSIONS_JS)

            # Wait a bit for any onload events to complete
            await page.wait_for_timeout(100)

            # Process iframes
            if kwargs.get("process_iframes", False):
                page = await self.process_iframes(page)

            await self.execute_hook('before_retrieve_html', page)
            # Check if delay_before_return_html is set then wait for that time
            delay_before_return_html = kwargs.get("delay_before_return_html")
            if delay_before_return_html:
                await asyncio.sleep(delay_before_return_html)

            html = await page.content()
            await self.execute_hook('before_return_html', page, html)

            # Check if kwargs has screenshot=True then take screenshot
            screenshot_data = None
            if kwargs.get("screenshot"):
                screenshot_data = await self.take_screenshot(url)

            if self.verbose:
                print(f"[LOG] ✅ Crawled {url} successfully!")

            if self.use_cached_html:
                cache_file_path = self._cache_file_path(url)
                # The cache directory may not exist on first use.
                os.makedirs(os.path.dirname(cache_file_path), exist_ok=True)
                with open(cache_file_path, "w", encoding="utf-8") as f:
                    f.write(html)
                # store response headers and status code in cache
                with open(cache_file_path + ".meta", "w", encoding="utf-8") as f:
                    json.dump({
                        "response_headers": response_headers,
                        "status_code": status_code
                    }, f)

            async def get_delayed_content(delay: float = 5.0) -> str:
                """Sleep ``delay`` seconds, then return the page content."""
                if self.verbose:
                    print(f"[LOG] Waiting for {delay} seconds before retrieving content for {url}")
                await asyncio.sleep(delay)
                return await page.content()

            # The page and context are deliberately left open here: the
            # get_delayed_content closure above still needs the live page.
            return AsyncCrawlResponse(
                html=html,
                response_headers=response_headers,
                status_code=status_code,
                screenshot=screenshot_data,
                get_delayed_content=get_delayed_content
            )
        except Error as e:
            raise Error(f"[ERROR] 🚫 crawl(): Failed to crawl {url}: {str(e)}") from e

    async def crawl_many(self, urls: List[str], **kwargs) -> List[AsyncCrawlResponse]:
        """Crawl ``urls`` concurrently, limited by ``semaphore_count``.

        ``kwargs`` are forwarded to ``crawl``; ``semaphore_count`` (default
        5) bounds the number of simultaneous crawls.  Results keep the order
        of ``urls``.  A crawl that raised is represented by ``str(exception)``
        in the returned list instead of a response object.
        """
        semaphore_count = kwargs.get('semaphore_count', 5)  # Adjust as needed
        semaphore = asyncio.Semaphore(semaphore_count)

        async def crawl_with_semaphore(url):
            async with semaphore:
                return await self.crawl(url, **kwargs)

        results = await asyncio.gather(
            *(crawl_with_semaphore(url) for url in urls), return_exceptions=True
        )
        return [str(result) if isinstance(result, Exception) else result for result in results]

    async def take_screenshot(self, url: str, wait_time=1000) -> str:
        """Return a base64-encoded full-page screenshot of ``url``.

        A fresh context and page are opened for the screenshot.  On failure
        the error is printed and a base64-encoded JPEG showing the error
        message on a black background is returned instead.

        Args:
            url: Address to load.
            wait_time: Milliseconds to wait after navigation before
                capturing (default 1000).
        """
        async with await self.browser.new_context(user_agent=self.user_agent) as context:
            page = await context.new_page()
            try:
                await page.goto(url, wait_until="domcontentloaded", timeout=30000)
                # Wait for a specified time (default is 1 second)
                await page.wait_for_timeout(wait_time)
                screenshot = await page.screenshot(full_page=True)
                return base64.b64encode(screenshot).decode('utf-8')
            except Exception as e:
                error_message = f"Failed to take screenshot: {str(e)}"
                print(error_message)

                # Generate an error image
                img = Image.new('RGB', (800, 600), color='black')
                draw = ImageDraw.Draw(img)
                font = ImageFont.load_default()
                draw.text((10, 10), error_message, fill=(255, 255, 255), font=font)

                buffered = BytesIO()
                img.save(buffered, format="JPEG")
                return base64.b64encode(buffered.getvalue()).decode('utf-8')
            finally:
                await page.close()
|
|
|