Compare commits

...

5 Commits

Author SHA1 Message Date
UncleCode
3a2cb7dacf test: Add comprehensive unit tests for AsyncExecutor functionality 2024-11-13 19:46:05 +08:00
UncleCode
c38ac29edb perf(crawler): major performance improvements & raw HTML support
- Switch to lxml parser (~4x speedup)
- Add raw HTML & local file crawling support
- Fix cache headers & async cleanup
- Add browser process monitoring
- Optimize BeautifulSoup operations
- Pre-compile regex patterns

Breaking: Raw HTML handling requires new URL prefixes
Fixes: #256, #253
2024-11-13 19:40:40 +08:00
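The breaking URL-prefix scheme this commit introduces can be sketched in isolation. The dispatch function below is a minimal stand-in, not the project's actual implementation (which lives in the crawler strategy's `crawl` method):

```python
def classify_url(url: str) -> str:
    """Classify a crawl target by its prefix, mirroring the new scheme."""
    if url.startswith(("http://", "https://")):
        return "web"          # standard web crawling
    if url.startswith("file://"):
        return "local_file"   # local path is url[7:]
    if url.startswith("raw:"):
        return "raw_html"     # raw HTML content is url[4:]
    raise ValueError(
        "URL must start with 'http://', 'https://', 'file://', or 'raw:'"
    )
```

Callers that previously passed bare HTML or a plain path must now prepend the `raw:` or `file://` prefix, which is why the commit flags this as breaking.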
UncleCode
61b93ebf36 Update change log 2024-11-13 15:38:30 +08:00
UncleCode
bf91adf3f8 fix: Resolve unexpected BrowserContext closure during crawl in Docker
- Removed __del__ method in AsyncPlaywrightCrawlerStrategy to ensure reliable browser lifecycle management by using explicit context managers.
- Added process monitoring in ManagedBrowser to detect and log unexpected terminations of the browser subprocess.
- Updated Docker configuration to expose port 9222 for remote debugging and allocate extra shared memory to prevent browser crashes.
- Improved error handling and resource cleanup for browser instances, particularly in Docker environments.

Resolves Issue #256
2024-11-13 15:37:16 +08:00
UncleCode
b6d6631b12 Enhance Async Crawler with Playwright support
- Implemented new async crawler strategy using Playwright.
- Introduced ManagedBrowser for better browser management.
- Added support for persistent browser sessions and improved error handling.
- Updated version from 0.3.73 to 0.3.731.
- Enhanced logic in main.py for conditional mounting of static files.
- Updated requirements to replace playwright_stealth with tf-playwright-stealth.
2024-11-12 12:10:58 +08:00
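The ManagedBrowser introduced in this commit boils down to: use a caller-supplied profile directory, or create a temporary one and remove it on cleanup. A reduced sketch of that profile handling (process launching omitted; `ProfileManager` is an illustrative name, not the project's class):

```python
import os
import shutil
import tempfile
from typing import Optional

class ProfileManager:
    """Reduced sketch of ManagedBrowser's profile-directory lifecycle."""

    def __init__(self, user_data_dir: Optional[str] = None):
        self.user_data_dir = user_data_dir
        self.temp_dir: Optional[str] = None

    def start(self) -> str:
        # Fall back to a throwaway profile when none is supplied.
        if not self.user_data_dir:
            self.temp_dir = tempfile.mkdtemp(prefix="browser-profile-")
            self.user_data_dir = self.temp_dir
        return self.user_data_dir

    def cleanup(self) -> None:
        # Only remove directories we created ourselves.
        if self.temp_dir and os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
            self.temp_dir = None
```

Persistent sessions come from passing a real `user_data_dir`, which survives `cleanup()`; temporary profiles are disposable by construction.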
22 changed files with 5016 additions and 159 deletions

.gitignore

@@ -199,6 +199,7 @@ test_env/
**/.DS_Store
todo.md
todo_executor.md
git_changes.py
git_changes.md
pypi_build.sh


@@ -1,6 +1,36 @@
-# Changelog
+# CHANGELOG
+# Changelog - November 13, 2024
### Added
- Support for raw HTML and local file crawling via URL prefixes ('raw:', 'file://')
- Browser process monitoring for managed browser instances
- Screenshot capability for raw HTML and local file content
- Response headers storage in cache database
- New `fit_markdown` flag for optional markdown generation
### Changed
- Switched HTML parser from 'html.parser' to 'lxml' for ~4x performance improvement
- Optimized BeautifulSoup text conversion and element selection
- Pre-compiled regular expressions for better performance
- Improved metadata extraction efficiency
- Response headers now stored alongside HTML in cache
### Removed
- `__del__` method from AsyncPlaywrightCrawlerStrategy to prevent async cleanup issues
### Fixed
- Issue #256: Added support for crawling raw HTML content
- Issue #253: Implemented file:// protocol handling
- Missing response headers in cached results
- Memory leaks from improper async cleanup
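The "response headers stored alongside HTML" change amounts to a cache entry gaining a serialized-headers field. A toy in-memory version of that idea (the real implementation uses the project's cache database, whose schema is not shown here):

```python
import json
from typing import Dict, Optional, Tuple

_cache: Dict[str, Dict[str, str]] = {}

def cache_store(url: str, html: str, response_headers: Dict[str, str]) -> None:
    # Headers are serialized so they can sit in a text column next to the HTML.
    _cache[url] = {"html": html, "headers": json.dumps(response_headers)}

def cache_load(url: str) -> Optional[Tuple[str, Dict[str, str]]]:
    entry = _cache.get(url)
    if entry is None:
        return None
    return entry["html"], json.loads(entry["headers"])
```

Storing headers with the page is what lets cached results report the same `response_headers` as a fresh crawl.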
## [v0.3.731] - 2024-11-13 Changelog for Issue 256 Fix
- Fixed: Browser context unexpectedly closing in Docker environment during crawl operations.
- Removed: __del__ method from AsyncPlaywrightCrawlerStrategy to prevent unreliable asynchronous cleanup, ensuring browser context is closed explicitly within context managers.
- Added: Monitoring for ManagedBrowser subprocess to detect and log unexpected terminations.
- Updated: Dockerfile configurations to expose debugging port (9222) and allocate additional shared memory for improved browser stability.
- Improved: Error handling and resource cleanup processes for browser lifecycle management within the Docker environment.
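The `__del__` removal trades implicit finalization for explicit async context management. The general pattern, shown here independent of Playwright (stand-in names, not the project's real class):

```python
import asyncio

class Strategy:
    """Minimal illustration of explicit async lifecycle management."""

    def __init__(self):
        self.closed = False

    async def start(self):
        self.closed = False

    async def close(self):
        self.closed = True

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Cleanup runs on the running event loop -- no __del__, and no
        # run_until_complete() against a possibly-closed loop at GC time.
        await self.close()

async def main() -> bool:
    async with Strategy() as s:
        assert not s.closed
    return s.closed
```

A `__del__` that calls `asyncio.get_event_loop().run_until_complete(...)` can fire after the loop is gone (common in Docker teardown), which is exactly the failure mode Issue #256 describes; `async with` guarantees cleanup happens while the loop is still alive.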
## [v0.3.73] - 2024-11-05
@@ -180,7 +210,7 @@ This commit introduces several key enhancements, including improved error handli
## [v0.3.72] - 2024-10-20
### Fixed
-- Added support for parsing Base64 encoded images in WebScrappingStrategy
+- Added support for parsing Base64 encoded images in WebScrapingStrategy
### Added
- Forked and integrated a customized version of the html2text library for more control over Markdown generation
@@ -203,7 +233,7 @@ This commit introduces several key enhancements, including improved error handli
### Developer Notes
- The customized html2text library is now located within the crawl4ai package
- New configuration options are available in the `config.py` file for external content handling
-- The `WebScrappingStrategy` class has been updated to accommodate new external content exclusion options
+- The `WebScrapingStrategy` class has been updated to accommodate new external content exclusion options
## [v0.3.71] - 2024-10-19
@@ -280,7 +310,7 @@ These updates aim to provide more flexibility in text processing, improve perfor
### Improvements
1. **Better Error Handling**:
-- Enhanced error reporting in WebScrappingStrategy with detailed error messages and suggestions.
+- Enhanced error reporting in WebScrapingStrategy with detailed error messages and suggestions.
- Added console message and error logging for better debugging.
2. **Image Processing Enhancements**:
@@ -345,7 +375,7 @@ These updates aim to provide more flexibility in text processing, improve perfor
- Allows for more customized setups.
### 2. Image Processing Optimization
-- Enhanced image handling in WebScrappingStrategy.
+- Enhanced image handling in WebScrapingStrategy.
- Added filtering for small, invisible, or irrelevant images.
- Improved image scoring system for better content relevance.
- Implemented JavaScript-based image dimension updating for more accurate representation.


@@ -115,7 +115,12 @@ HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Expose port
-EXPOSE 8000
+EXPOSE 8000 11235 9222 8080
# Optional: Increase shared memory size to prevent browser crashes
# when loading heavy pages
RUN mkdir /dev/shm
VOLUME /dev/shm
# Start the FastAPI server
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "11235"]


@@ -1,4 +1,4 @@
-# 🔥🕷️ Crawl4AI: LLM Friendly Web Crawler & Scrapper
+# 🔥🕷️ Crawl4AI: LLM Friendly Web Crawler & Scraper
<a href="https://trendshift.io/repositories/11716" target="_blank"><img src="https://trendshift.io/api/badge/repositories/11716" alt="unclecode%2Fcrawl4ai | Trendshift" style="width: 250px; height: 55px;" width="250" height="55"/></a>
@@ -127,6 +127,9 @@ docker pull unclecode/crawl4ai:gpu # GPU-enabled version
# Run the container
docker run -p 11235:11235 unclecode/crawl4ai:basic # Replace 'basic' with your chosen version
# In case to allocate more shared memory for the container
docker run --shm-size=2gb -p 11235:11235 unclecode/crawl4ai:basic
```
#### Option 2: Build from Repository


@@ -26,5 +26,5 @@ if is_sync_version_installed():
print("Warning: Failed to import WebCrawler even though selenium is installed. This might be due to other missing dependencies.")
else:
WebCrawler = None
-import warnings
-print("Warning: Synchronous WebCrawler is not available. Install crawl4ai[sync] for synchronous support. However, please note that the synchronous version will be deprecated soon.")
+# import warnings
+# print("Warning: Synchronous WebCrawler is not available. Install crawl4ai[sync] for synchronous support. However, please note that the synchronous version will be deprecated soon.")


@@ -1,2 +1,2 @@
# crawl4ai/_version.py
-__version__ = "0.3.73"
+__version__ = "0.3.731"


@@ -64,12 +64,27 @@ class ManagedBrowser:
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# Monitor browser process output for errors
asyncio.create_task(self._monitor_browser_process())
await asyncio.sleep(2) # Give browser time to start
return f"http://localhost:{self.debugging_port}"
except Exception as e:
await self.cleanup()
raise Exception(f"Failed to start browser: {e}")
async def _monitor_browser_process(self):
"""Monitor the browser process for unexpected termination."""
if self.browser_process:
stdout, stderr = await asyncio.gather(
asyncio.to_thread(self.browser_process.stdout.read),
asyncio.to_thread(self.browser_process.stderr.read)
)
if self.browser_process.poll() is not None:
print(f"Browser process terminated unexpectedly with code {self.browser_process.returncode}")
print(f"STDOUT: {stdout.decode()}")
print(f"STDERR: {stderr.decode()}")
await self.cleanup()
def _get_browser_path(self) -> str:
"""Returns the browser executable path based on OS and browser type"""
if sys.platform == "darwin": # macOS
@@ -186,6 +201,8 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
self.sleep_on_close = kwargs.get("sleep_on_close", False)
self.use_managed_browser = kwargs.get("use_managed_browser", False)
self.user_data_dir = kwargs.get("user_data_dir", None)
self.use_persistent_context = kwargs.get("use_persistent_context", False)
self.chrome_channel = kwargs.get("chrome_channel", "chrome")
self.managed_browser = None
self.default_context = None
self.hooks = {
@@ -197,6 +214,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
'before_return_html': None,
'before_retrieve_html': None
}
self.extra_args = kwargs.get("extra_args", [])
async def __aenter__(self):
await self.start()
@@ -238,36 +256,71 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"User-Agent": self.user_agent
})
else:
# Base browser arguments
browser_args = {
"headless": self.headless,
"args": [
"--disable-gpu",
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-blink-features=AutomationControlled",
"--no-first-run",
"--no-default-browser-check",
"--disable-infobars",
"--window-position=0,0",
"--ignore-certificate-errors",
"--ignore-certificate-errors-spki-list",
# "--headless=new", # Use the new headless mode
]
}
# Add channel if specified (try Chrome first)
if self.chrome_channel:
browser_args["channel"] = self.chrome_channel
# Add extra args if provided
if self.extra_args:
browser_args["args"].extend(self.extra_args)
# Add proxy settings if a proxy is specified
if self.proxy:
proxy_settings = ProxySettings(server=self.proxy)
browser_args["proxy"] = proxy_settings
elif self.proxy_config:
-proxy_settings = ProxySettings(server=self.proxy_config.get("server"), username=self.proxy_config.get("username"), password=self.proxy_config.get("password"))
+proxy_settings = ProxySettings(
+server=self.proxy_config.get("server"),
+username=self.proxy_config.get("username"),
+password=self.proxy_config.get("password")
+)
browser_args["proxy"] = proxy_settings
-# Select the appropriate browser based on the browser_type
-if self.browser_type == "firefox":
-self.browser = await self.playwright.firefox.launch(**browser_args)
-elif self.browser_type == "webkit":
-self.browser = await self.playwright.webkit.launch(**browser_args)
-else:
-self.browser = await self.playwright.chromium.launch(**browser_args)
+try:
+# Select the appropriate browser based on the browser_type
+if self.browser_type == "firefox":
+self.browser = await self.playwright.firefox.launch(**browser_args)
+elif self.browser_type == "webkit":
+self.browser = await self.playwright.webkit.launch(**browser_args)
+else:
+if self.use_persistent_context and self.user_data_dir:
+self.browser = await self.playwright.chromium.launch_persistent_context(
+user_data_dir=self.user_data_dir,
+**browser_args
+)
+self.default_context = self.browser
+else:
+self.browser = await self.playwright.chromium.launch(**browser_args)
+except Exception as e:
+# Fallback to chromium if Chrome channel fails
+if "chrome" in str(e) and browser_args.get("channel") == "chrome":
+browser_args["channel"] = "chromium"
+if self.use_persistent_context and self.user_data_dir:
+self.browser = await self.playwright.chromium.launch_persistent_context(
+user_data_dir=self.user_data_dir,
+**browser_args
+)
+self.default_context = self.browser
+else:
+self.browser = await self.playwright.chromium.launch(**browser_args)
+else:
+raise
await self.execute_hook('on_browser_created', self.browser)
@@ -292,9 +345,10 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
await self.playwright.stop()
self.playwright = None
-def __del__(self):
-if self.browser or self.playwright:
-asyncio.get_event_loop().run_until_complete(self.close())
+# Issue #256: Remove __del__ method to avoid potential issues with async cleanup
+# def __del__(self):
+# if self.browser or self.playwright:
+# asyncio.get_event_loop().run_until_complete(self.close())
def set_hook(self, hook_type: str, hook: Callable):
if hook_type in self.hooks:
@@ -439,6 +493,75 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
return page
async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
"""
Crawls a given URL or processes raw HTML/local file content based on the URL prefix.
Args:
url (str): The URL to crawl. Supported prefixes:
- 'http://' or 'https://': Web URL to crawl.
- 'file://': Local file path to process.
- 'raw:': Raw HTML content to process.
**kwargs: Additional parameters:
- 'screenshot' (bool): Whether to take a screenshot.
- ... [other existing parameters]
Returns:
AsyncCrawlResponse: The response containing HTML, headers, status code, and optional screenshot.
"""
response_headers = {}
status_code = 200 # Default to 200 for local/raw HTML
screenshot_requested = kwargs.get('screenshot', False)
screenshot_data = None
if url.startswith(('http://', 'https://')):
# Proceed with standard web crawling
return await self._crawl_web(url, **kwargs)
elif url.startswith('file://'):
# Process local file
local_file_path = url[7:] # Remove 'file://' prefix
if not os.path.exists(local_file_path):
raise FileNotFoundError(f"Local file not found: {local_file_path}")
with open(local_file_path, 'r', encoding='utf-8') as f:
html = f.read()
if screenshot_requested:
screenshot_data = await self._generate_screenshot_from_html(html)
return AsyncCrawlResponse(
html=html,
response_headers=response_headers,
status_code=status_code,
screenshot=screenshot_data,
get_delayed_content=None
)
elif url.startswith('raw:'):
# Process raw HTML content
raw_html = url[4:] # Remove 'raw:' prefix
html = raw_html
if screenshot_requested:
screenshot_data = await self._generate_screenshot_from_html(html)
return AsyncCrawlResponse(
html=html,
response_headers=response_headers,
status_code=status_code,
screenshot=screenshot_data,
get_delayed_content=None
)
else:
raise ValueError("URL must start with 'http://', 'https://', 'file://', or 'raw:'")
async def _crawl_web(self, url: str, **kwargs) -> AsyncCrawlResponse:
"""
Existing web crawling logic remains unchanged.
Args:
url (str): The web URL to crawl.
**kwargs: Additional parameters.
Returns:
AsyncCrawlResponse: The response containing HTML, headers, status code, and optional screenshot.
"""
response_headers = {}
status_code = None
@@ -461,24 +584,35 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
if session_id:
context, page, _ = self.sessions.get(session_id, (None, None, None))
if not context:
if self.use_persistent_context and self.browser_type in ["chrome", "chromium"]:
# In persistent context, browser is the context
context = self.browser
page = await context.new_page()
else:
# Normal context creation for non-persistent or non-Chrome browsers
context = await self.browser.new_context(
user_agent=self.user_agent,
viewport={"width": 1920, "height": 1080},
proxy={"server": self.proxy} if self.proxy else None,
accept_downloads=True,
java_script_enabled=True
)
await context.add_cookies([{"name": "cookiesEnabled", "value": "true", "url": url}])
await context.set_extra_http_headers(self.headers)
page = await context.new_page()
self.sessions[session_id] = (context, page, time.time())
else:
if self.use_persistent_context and self.browser_type in ["chrome", "chromium"]:
# In persistent context, browser is the context
context = self.browser
else:
# Normal context creation
context = await self.browser.new_context(
user_agent=self.user_agent,
viewport={"width": 1920, "height": 1080},
proxy={"server": self.proxy} if self.proxy else None,
accept_downloads=True,
java_script_enabled=True
proxy={"server": self.proxy} if self.proxy else None
)
await context.add_cookies([{"name": "cookiesEnabled", "value": "true", "url": url}])
await context.set_extra_http_headers(self.headers)
page = await context.new_page()
self.sessions[session_id] = (context, page, time.time())
else:
context = await self.browser.new_context(
user_agent=self.user_agent,
viewport={"width": 1920, "height": 1080},
proxy={"server": self.proxy} if self.proxy else None
)
await context.set_extra_http_headers(self.headers)
if kwargs.get("override_navigator", False) or kwargs.get("simulate_user", False) or kwargs.get("magic", False):
# Inject scripts to override navigator properties
@@ -512,7 +646,8 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
""")
page = await context.new_page()
# await stealth_async(page) #, stealth_config)
if kwargs.get("magic", False):
await stealth_async(page, stealth_config)
# Add console message and error logging
if kwargs.get("log_console", False):
@@ -544,8 +679,12 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
if not kwargs.get("js_only", False):
await self.execute_hook('before_goto', page)
response = await page.goto(
-url, wait_until="domcontentloaded", timeout=kwargs.get("page_timeout", 60000)
+url,
+# wait_until=kwargs.get("wait_until", ["domcontentloaded", "networkidle"]),
+wait_until=kwargs.get("wait_until", "domcontentloaded"),
+timeout=kwargs.get("page_timeout", 60000)
)
# response = await page.goto("about:blank")
@@ -722,7 +861,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
if self.verbose:
print(f"[LOG] ✅ Crawled {url} successfully!")
if self.use_cached_html:
cache_file_path = os.path.join(
Path.home(), ".crawl4ai", "cache", hashlib.md5(url.encode()).hexdigest()
@@ -902,6 +1041,15 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
print(f"Warning: Failed to remove overlay elements: {str(e)}")
async def take_screenshot(self, page: Page) -> str:
"""
Takes a screenshot of the current page.
Args:
page (Page): The Playwright page instance
Returns:
str: Base64-encoded screenshot image
"""
try:
# The page is already loaded, just take the screenshot
screenshot = await page.screenshot(full_page=True)
@@ -921,4 +1069,36 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
return base64.b64encode(buffered.getvalue()).decode('utf-8')
finally:
await page.close()
async def _generate_screenshot_from_html(self, html: str) -> Optional[str]:
"""
Generates a screenshot from raw HTML content.
Args:
html (str): The HTML content to render and capture.
Returns:
Optional[str]: Base64-encoded screenshot image or an error image if failed.
"""
try:
if not self.browser:
await self.start()
page = await self.browser.new_page()
await page.set_content(html, wait_until='networkidle')
screenshot = await page.screenshot(full_page=True)
await page.close()
return base64.b64encode(screenshot).decode('utf-8')
except Exception as e:
error_message = f"Failed to take screenshot: {str(e)}"
print(error_message)
# Generate an error image
img = Image.new('RGB', (800, 600), color='black')
draw = ImageDraw.Draw(img)
font = ImageFont.load_default()
draw.text((10, 10), error_message, fill=(255, 255, 255), font=font)
buffered = BytesIO()
img.save(buffered, format="JPEG")
return base64.b64encode(buffered.getvalue()).decode('utf-8')


@@ -0,0 +1,965 @@
import asyncio
import base64
import time
from abc import ABC, abstractmethod
from typing import Callable, Dict, Any, List, Optional, Awaitable
import os, sys, shutil
import tempfile, subprocess
from playwright.async_api import async_playwright, Page, Browser, Error
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
from pathlib import Path
from playwright.async_api import ProxySettings
from pydantic import BaseModel
import hashlib
import json
import uuid
from playwright_stealth import StealthConfig, stealth_async
stealth_config = StealthConfig(
webdriver=True,
chrome_app=True,
chrome_csi=True,
chrome_load_times=True,
chrome_runtime=True,
navigator_languages=True,
navigator_plugins=True,
navigator_permissions=True,
webgl_vendor=True,
outerdimensions=True,
navigator_hardware_concurrency=True,
media_codecs=True,
)
class ManagedBrowser:
def __init__(self, browser_type: str = "chromium", user_data_dir: Optional[str] = None, headless: bool = False):
self.browser_type = browser_type
self.user_data_dir = user_data_dir
self.headless = headless
self.browser_process = None
self.temp_dir = None
self.debugging_port = 9222
async def start(self) -> str:
"""
Starts the browser process and returns the CDP endpoint URL.
If user_data_dir is not provided, creates a temporary directory.
"""
# Create temp dir if needed
if not self.user_data_dir:
self.temp_dir = tempfile.mkdtemp(prefix="browser-profile-")
self.user_data_dir = self.temp_dir
# Get browser path and args based on OS and browser type
browser_path = self._get_browser_path()
args = self._get_browser_args()
# Start browser process
try:
self.browser_process = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
await asyncio.sleep(2) # Give browser time to start
return f"http://localhost:{self.debugging_port}"
except Exception as e:
await self.cleanup()
raise Exception(f"Failed to start browser: {e}")
def _get_browser_path(self) -> str:
"""Returns the browser executable path based on OS and browser type"""
if sys.platform == "darwin": # macOS
paths = {
"chromium": "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"firefox": "/Applications/Firefox.app/Contents/MacOS/firefox",
"webkit": "/Applications/Safari.app/Contents/MacOS/Safari"
}
elif sys.platform == "win32": # Windows
paths = {
"chromium": "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe",
"firefox": "C:\\Program Files\\Mozilla Firefox\\firefox.exe",
"webkit": None # WebKit not supported on Windows
}
else: # Linux
paths = {
"chromium": "google-chrome",
"firefox": "firefox",
"webkit": None # WebKit not supported on Linux
}
return paths.get(self.browser_type)
def _get_browser_args(self) -> List[str]:
"""Returns browser-specific command line arguments"""
base_args = [self._get_browser_path()]
if self.browser_type == "chromium":
args = [
f"--remote-debugging-port={self.debugging_port}",
f"--user-data-dir={self.user_data_dir}",
]
if self.headless:
args.append("--headless=new")
elif self.browser_type == "firefox":
args = [
"--remote-debugging-port", str(self.debugging_port),
"--profile", self.user_data_dir,
]
if self.headless:
args.append("--headless")
else:
raise NotImplementedError(f"Browser type {self.browser_type} not supported")
return base_args + args
async def cleanup(self):
"""Cleanup browser process and temporary directory"""
if self.browser_process:
try:
self.browser_process.terminate()
await asyncio.sleep(1)
if self.browser_process.poll() is None:
self.browser_process.kill()
except Exception as e:
print(f"Error terminating browser: {e}")
if self.temp_dir and os.path.exists(self.temp_dir):
try:
shutil.rmtree(self.temp_dir)
except Exception as e:
print(f"Error removing temporary directory: {e}")
class AsyncCrawlResponse(BaseModel):
html: str
response_headers: Dict[str, str]
status_code: int
screenshot: Optional[str] = None
get_delayed_content: Optional[Callable[[Optional[float]], Awaitable[str]]] = None
class Config:
arbitrary_types_allowed = True
class AsyncCrawlerStrategy(ABC):
@abstractmethod
async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
pass
@abstractmethod
async def crawl_many(self, urls: List[str], **kwargs) -> List[AsyncCrawlResponse]:
pass
@abstractmethod
async def take_screenshot(self, **kwargs) -> str:
pass
@abstractmethod
def update_user_agent(self, user_agent: str):
pass
@abstractmethod
def set_hook(self, hook_type: str, hook: Callable):
pass
class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
def __init__(self, use_cached_html=False, js_code=None, **kwargs):
self.use_cached_html = use_cached_html
self.user_agent = kwargs.get(
"user_agent",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
)
self.proxy = kwargs.get("proxy")
self.proxy_config = kwargs.get("proxy_config")
self.headless = kwargs.get("headless", True)
self.browser_type = kwargs.get("browser_type", "chromium")
self.headers = kwargs.get("headers", {})
self.sessions = {}
self.session_ttl = 1800
self.js_code = js_code
self.verbose = kwargs.get("verbose", False)
self.playwright = None
self.browser = None
self.sleep_on_close = kwargs.get("sleep_on_close", False)
self.use_managed_browser = kwargs.get("use_managed_browser", False)
self.user_data_dir = kwargs.get("user_data_dir", None)
self.use_persistent_context = kwargs.get("use_persistent_context", False)
self.chrome_channel = kwargs.get("chrome_channel", "chrome")
self.managed_browser = None
self.default_context = None
self.hooks = {
'on_browser_created': None,
'on_user_agent_updated': None,
'on_execution_started': None,
'before_goto': None,
'after_goto': None,
'before_return_html': None,
'before_retrieve_html': None
}
self.extra_args = kwargs.get("extra_args", [])
async def __aenter__(self):
await self.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def start(self):
if self.playwright is None:
self.playwright = await async_playwright().start()
if self.browser is None:
if self.use_managed_browser:
# Use managed browser approach
self.managed_browser = ManagedBrowser(
browser_type=self.browser_type,
user_data_dir=self.user_data_dir,
headless=self.headless
)
cdp_url = await self.managed_browser.start()
self.browser = await self.playwright.chromium.connect_over_cdp(cdp_url)
# Get the default context that maintains the user profile
contexts = self.browser.contexts
if contexts:
self.default_context = contexts[0]
else:
# If no default context exists, create one
self.default_context = await self.browser.new_context(
viewport={"width": 1920, "height": 1080}
)
# Set up the default context
if self.default_context:
await self.default_context.set_extra_http_headers(self.headers)
if self.user_agent:
await self.default_context.set_extra_http_headers({
"User-Agent": self.user_agent
})
else:
browser_args = {
"headless": self.headless,
"args": [
"--disable-gpu",
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-blink-features=AutomationControlled",
"--disable-infobars",
"--window-position=0,0",
"--ignore-certificate-errors",
"--ignore-certificate-errors-spki-list",
# "--disable-http2",
# "--headless=new", # Use the new headless mode
]
}
# Add extra args if provided
if self.extra_args:
browser_args["args"].extend(self.extra_args)
# Add proxy settings if a proxy is specified
if self.proxy:
proxy_settings = ProxySettings(server=self.proxy)
browser_args["proxy"] = proxy_settings
elif self.proxy_config:
proxy_settings = ProxySettings(server=self.proxy_config.get("server"), username=self.proxy_config.get("username"), password=self.proxy_config.get("password"))
browser_args["proxy"] = proxy_settings
# Select the appropriate browser based on the browser_type
if self.browser_type == "firefox":
self.browser = await self.playwright.firefox.launch(**browser_args)
elif self.browser_type == "webkit":
self.browser = await self.playwright.webkit.launch(**browser_args)
else:
self.browser = await self.playwright.chromium.launch(**browser_args)
# Update the headless configuration
if self.headless:
# Use the new headless mode explicitly
browser_args["args"].append("--headless=new")
await self.execute_hook('on_browser_created', self.browser)
async def close(self):
if self.sleep_on_close:
await asyncio.sleep(0.5)
# Close all active sessions
session_ids = list(self.sessions.keys())
for session_id in session_ids:
await self.kill_session(session_id)
if self.browser:
await self.browser.close()
self.browser = None
if self.managed_browser:
await self.managed_browser.cleanup()
self.managed_browser = None
if self.playwright:
await self.playwright.stop()
self.playwright = None
def __del__(self):
if self.browser or self.playwright:
asyncio.get_event_loop().run_until_complete(self.close())
def set_hook(self, hook_type: str, hook: Callable):
if hook_type in self.hooks:
self.hooks[hook_type] = hook
else:
raise ValueError(f"Invalid hook type: {hook_type}")
async def execute_hook(self, hook_type: str, *args):
hook = self.hooks.get(hook_type)
if hook:
if asyncio.iscoroutinefunction(hook):
return await hook(*args)
else:
return hook(*args)
return args[0] if args else None
def update_user_agent(self, user_agent: str):
self.user_agent = user_agent
def set_custom_headers(self, headers: Dict[str, str]):
self.headers = headers
async def kill_session(self, session_id: str):
if session_id in self.sessions:
context, page, _ = self.sessions[session_id]
await page.close()
if not self.use_managed_browser:
await context.close()
del self.sessions[session_id]
def _cleanup_expired_sessions(self):
current_time = time.time()
expired_sessions = [
sid for sid, (_, _, last_used) in self.sessions.items()
if current_time - last_used > self.session_ttl
]
for sid in expired_sessions:
asyncio.create_task(self.kill_session(sid))
async def smart_wait(self, page: Page, wait_for: str, timeout: float = 30000):
wait_for = wait_for.strip()
if wait_for.startswith('js:'):
# Explicitly specified JavaScript
js_code = wait_for[3:].strip()
return await self.csp_compliant_wait(page, js_code, timeout)
elif wait_for.startswith('css:'):
# Explicitly specified CSS selector
css_selector = wait_for[4:].strip()
try:
await page.wait_for_selector(css_selector, timeout=timeout)
except Error as e:
if 'Timeout' in str(e):
raise TimeoutError(f"Timeout after {timeout}ms waiting for selector '{css_selector}'")
else:
raise ValueError(f"Invalid CSS selector: '{css_selector}'")
else:
# Auto-detect based on content
if wait_for.startswith('()') or wait_for.startswith('function'):
# It's likely a JavaScript function
return await self.csp_compliant_wait(page, wait_for, timeout)
else:
# Assume it's a CSS selector first
try:
await page.wait_for_selector(wait_for, timeout=timeout)
except Error as e:
if 'Timeout' in str(e):
raise TimeoutError(f"Timeout after {timeout}ms waiting for selector '{wait_for}'")
else:
# If it's not a timeout error, it might be an invalid selector
# Let's try to evaluate it as a JavaScript function as a fallback
try:
return await self.csp_compliant_wait(page, f"() => {{{wait_for}}}", timeout)
except Error:
raise ValueError(f"Invalid wait_for parameter: '{wait_for}'. "
"It should be either a valid CSS selector, a JavaScript function, "
"or explicitly prefixed with 'js:' or 'css:'.")
async def csp_compliant_wait(self, page: Page, user_wait_function: str, timeout: float = 30000):
wrapper_js = f"""
async () => {{
const userFunction = {user_wait_function};
const startTime = Date.now();
while (true) {{
if (await userFunction()) {{
return true;
}}
if (Date.now() - startTime > {timeout}) {{
throw new Error('Timeout waiting for condition');
}}
await new Promise(resolve => setTimeout(resolve, 100));
}}
}}
"""
try:
await page.evaluate(wrapper_js)
except TimeoutError:
raise TimeoutError(f"Timeout after {timeout}ms waiting for condition")
except Exception as e:
raise RuntimeError(f"Error in wait condition: {str(e)}")
async def process_iframes(self, page):
# Find all iframes
iframes = await page.query_selector_all('iframe')
for i, iframe in enumerate(iframes):
try:
# Add a unique identifier to the iframe
await iframe.evaluate(f'(element) => element.id = "iframe-{i}"')
# Get the frame associated with this iframe
frame = await iframe.content_frame()
if frame:
# Wait for the frame to load
await frame.wait_for_load_state('load', timeout=30000) # 30 seconds timeout
# Extract the content of the iframe's body
iframe_content = await frame.evaluate('() => document.body.innerHTML')
# Generate a unique class name for this iframe
class_name = f'extracted-iframe-content-{i}'
# Replace the iframe with a div containing the extracted content
_iframe = iframe_content.replace('`', '\\`')
await page.evaluate(f"""
() => {{
const iframe = document.getElementById('iframe-{i}');
const div = document.createElement('div');
div.innerHTML = `{_iframe}`;
div.className = '{class_name}';
iframe.replaceWith(div);
}}
""")
else:
print(f"Warning: Could not access content frame for iframe {i}")
except Exception as e:
print(f"Error processing iframe {i}: {str(e)}")
# Return the page object
return page
async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
response_headers = {}
status_code = None
self._cleanup_expired_sessions()
session_id = kwargs.get("session_id")
# Handle page creation differently for managed browser
if self.use_managed_browser:
if session_id:
# Reuse existing session if available
context, page, _ = self.sessions.get(session_id, (None, None, None))
if not page:
# Create new page in default context if session doesn't exist
page = await self.default_context.new_page()
self.sessions[session_id] = (self.default_context, page, time.time())
else:
# Create new page in default context for non-session requests
page = await self.default_context.new_page()
else:
if session_id:
context, page, _ = self.sessions.get(session_id, (None, None, None))
if not context:
context = await self.browser.new_context(
user_agent=self.user_agent,
viewport={"width": 1920, "height": 1080},
proxy={"server": self.proxy} if self.proxy else None,
accept_downloads=True,
java_script_enabled=True
)
await context.add_cookies([{"name": "cookiesEnabled", "value": "true", "url": url}])
await context.set_extra_http_headers(self.headers)
page = await context.new_page()
self.sessions[session_id] = (context, page, time.time())
else:
context = await self.browser.new_context(
user_agent=self.user_agent,
viewport={"width": 1920, "height": 1080},
proxy={"server": self.proxy} if self.proxy else None
)
await context.set_extra_http_headers(self.headers)
if kwargs.get("override_navigator", False) or kwargs.get("simulate_user", False) or kwargs.get("magic", False):
# Inject scripts to override navigator properties
await context.add_init_script("""
// Pass the Permissions Test.
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications' ?
Promise.resolve({ state: Notification.permission }) :
originalQuery(parameters)
);
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
window.navigator.chrome = {
runtime: {},
// Add other properties if necessary
};
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5],
});
Object.defineProperty(navigator, 'languages', {
get: () => ['en-US', 'en'],
});
Object.defineProperty(document, 'hidden', {
get: () => false
});
Object.defineProperty(document, 'visibilityState', {
get: () => 'visible'
});
""")
page = await context.new_page()
if kwargs.get("magic", False):
await stealth_async(page, stealth_config)
# Add console message and error logging
if kwargs.get("log_console", False):
page.on("console", lambda msg: print(f"Console: {msg.text}"))
page.on("pageerror", lambda exc: print(f"Page Error: {exc}"))
try:
if self.verbose:
print(f"[LOG] 🕸️ Crawling {url} using AsyncPlaywrightCrawlerStrategy...")
if self.use_cached_html:
cache_file_path = os.path.join(
Path.home(), ".crawl4ai", "cache", hashlib.md5(url.encode()).hexdigest()
)
if os.path.exists(cache_file_path):
html = ""
with open(cache_file_path, "r") as f:
html = f.read()
# retrieve response headers and status code from cache
with open(cache_file_path + ".meta", "r") as f:
meta = json.load(f)
response_headers = meta.get("response_headers", {})
status_code = meta.get("status_code")
response = AsyncCrawlResponse(
html=html, response_headers=response_headers, status_code=status_code
)
return response
if not kwargs.get("js_only", False):
await self.execute_hook('before_goto', page)
# response = await page.goto(
# url, wait_until="domcontentloaded", timeout=kwargs.get("page_timeout", 60000)
# )
# Add retry logic for HTTP2 errors
max_retries = kwargs.get("max_retries", 3)
current_try = 0
while current_try < max_retries:
try:
response = await page.goto(
url,
# wait_until=kwargs.get("wait_until", ["domcontentloaded", "networkidle"]),
wait_until=kwargs.get("wait_until", "networkidle"),
timeout=kwargs.get("page_timeout", 60000)
)
break
except Exception as e:
current_try += 1
if "ERR_HTTP2_PROTOCOL_ERROR" in str(e):
if current_try < max_retries:
# Add exponential backoff
await asyncio.sleep(2 ** current_try)
                                # Record --disable-http2 for any subsequent browser
                                # launch (the already-running browser is unaffected)
                                if 'args' not in kwargs:
                                    kwargs['args'] = []
                                kwargs['args'].extend(['--disable-http2'])
continue
if current_try == max_retries:
raise
# response = await page.goto("about:blank")
# await page.evaluate(f"window.location.href = '{url}'")
await self.execute_hook('after_goto', page)
# Get status code and headers
status_code = response.status
response_headers = response.headers
else:
status_code = 200
response_headers = {}
# Replace the current wait_for_selector line with this more robust check:
try:
# First wait for body to exist, regardless of visibility
await page.wait_for_selector('body', state='attached', timeout=30000)
# Then wait for it to become visible by checking CSS
await page.wait_for_function("""
() => {
const body = document.body;
const style = window.getComputedStyle(body);
return style.display !== 'none' &&
style.visibility !== 'hidden' &&
style.opacity !== '0';
}
""", timeout=30000)
except Error as e:
# If waiting fails, let's try to diagnose the issue
visibility_info = await page.evaluate("""
() => {
const body = document.body;
const style = window.getComputedStyle(body);
return {
display: style.display,
visibility: style.visibility,
opacity: style.opacity,
hasContent: body.innerHTML.length,
classList: Array.from(body.classList)
}
}
""")
if self.verbose:
print(f"Body visibility debug info: {visibility_info}")
# Even if body is hidden, we might still want to proceed
if kwargs.get('ignore_body_visibility', True):
if self.verbose:
print("Proceeding despite hidden body...")
pass
else:
raise Error(f"Body element is hidden: {visibility_info}")
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
js_code = kwargs.get("js_code", kwargs.get("js", self.js_code))
if js_code:
if isinstance(js_code, str):
await page.evaluate(js_code)
elif isinstance(js_code, list):
for js in js_code:
await page.evaluate(js)
await page.wait_for_load_state('networkidle')
# Check for on execution event
await self.execute_hook('on_execution_started', page)
if kwargs.get("simulate_user", False) or kwargs.get("magic", False):
# Simulate user interactions
await page.mouse.move(100, 100)
await page.mouse.down()
await page.mouse.up()
await page.keyboard.press('ArrowDown')
# Handle the wait_for parameter
wait_for = kwargs.get("wait_for")
if wait_for:
try:
await self.smart_wait(page, wait_for, timeout=kwargs.get("page_timeout", 60000))
except Exception as e:
raise RuntimeError(f"Wait condition failed: {str(e)}")
# Update image dimensions
update_image_dimensions_js = """
() => {
return new Promise((resolve) => {
const filterImage = (img) => {
// Filter out images that are too small
if (img.width < 100 && img.height < 100) return false;
// Filter out images that are not visible
const rect = img.getBoundingClientRect();
if (rect.width === 0 || rect.height === 0) return false;
// Filter out images with certain class names (e.g., icons, thumbnails)
if (img.classList.contains('icon') || img.classList.contains('thumbnail')) return false;
// Filter out images with certain patterns in their src (e.g., placeholder images)
if (img.src.includes('placeholder') || img.src.includes('icon')) return false;
return true;
};
const images = Array.from(document.querySelectorAll('img')).filter(filterImage);
let imagesLeft = images.length;
if (imagesLeft === 0) {
resolve();
return;
}
const checkImage = (img) => {
if (img.complete && img.naturalWidth !== 0) {
img.setAttribute('width', img.naturalWidth);
img.setAttribute('height', img.naturalHeight);
imagesLeft--;
if (imagesLeft === 0) resolve();
}
};
images.forEach(img => {
checkImage(img);
if (!img.complete) {
img.onload = () => {
checkImage(img);
};
img.onerror = () => {
imagesLeft--;
if (imagesLeft === 0) resolve();
};
}
});
// Fallback timeout of 5 seconds
// setTimeout(() => resolve(), 5000);
resolve();
});
}
"""
await page.evaluate(update_image_dimensions_js)
# Wait a bit for any onload events to complete
await page.wait_for_timeout(100)
# Process iframes
if kwargs.get("process_iframes", False):
page = await self.process_iframes(page)
await self.execute_hook('before_retrieve_html', page)
# Check if delay_before_return_html is set then wait for that time
delay_before_return_html = kwargs.get("delay_before_return_html")
if delay_before_return_html:
await asyncio.sleep(delay_before_return_html)
# Check for remove_overlay_elements parameter
if kwargs.get("remove_overlay_elements", False):
await self.remove_overlay_elements(page)
html = await page.content()
await self.execute_hook('before_return_html', page, html)
# Check if kwargs has screenshot=True then take screenshot
screenshot_data = None
if kwargs.get("screenshot"):
# Check we have screenshot_wait_for parameter, if we have simply wait for that time
screenshot_wait_for = kwargs.get("screenshot_wait_for")
if screenshot_wait_for:
await asyncio.sleep(screenshot_wait_for)
screenshot_data = await self.take_screenshot(page)
if self.verbose:
print(f"[LOG] ✅ Crawled {url} successfully!")
if self.use_cached_html:
cache_file_path = os.path.join(
Path.home(), ".crawl4ai", "cache", hashlib.md5(url.encode()).hexdigest()
)
with open(cache_file_path, "w", encoding="utf-8") as f:
f.write(html)
# store response headers and status code in cache
with open(cache_file_path + ".meta", "w", encoding="utf-8") as f:
json.dump({
"response_headers": response_headers,
"status_code": status_code
}, f)
async def get_delayed_content(delay: float = 5.0) -> str:
if self.verbose:
print(f"[LOG] Waiting for {delay} seconds before retrieving content for {url}")
await asyncio.sleep(delay)
return await page.content()
response = AsyncCrawlResponse(
html=html,
response_headers=response_headers,
status_code=status_code,
screenshot=screenshot_data,
get_delayed_content=get_delayed_content
)
return response
except Error as e:
raise Error(f"[ERROR] 🚫 crawl(): Failed to crawl {url}: {str(e)}")
# finally:
# if not session_id:
# await page.close()
# await context.close()
async def crawl_many(self, urls: List[str], **kwargs) -> List[AsyncCrawlResponse]:
semaphore_count = kwargs.get('semaphore_count', 5) # Adjust as needed
semaphore = asyncio.Semaphore(semaphore_count)
async def crawl_with_semaphore(url):
async with semaphore:
return await self.crawl(url, **kwargs)
tasks = [crawl_with_semaphore(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [result if not isinstance(result, Exception) else str(result) for result in results]
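`crawl_many` bounds concurrency with a semaphore before gathering, returning exceptions in-place as strings. A self-contained sketch of that pattern; the `gather_bounded` name is illustrative, not crawler API:

```python
import asyncio

async def gather_bounded(items, worker, limit: int = 5):
    # Bound concurrency with a semaphore, then gather; exceptions are
    # converted to their string form in the result list, as crawl_many does
    sem = asyncio.Semaphore(limit)
    async def run(item):
        async with sem:
            return await worker(item)
    results = await asyncio.gather(*(run(i) for i in items), return_exceptions=True)
    return [r if not isinstance(r, Exception) else str(r) for r in results]

async def demo():
    async def worker(n):
        if n == 2:
            raise ValueError("boom")
        await asyncio.sleep(0)
        return n * 10
    return await gather_bounded([1, 2, 3], worker, limit=2)
```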
async def remove_overlay_elements(self, page: Page) -> None:
"""
Removes popup overlays, modals, cookie notices, and other intrusive elements from the page.
Args:
page (Page): The Playwright page instance
"""
remove_overlays_js = """
async () => {
// Function to check if element is visible
const isVisible = (elem) => {
const style = window.getComputedStyle(elem);
return style.display !== 'none' &&
style.visibility !== 'hidden' &&
style.opacity !== '0';
};
// Common selectors for popups and overlays
const commonSelectors = [
// Close buttons first
'button[class*="close" i]', 'button[class*="dismiss" i]',
'button[aria-label*="close" i]', 'button[title*="close" i]',
'a[class*="close" i]', 'span[class*="close" i]',
// Cookie notices
'[class*="cookie-banner" i]', '[id*="cookie-banner" i]',
'[class*="cookie-consent" i]', '[id*="cookie-consent" i]',
// Newsletter/subscription dialogs
'[class*="newsletter" i]', '[class*="subscribe" i]',
// Generic popups/modals
'[class*="popup" i]', '[class*="modal" i]',
'[class*="overlay" i]', '[class*="dialog" i]',
'[role="dialog"]', '[role="alertdialog"]'
];
// Try to click close buttons first
for (const selector of commonSelectors.slice(0, 6)) {
const closeButtons = document.querySelectorAll(selector);
for (const button of closeButtons) {
if (isVisible(button)) {
try {
button.click();
await new Promise(resolve => setTimeout(resolve, 100));
} catch (e) {
console.log('Error clicking button:', e);
}
}
}
}
// Remove remaining overlay elements
const removeOverlays = () => {
// Find elements with high z-index
const allElements = document.querySelectorAll('*');
for (const elem of allElements) {
const style = window.getComputedStyle(elem);
const zIndex = parseInt(style.zIndex);
const position = style.position;
if (
isVisible(elem) &&
(zIndex > 999 || position === 'fixed' || position === 'absolute') &&
(
elem.offsetWidth > window.innerWidth * 0.5 ||
elem.offsetHeight > window.innerHeight * 0.5 ||
style.backgroundColor.includes('rgba') ||
parseFloat(style.opacity) < 1
)
) {
elem.remove();
}
}
// Remove elements matching common selectors
for (const selector of commonSelectors) {
const elements = document.querySelectorAll(selector);
elements.forEach(elem => {
if (isVisible(elem)) {
elem.remove();
}
});
}
};
// Remove overlay elements
removeOverlays();
// Remove any fixed/sticky position elements at the top/bottom
const removeFixedElements = () => {
const elements = document.querySelectorAll('*');
elements.forEach(elem => {
const style = window.getComputedStyle(elem);
if (
(style.position === 'fixed' || style.position === 'sticky') &&
isVisible(elem)
) {
elem.remove();
}
});
};
removeFixedElements();
// Remove empty block elements as: div, p, span, etc.
const removeEmptyBlockElements = () => {
const blockElements = document.querySelectorAll('div, p, span, section, article, header, footer, aside, nav, main, ul, ol, li, dl, dt, dd, h1, h2, h3, h4, h5, h6');
blockElements.forEach(elem => {
if (elem.innerText.trim() === '') {
elem.remove();
}
});
                };
                removeEmptyBlockElements();
// Remove margin-right and padding-right from body (often added by modal scripts)
document.body.style.marginRight = '0px';
document.body.style.paddingRight = '0px';
document.body.style.overflow = 'auto';
// Wait a bit for any animations to complete
await new Promise(resolve => setTimeout(resolve, 100));
}
"""
try:
await page.evaluate(remove_overlays_js)
await page.wait_for_timeout(500) # Wait for any animations to complete
except Exception as e:
if self.verbose:
print(f"Warning: Failed to remove overlay elements: {str(e)}")
async def take_screenshot(self, page: Page) -> str:
try:
# The page is already loaded, just take the screenshot
screenshot = await page.screenshot(full_page=True)
return base64.b64encode(screenshot).decode('utf-8')
except Exception as e:
error_message = f"Failed to take screenshot: {str(e)}"
print(error_message)
# Generate an error image
img = Image.new('RGB', (800, 600), color='black')
draw = ImageDraw.Draw(img)
font = ImageFont.load_default()
draw.text((10, 10), error_message, fill=(255, 255, 255), font=font)
buffered = BytesIO()
img.save(buffered, format="JPEG")
return base64.b64encode(buffered.getvalue()).decode('utf-8')
finally:
await page.close()


@@ -5,6 +5,7 @@ import asyncio
from typing import Optional, Tuple, Dict
from contextlib import asynccontextmanager
import logging
+import json  # Added for serialization/deserialization
# Set up logging
logging.basicConfig(level=logging.INFO)
@@ -89,7 +90,8 @@ class AsyncDatabaseManager:
media TEXT DEFAULT "{}",
links TEXT DEFAULT "{}",
metadata TEXT DEFAULT "{}",
-                    screenshot TEXT DEFAULT ""
+                    screenshot TEXT DEFAULT "",
+                    response_headers TEXT DEFAULT "{}" -- New column added
)
''')
@@ -105,26 +107,51 @@ class AsyncDatabaseManager:
column_names = await self.execute_with_retry(_check_columns)
-        for column in ['media', 'links', 'metadata', 'screenshot']:
+        # List of new columns to add
+        new_columns = ['media', 'links', 'metadata', 'screenshot', 'response_headers']
+        for column in new_columns:
if column not in column_names:
await self.aalter_db_add_column(column)
async def aalter_db_add_column(self, new_column: str):
"""Add new column to the database"""
async def _alter(db):
-            await db.execute(f'ALTER TABLE crawled_data ADD COLUMN {new_column} TEXT DEFAULT ""')
+            if new_column == 'response_headers':
+                await db.execute(f'ALTER TABLE crawled_data ADD COLUMN {new_column} TEXT DEFAULT "{{}}"')
+            else:
+                await db.execute(f'ALTER TABLE crawled_data ADD COLUMN {new_column} TEXT DEFAULT ""')
logger.info(f"Added column '{new_column}' to the database.")
await self.execute_with_retry(_alter)
-    async def aget_cached_url(self, url: str) -> Optional[Tuple[str, str, str, str, str, str, str, bool, str]]:
+    async def aget_cached_url(self, url: str) -> Optional[Tuple[str, str, str, str, str, bool, str, str, str, str]]:
"""Retrieve cached URL data"""
async def _get(db):
async with db.execute(
-                'SELECT url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot FROM crawled_data WHERE url = ?',
+                '''
+                SELECT url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot, response_headers
+                FROM crawled_data WHERE url = ?
+                ''',
(url,)
) as cursor:
-                return await cursor.fetchone()
+                row = await cursor.fetchone()
+                if row:
+                    # Deserialize JSON fields
+                    return (
+                        row[0],  # url
+                        row[1],  # html
+                        row[2],  # cleaned_html
+                        row[3],  # markdown
+                        row[4],  # extracted_content
+                        row[5],  # success
+                        json.loads(row[6] or '{}'),   # media
+                        json.loads(row[7] or '{}'),   # links
+                        json.loads(row[8] or '{}'),   # metadata
+                        row[9],  # screenshot
+                        json.loads(row[10] or '{}')   # response_headers
+                    )
+                return None
try:
return await self.execute_with_retry(_get)
@@ -132,12 +159,27 @@ class AsyncDatabaseManager:
logger.error(f"Error retrieving cached URL: {e}")
return None
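The new `response_headers` column stores a JSON-serialized dict, with `or '{}'` guarding rows written before the migration. A minimal sketch of that round trip; the header values here are made up for illustration:

```python
import json

# What acache_url would persist in the TEXT column, and what
# aget_cached_url deserializes on the way back out
headers = {"content-type": "text/html; charset=utf-8", "x-cache": "HIT"}
stored = json.dumps(headers)            # TEXT value written to SQLite
restored = json.loads(stored or '{}')   # normal read path
legacy = json.loads(None or '{}')       # pre-migration row: NULL falls back to {}
```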
-    async def acache_url(self, url: str, html: str, cleaned_html: str, markdown: str, extracted_content: str, success: bool, media: str = "{}", links: str = "{}", metadata: str = "{}", screenshot: str = ""):
+    async def acache_url(
+        self,
+        url: str,
+        html: str,
+        cleaned_html: str,
+        markdown: str,
+        extracted_content: str,
+        success: bool,
+        media: str = "{}",
+        links: str = "{}",
+        metadata: str = "{}",
+        screenshot: str = "",
+        response_headers: str = "{}"  # New parameter added
+    ):
"""Cache URL data with retry logic"""
async def _cache(db):
await db.execute('''
-                INSERT INTO crawled_data (url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot)
-                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                INSERT INTO crawled_data (
+                    url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot, response_headers
+                )
+                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(url) DO UPDATE SET
html = excluded.html,
cleaned_html = excluded.cleaned_html,
@@ -147,8 +189,9 @@ class AsyncDatabaseManager:
media = excluded.media,
links = excluded.links,
metadata = excluded.metadata,
-                    screenshot = excluded.screenshot
-            ''', (url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot))
+                    screenshot = excluded.screenshot,
+                    response_headers = excluded.response_headers -- Update response_headers
+            ''', (url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot, response_headers))
try:
await self.execute_with_retry(_cache)
@@ -189,4 +232,4 @@ class AsyncDatabaseManager:
logger.error(f"Error flushing database: {e}")
# Create a singleton instance
async_db_manager = AsyncDatabaseManager()

crawl4ai/async_executor.py (new file)
@@ -0,0 +1,741 @@
from __future__ import annotations
import asyncio
import psutil
import logging
import time
import sqlite3
import aiosqlite
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Callable, Any, Set, Type
from typing import Awaitable
from pathlib import Path
import json
from datetime import datetime
from typing import ClassVar, Type, Union
import inspect
# Imports from your crawler package
from .async_crawler_strategy import AsyncCrawlerStrategy, AsyncPlaywrightCrawlerStrategy
from .chunking_strategy import ChunkingStrategy, RegexChunking
from .extraction_strategy import ExtractionStrategy
from .models import CrawlResult
from .config import MIN_WORD_THRESHOLD
from .async_webcrawler import AsyncWebCrawler
from .config import MAX_METRICS_HISTORY
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(name)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# self.logger.error(f"Executor {self.__class__.__name__}: Error message", exc_info=True)
# self.logger.info(f"Executor {self.__class__.__name__}: Info message")
# self.logger.warning(f"Executor {self.__class__.__name__}: Warning message")
# Enums and Constants
class ExecutionMode(Enum):
"""Execution mode for the crawler executor."""
SPEED = "speed"
RESOURCE = "resource"
class TaskState(Enum):
"""Possible states for a crawling task."""
PENDING = "pending"
SCHEDULED = "scheduled"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
RETRYING = "retrying"
# Types of callbacks we should support
class CallbackType(Enum):
PRE_EXECUTION = "pre_execution" # Before processing a URL
POST_EXECUTION = "post_execution" # After successful processing
ON_ERROR = "on_error" # When an error occurs
ON_RETRY = "on_retry" # Before retrying a failed URL
ON_BATCH_START = "on_batch_start" # Before starting a batch
ON_BATCH_END = "on_batch_end" # After completing a batch
ON_COMPLETE = "on_complete" # After all URLs are processed
@dataclass
class SystemMetrics:
"""System resource metrics."""
cpu_percent: float
memory_percent: float
available_memory: int
timestamp: float
@classmethod
def capture(cls) -> 'SystemMetrics':
"""Capture current system metrics."""
return cls(
cpu_percent=psutil.cpu_percent(),
memory_percent=psutil.virtual_memory().percent,
available_memory=psutil.virtual_memory().available,
timestamp=time.time()
)
@dataclass
class TaskMetadata:
"""Metadata for a crawling task."""
url: str
state: TaskState
attempts: int = 0
last_attempt: Optional[float] = None
error: Optional[str] = None
result: Optional[Any] = None
@dataclass
class ExecutorMetrics:
"""Performance and resource metrics for the executor."""
# Performance metrics
total_urls: int = 0
completed_urls: int = 0
failed_urls: int = 0
start_time: Optional[float] = None
total_retries: int = 0
response_times: List[float] = field(default_factory=list)
# Resource metrics
system_metrics: List[SystemMetrics] = field(default_factory=list)
active_connections: int = 0
def capture_system_metrics(self):
"""Capture system metrics and enforce history size limit."""
metrics = SystemMetrics.capture()
self.system_metrics.append(metrics)
if len(self.system_metrics) > MAX_METRICS_HISTORY:
self.system_metrics.pop(0) # Remove the oldest metric
@property
def urls_per_second(self) -> float:
"""Calculate URLs processed per second."""
if not self.start_time or not self.completed_urls:
return 0.0
duration = time.time() - self.start_time
return self.completed_urls / duration if duration > 0 else 0
@property
def success_rate(self) -> float:
"""Calculate success rate as percentage."""
if not self.total_urls:
return 0.0
return (self.completed_urls / self.total_urls) * 100
@property
def retry_rate(self) -> float:
"""Calculate retry rate as percentage."""
if not self.total_urls:
return 0.0
return (self.total_retries / self.total_urls) * 100
@property
def average_response_time(self) -> float:
"""Calculate average response time in seconds."""
if not self.response_times:
return 0.0
return sum(self.response_times) / len(self.response_times)
def to_dict(self) -> Dict[str, Any]:
"""Convert metrics to dictionary format."""
return {
"performance": {
"urls_per_second": self.urls_per_second,
"success_rate": self.success_rate,
"retry_rate": self.retry_rate,
"average_response_time": self.average_response_time,
"total_urls": self.total_urls,
"completed_urls": self.completed_urls,
"failed_urls": self.failed_urls
},
"resources": {
"cpu_utilization": self.system_metrics[-1].cpu_percent if self.system_metrics else 0,
"memory_usage": self.system_metrics[-1].memory_percent if self.system_metrics else 0,
"active_connections": self.active_connections
}
}
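The derived properties on `ExecutorMetrics` are simple ratios. The same arithmetic, stated standalone with illustrative numbers:

```python
# Illustrative values only: 45 of 50 URLs completed, 5 retries issued
completed_urls, total_urls, total_retries = 45, 50, 5
response_times = [0.2, 0.4, 0.6]

# Guard against division by zero exactly as the properties do
success_rate = (completed_urls / total_urls) * 100 if total_urls else 0.0
retry_rate = (total_retries / total_urls) * 100 if total_urls else 0.0
average_response_time = sum(response_times) / len(response_times) if response_times else 0.0
```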
class ResourceMonitor:
"""Monitors and manages system resources."""
def __init__(self, mode: ExecutionMode):
self.mode = mode
self.metrics_history: List[SystemMetrics] = []
self._setup_thresholds()
def _setup_thresholds(self):
"""Set up resource thresholds based on execution mode."""
if self.mode == ExecutionMode.SPEED:
self.memory_threshold = 80 # 80% memory usage limit
self.cpu_threshold = 90 # 90% CPU usage limit
else:
self.memory_threshold = 40 # 40% memory usage limit
self.cpu_threshold = 30 # 30% CPU usage limit
async def check_resources(self) -> bool:
"""Check if system resources are within acceptable limits."""
metrics = SystemMetrics.capture()
self.metrics_history.append(metrics)
# Keep only last hour of metrics
cutoff_time = time.time() - 3600
self.metrics_history = [m for m in self.metrics_history if m.timestamp > cutoff_time]
return (metrics.cpu_percent < self.cpu_threshold and
metrics.memory_percent < self.memory_threshold)
def get_optimal_batch_size(self, total_urls: int) -> int:
metrics = SystemMetrics.capture()
if self.mode == ExecutionMode.SPEED:
base_size = min(1000, total_urls)
# Adjust based on resource usage
cpu_factor = max(0.0, (self.cpu_threshold - metrics.cpu_percent) / self.cpu_threshold)
mem_factor = max(0.0, (self.memory_threshold - metrics.memory_percent) / self.memory_threshold)
min_factor = min(cpu_factor, mem_factor)
adjusted_size = max(1, int(base_size * min_factor))
return min(total_urls, adjusted_size)
else:
# For resource optimization, use a conservative batch size based on resource usage
cpu_factor = max(0.1, (self.cpu_threshold - metrics.cpu_percent) / self.cpu_threshold)
mem_factor = max(0.1, (self.memory_threshold - metrics.memory_percent) / self.memory_threshold)
min_factor = min(cpu_factor, mem_factor)
adjusted_size = max(1, int(50 * min_factor))
return min(total_urls, adjusted_size)
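A standalone sketch of the SPEED-mode sizing arithmetic above, assuming the same thresholds; the function name and sample readings are illustrative, and no `psutil` call is made here:

```python
def speed_mode_batch_size(total_urls, cpu_percent, memory_percent,
                          cpu_threshold=90, memory_threshold=80):
    # Start from a base of up to 1000 URLs, then shrink by the worst of
    # the CPU/memory headroom factors, never going below 1
    base_size = min(1000, total_urls)
    cpu_factor = max(0.0, (cpu_threshold - cpu_percent) / cpu_threshold)
    mem_factor = max(0.0, (memory_threshold - memory_percent) / memory_threshold)
    adjusted = max(1, int(base_size * min(cpu_factor, mem_factor)))
    return min(total_urls, adjusted)
```

With 50% headroom on both resources, a 2000-URL job gets batches of 500; with both resources over threshold, the batch size bottoms out at 1.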
class ExecutorControl:
"""Control interface for the executor."""
def __init__(self):
self._paused = False
self._cancelled = False
self._pause_event = asyncio.Event()
self._pause_event.set() # Not paused initially
self._lock = asyncio.Lock() # Lock to protect shared state
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
async def pause(self):
"""Pause the execution."""
async with self._lock:
self._paused = True
self._pause_event.clear()
async def resume(self):
"""Resume the execution."""
async with self._lock:
self._paused = False
self._pause_event.set()
async def cancel(self):
"""Cancel all pending operations."""
async with self._lock:
self._cancelled = True
self._pause_event.set() # Release any paused operations
async def is_paused(self) -> bool:
"""Check if execution is paused."""
async with self._lock:
return self._paused
async def is_cancelled(self) -> bool:
"""Check if execution is cancelled."""
async with self._lock:
return self._cancelled
async def wait_if_paused(self, timeout: Optional[float] = None):
"""Wait if execution is paused, with an optional timeout."""
try:
await asyncio.wait_for(self._pause_event.wait(), timeout=timeout)
except asyncio.TimeoutError:
# Timeout occurred, handle as needed
async with self._lock:
self._paused = False # Optionally reset the paused state
self._pause_event.set()
# Optionally log a warning
self.logger.warning(f"ExecutorControl: wait_if_paused() timed out after {timeout} seconds. Proceeding with execution.")
async def reset(self):
"""Reset control state."""
async with self._lock:
self._paused = False
self._cancelled = False
self._pause_event.set()
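The pause/resume mechanism reduces to an `asyncio.Event` used as a gate: `clear()` pauses waiters, `set()` releases them. A minimal self-contained demonstration of that behavior:

```python
import asyncio

async def demo() -> list:
    gate = asyncio.Event()
    gate.set()  # not paused initially, as in ExecutorControl.__init__
    log = []

    async def worker():
        for i in range(3):
            await gate.wait()  # blocks while the gate is cleared (paused)
            log.append(i)
            await asyncio.sleep(0)

    gate.clear()                       # pause before the worker starts
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.01)
    assert log == []                   # worker is held at the gate
    gate.set()                         # resume
    await task
    return log
```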
class ExecutorStrategy(ABC):
"""Abstract Base class for executor strategies.
Callbacks:
- PRE_EXECUTION: Callable[[str, Dict[str, Any]], None]
- POST_EXECUTION: Callable[[str, Any, Dict[str, Any]], None]
- ON_ERROR: Callable[[str, Exception, Dict[str, Any]], None]
- ON_RETRY: Callable[[str, int, Dict[str, Any]], None]
- ON_BATCH_START: Callable[[List[str], Dict[str, Any]], None]
- ON_BATCH_END: Callable[[List[str], Dict[str, Any]], None]
- ON_COMPLETE: Callable[[Dict[str, Any], Dict[str, Any]], None]
"""
def __init__(
self,
crawler: AsyncWebCrawler,
mode: ExecutionMode,
# callbacks: Optional[Dict[CallbackType, Callable]] = None,
callbacks: Optional[Dict[CallbackType, Callable[[Any], Union[Awaitable[None], None]]]] = None,
persistence_path: Optional[Path] = None,
**crawl_config_kwargs
):
self.crawler = crawler
self.mode = mode
self.callbacks = callbacks or {}
self.resource_monitor = ResourceMonitor(mode)
self.tasks: Dict[str, TaskMetadata] = {}
self.active_tasks: Set[str] = set()
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
self.metrics = ExecutorMetrics()
self.control = ExecutorControl()
self.crawl_config_kwargs = crawl_config_kwargs # Store parameters for arun
async def get_status(self) -> Dict[str, Any]:
"""Get current executor status and metrics."""
return {
"status": {
"paused": await self.control.is_paused(),
"cancelled": await self.control.is_cancelled(),
"active_tasks": len(self.active_tasks)
},
"metrics": self.metrics.to_dict()
}
    async def clear_state(self):
        """Reset executor state."""
        self.tasks.clear()
        self.active_tasks.clear()
        self.metrics = ExecutorMetrics()
        await self.control.reset()
        # Persistence is optional and not wired up in __init__ yet;
        # clear it only if a layer has been attached
        if getattr(self, "persistence", None) is not None:
            await self.persistence.clear()
async def _execute_callback(
self,
callback_type: CallbackType,
*args,
**kwargs
):
"""Execute callback if it exists."""
if callback := self.callbacks.get(callback_type):
try:
if inspect.iscoroutinefunction(callback):
await callback(*args, **kwargs)
else:
callback(*args, **kwargs)
except Exception as e:
# self.logger.error(f"Callback {callback_type} failed: {e}")
self.logger.error(f"Executor {self.__class__.__name__}: Callback {callback_type.value} failed: {e}", exc_info=True)
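The sync/async callback dispatch can be exercised on its own: coroutine functions are awaited, plain functions are called directly. A sketch of that dispatch; the `fire` helper name is hypothetical:

```python
import asyncio
import inspect

async def fire(callback, *args):
    # Await coroutine callbacks, call plain functions directly --
    # the same branch _execute_callback takes
    if inspect.iscoroutinefunction(callback):
        await callback(*args)
    else:
        callback(*args)

async def demo():
    seen = []
    def sync_cb(x):
        seen.append(("sync", x))
    async def async_cb(x):
        seen.append(("async", x))
    await fire(sync_cb, 1)
    await fire(async_cb, 2)
    return seen
```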
    async def _process_url(self, url: str) -> CrawlResult:
        """Process a single URL using the crawler, retrying with backoff."""
        max_retries = self.crawl_config_kwargs.get('max_retries', 3)
        backoff_factor = self.crawl_config_kwargs.get('backoff_factor', 1)
        attempts = 0
        while attempts <= max_retries:
            # Invoke PRE_EXECUTION callback
            await self._execute_callback(CallbackType.PRE_EXECUTION, url, self.metrics.to_dict())
            # Wait if execution is paused
            await self.control.wait_if_paused(timeout=300)
# Check if cancelled
if await self.control.is_cancelled():
raise asyncio.CancelledError("Execution was cancelled")
start_time = time.time()
self.metrics.active_connections += 1
try:
result = await self.crawler.arun(url, **self.crawl_config_kwargs)
self.metrics.completed_urls += 1
self.metrics.response_times.append(time.time() - start_time)
# Invoke POST_EXECUTION callback
await self._execute_callback(CallbackType.POST_EXECUTION, url, result, self.metrics.to_dict())
return result
            except Exception as e:
                attempts += 1
                self.logger.error(f"Executor {self.__class__.__name__}: Error processing URL {url}: {e}", exc_info=True)
                # Invoke ON_ERROR callback
                await self._execute_callback(CallbackType.ON_ERROR, url, e, self.metrics.to_dict())
                if attempts <= max_retries:
                    self.metrics.total_retries += 1
                    # Invoke ON_RETRY callback
                    await self._execute_callback(CallbackType.ON_RETRY, url, attempts, self.metrics.to_dict())
                    # Back off before retrying (linear in attempt count)
                    await asyncio.sleep(backoff_factor * attempts)
                else:
                    # Count a URL as failed only once retries are exhausted
                    self.metrics.failed_urls += 1
                    raise e
finally:
self.metrics.active_connections -= 1
# Update system metrics
# INFO: Uncomment this line if you want to capture system metrics after each URL, but it causes a performance hit
# self.metrics.system_metrics.append(SystemMetrics.capture())
# Exit the loop if successful or retries exceeded
if attempts > max_retries:
break
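The retry loop above amounts to linear-backoff retries around an awaited operation. A compact standalone sketch of the same control flow; the `with_retries` helper is illustrative, not crawler API:

```python
import asyncio

async def with_retries(op, max_retries: int = 3, backoff_factor: float = 0.01):
    # Retry an async operation, sleeping backoff_factor * attempt between
    # tries and re-raising once max_retries is exhausted
    attempts = 0
    while True:
        try:
            return await op()
        except Exception:
            attempts += 1
            if attempts > max_retries:
                raise
            await asyncio.sleep(backoff_factor * attempts)

async def demo():
    state = {"calls": 0}
    async def flaky():
        state["calls"] += 1
        if state["calls"] < 3:
            raise RuntimeError("transient")
        return "ok"
    return await with_retries(flaky), state["calls"]
```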
async def execute(self, urls: List[str]) -> Dict[str, Any]:
"""Execute crawling tasks."""
# Initialize metrics
self.metrics.total_urls = len(urls)
self.metrics.start_time = time.time()
# Create context with metrics (used for callbacks)
context = {
"mode": self.mode,
"start_time": self.metrics.start_time,
"total_urls": self.metrics.total_urls
}
# Invoke ON_BATCH_START callback
await self._execute_callback(CallbackType.ON_BATCH_START, urls, context)
results = {}
batch_errors = []
# Use the crawler within an async context manager
async with self.crawler:
# Check for cancellation before starting
if await self.control.is_cancelled():
raise asyncio.CancelledError("Execution was cancelled")
# Wait if paused
await self.control.wait_if_paused(timeout=300)
# Prepare list of batches
batches = []
total_urls_remaining = len(urls)
index = 0
while index < len(urls):
batch_size = self.resource_monitor.get_optimal_batch_size(total_urls_remaining)
batch_urls = urls[index:index + batch_size]
batches.append(batch_urls)
index += batch_size
total_urls_remaining -= batch_size
# Process each batch
for batch_urls in batches:
# Check for cancellation
if await self.control.is_cancelled():
raise asyncio.CancelledError("Execution was cancelled")
# Wait if paused
await self.control.wait_if_paused(timeout=300)
try:
# Process the batch
batch_results = await self.process_batch(batch_urls)
# Update results
results.update(batch_results)
# Capture system metrics after each batch
self.metrics.capture_system_metrics()
# Update system metrics after each batch
# self.metrics.system_metrics.append(SystemMetrics.capture()) # Has memory leak issue
# Invoke ON_BATCH_END callback
await self._execute_callback(CallbackType.ON_BATCH_END, batch_urls, context)
except Exception as e:
# Handle batch-level exceptions
self.logger.error(f"Error processing batch: {e}")
await self._execute_callback(CallbackType.ON_ERROR, "batch", e, context)
# Collect the error
batch_errors.append((batch_urls, e))
# Continue to next batch instead of raising
continue
# Execution complete
await self._execute_callback(CallbackType.ON_COMPLETE, results, context)
# Log final metrics and batch errors if any
final_status = await self.get_status()
self.logger.info(f"Executor {self.__class__.__name__}: Execution completed. Metrics: {final_status}")
if batch_errors:
self.logger.warning(f"Executor {self.__class__.__name__}: Execution completed with errors in {len(batch_errors)} batches.")
return results
@abstractmethod
async def process_batch(self, batch_urls: List[str]) -> Dict[str, Any]:
"""Process a batch of URLs."""
pass
class SpeedOptimizedExecutor(ExecutorStrategy):
"""Executor optimized for speed."""
def __init__(
self,
crawler: AsyncWebCrawler,
callbacks: Optional[Dict[CallbackType, Callable]] = None,
persistence_path: Optional[Path] = None,
word_count_threshold=MIN_WORD_THRESHOLD,
extraction_strategy: ExtractionStrategy = None,
chunking_strategy: ChunkingStrategy = None,
bypass_cache: bool = False,
css_selector: str = None,
screenshot: bool = False,
user_agent: str = None,
verbose=True,
connection_pool_size: int = 1000,
dns_cache_size: int = 10000,
backoff_factor: int = 1,
**kwargs
):
if chunking_strategy is None:
chunking_strategy = RegexChunking()
super().__init__(
crawler=crawler,
mode=ExecutionMode.SPEED,
callbacks=callbacks,
persistence_path=persistence_path,
word_count_threshold=word_count_threshold,
extraction_strategy=extraction_strategy,
chunking_strategy=chunking_strategy,
bypass_cache=bypass_cache,
css_selector=css_selector,
screenshot=screenshot,
user_agent=user_agent,
verbose=verbose,
**kwargs
)
self.connection_pool_size = connection_pool_size
self.dns_cache_size = dns_cache_size
self.backoff_factor = backoff_factor
self.logger.info(
f"Executor {self.__class__.__name__}: Initialized with:"
f" connection_pool_size={self.connection_pool_size},"
f" dns_cache_size={self.dns_cache_size}"
)
async def process_batch(self, batch_urls: List[str]) -> Dict[str, Any]:
"""Process a batch of URLs concurrently."""
batch_tasks = [self._process_url(url) for url in batch_urls]
# Execute batch with concurrency control
batch_results_list = await asyncio.gather(*batch_tasks, return_exceptions=True)
batch_results = {}
for url, result in zip(batch_urls, batch_results_list):
if isinstance(result, Exception):
batch_results[url] = {"success": False, "error": str(result)}
else:
batch_results[url] = {"success": True, "result": result}
return batch_results
class ResourceOptimizedExecutor(ExecutorStrategy):
"""Executor optimized for resource usage."""
def __init__(
self,
crawler: AsyncWebCrawler,
callbacks: Optional[Dict[CallbackType, Callable]] = None,
persistence_path: Optional[Path] = None,
word_count_threshold=MIN_WORD_THRESHOLD,
extraction_strategy: ExtractionStrategy = None,
chunking_strategy: ChunkingStrategy = None,
bypass_cache: bool = False,
css_selector: str = None,
screenshot: bool = False,
user_agent: str = None,
verbose=True,
connection_pool_size: int = 50,
dns_cache_size: int = 1000,
backoff_factor: int = 5,
max_concurrent_tasks: int = 5,
**kwargs
):
if chunking_strategy is None:
chunking_strategy = RegexChunking()
super().__init__(
crawler=crawler,
mode=ExecutionMode.RESOURCE,
callbacks=callbacks,
persistence_path=persistence_path,
word_count_threshold=word_count_threshold,
extraction_strategy=extraction_strategy,
chunking_strategy=chunking_strategy,
bypass_cache=bypass_cache,
css_selector=css_selector,
screenshot=screenshot,
user_agent=user_agent,
verbose=verbose,
**kwargs
)
self.connection_pool_size = connection_pool_size
self.dns_cache_size = dns_cache_size
self.backoff_factor = backoff_factor
self.max_concurrent_tasks = max_concurrent_tasks
self.logger.info(
f"Executor {self.__class__.__name__}: Initialized with:"
f" connection_pool_size={self.connection_pool_size},"
f" dns_cache_size={self.dns_cache_size},"
f" max_concurrent_tasks={self.max_concurrent_tasks}"
)
async def process_batch(self, batch_urls: List[str]) -> Dict[str, Any]:
"""Process a batch of URLs with resource optimization."""
batch_results = {}
semaphore = asyncio.Semaphore(self.max_concurrent_tasks)
# Wait until resources are available before processing batch
while not await self.resource_monitor.check_resources():
self.logger.warning(f"Executor {self.__class__.__name__}: Resource limits reached, waiting...")
await asyncio.sleep(self.backoff_factor)
# Check for cancellation
if await self.control.is_cancelled():
raise asyncio.CancelledError("Execution was cancelled")
async def process_url_with_semaphore(url):
async with semaphore:
# Check for cancellation
if await self.control.is_cancelled():
raise asyncio.CancelledError("Execution was cancelled")
# Wait if paused
await self.control.wait_if_paused(timeout=300)
try:
result = await self._process_url(url)
batch_results[url] = {"success": True, "result": result}
except Exception as e:
batch_results[url] = {"success": False, "error": str(e)}
finally:
# Update system metrics after each URL
# INFO: Uncomment this line if you want to capture system metrics after each URL, but it causes a performance hit
# self.metrics.system_metrics.append(SystemMetrics.capture())
# Controlled delay between URLs
await asyncio.sleep(0.1) # Small delay for resource management
tasks = [process_url_with_semaphore(url) for url in batch_urls]
await asyncio.gather(*tasks)
return batch_results
async def main():
# Sample callback functions
async def pre_execution_callback(url: str, context: Dict[str, Any]):
print(f"Pre-execution callback: About to process URL {url}")
async def post_execution_callback(url: str, result: Any, context: Dict[str, Any]):
print(f"Post-execution callback: Successfully processed URL {url}")
async def on_error_callback(url: str, error: Exception, context: Dict[str, Any]):
print(f"Error callback: Error processing URL {url}: {error}")
async def on_retry_callback(url: str, attempt: int, context: Dict[str, Any]):
print(f"Retry callback: Retrying URL {url}, attempt {attempt}")
async def on_batch_start_callback(urls: List[str], context: Dict[str, Any]):
print(f"Batch start callback: Starting batch with {len(urls)} URLs")
async def on_batch_end_callback(urls: List[str], context: Dict[str, Any]):
print(f"Batch end callback: Completed batch with {len(urls)} URLs")
async def on_complete_callback(results: Dict[str, Any], context: Dict[str, Any]):
print(f"Complete callback: Execution completed with {len(results)} results")
# Sample URLs to crawl
urls = [
"https://www.example.com",
"https://www.python.org",
"https://www.asyncio.org",
# Add more URLs as needed
]
# Instantiate the crawler
crawler = AsyncWebCrawler()
# Set up callbacks
callbacks = {
CallbackType.PRE_EXECUTION: pre_execution_callback,
CallbackType.POST_EXECUTION: post_execution_callback,
CallbackType.ON_ERROR: on_error_callback,
CallbackType.ON_RETRY: on_retry_callback,
CallbackType.ON_BATCH_START: on_batch_start_callback,
CallbackType.ON_BATCH_END: on_batch_end_callback,
CallbackType.ON_COMPLETE: on_complete_callback,
}
# Instantiate the executors
speed_executor = SpeedOptimizedExecutor(
crawler=crawler,
callbacks=callbacks,
max_retries=2, # Example additional config
)
resource_executor = ResourceOptimizedExecutor(
crawler=crawler,
callbacks=callbacks,
max_concurrent_tasks=3, # Limit concurrency
max_retries=2, # Example additional config
)
# Choose which executor to use
executor = speed_executor # Or resource_executor
# Start the execution in a background task
execution_task = asyncio.create_task(executor.execute(urls))
# Simulate control operations
await asyncio.sleep(2) # Let it run for a bit
print("Pausing execution...")
await executor.control.pause()
await asyncio.sleep(2) # Wait while paused
print("Resuming execution...")
await executor.control.resume()
# Wait for execution to complete
results = await execution_task
# Print the results
print("Execution results:")
for url, result in results.items():
print(f"{url}: {result}")
# Get and print final metrics
final_status = await executor.get_status()
print("Final executor status and metrics:")
print(final_status)
# Run the main function
if __name__ == "__main__":
asyncio.run(main())
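The retry loop in `_process_url` above applies a linear backoff, sleeping `backoff_factor * attempts` seconds between attempts. A minimal standalone sketch of that policy (the helper name and signature here are illustrative, not part of the module):

```python
import asyncio

async def retry_with_backoff(fn, max_retries=3, backoff_factor=1):
    # Mirrors the policy above: retry on failure, sleep backoff_factor * attempt
    # seconds between attempts, and re-raise once max_retries is exhausted.
    attempts = 0
    while True:
        try:
            return await fn()
        except Exception:
            attempts += 1
            if attempts > max_retries:
                raise
            await asyncio.sleep(backoff_factor * attempts)
```

With `max_retries=3` this allows up to four total attempts, matching the `while attempts <= max_retries` bound in the executor.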


@@ -9,7 +9,7 @@ from .async_database import async_db_manager
from .chunking_strategy import *
from .extraction_strategy import *
from .async_crawler_strategy import AsyncCrawlerStrategy, AsyncPlaywrightCrawlerStrategy, AsyncCrawlResponse
from .content_scrapping_strategy import WebScrappingStrategy
from .content_scrapping_strategy import WebScrapingStrategy
from .config import MIN_WORD_THRESHOLD, IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD
from .utils import (
sanitize_input_encode,
@@ -75,6 +75,19 @@ class AsyncWebCrawler:
verbose=True,
**kwargs,
) -> CrawlResult:
"""
Runs the crawler for a single source: URL (web, local file, or raw HTML).
Args:
url (str): The URL to crawl. Supported prefixes:
- 'http://' or 'https://': Web URL to crawl.
- 'file://': Local file path to process.
- 'raw:': Raw HTML content to process.
... [other existing parameters]
Returns:
CrawlResult: The result of the crawling and processing.
"""
try:
extraction_strategy = extraction_strategy or NoExtractionStrategy()
extraction_strategy.verbose = verbose
@@ -89,8 +102,13 @@ class AsyncWebCrawler:
cached = None
screenshot_data = None
extracted_content = None
if not bypass_cache and not self.always_by_pass_cache:
is_web_url = url.startswith(('http://', 'https://'))
if is_web_url and not bypass_cache and not self.always_by_pass_cache:
cached = await async_db_manager.aget_cached_url(url)
# if not bypass_cache and not self.always_by_pass_cache:
# cached = await async_db_manager.aget_cached_url(url)
if kwargs.get("warmup", True) and not self.ready:
return None
@@ -117,25 +135,32 @@ class AsyncWebCrawler:
)
crawl_result = await self.aprocess_html(
url,
html,
extracted_content,
word_count_threshold,
extraction_strategy,
chunking_strategy,
css_selector,
screenshot_data,
verbose,
bool(cached),
url=url,
html=html,
extracted_content=extracted_content,
word_count_threshold=word_count_threshold,
extraction_strategy=extraction_strategy,
chunking_strategy=chunking_strategy,
css_selector=css_selector,
screenshot=screenshot_data,
verbose=verbose,
is_cached=bool(cached),
async_response=async_response,
bypass_cache=bypass_cache,
**kwargs,
)
crawl_result.status_code = async_response.status_code if async_response else 200
crawl_result.response_headers = async_response.response_headers if async_response else {}
if async_response:
crawl_result.status_code = async_response.status_code
crawl_result.response_headers = async_response.response_headers
else:
crawl_result.status_code = 200
crawl_result.response_headers = cached[10]
crawl_result.success = bool(html)
crawl_result.session_id = kwargs.get("session_id", None)
return crawl_result
except Exception as e:
if not hasattr(e, "msg"):
e.msg = str(e)
@@ -155,22 +180,40 @@ class AsyncWebCrawler:
verbose=True,
**kwargs,
) -> List[CrawlResult]:
tasks = [
self.arun(
url,
word_count_threshold,
extraction_strategy,
chunking_strategy,
bypass_cache,
css_selector,
screenshot,
user_agent,
verbose,
**kwargs
)
for url in urls
]
return await asyncio.gather(*tasks)
"""
Runs the crawler for multiple sources: URLs (web, local files, or raw HTML).
Args:
urls (List[str]): A list of URLs with supported prefixes:
- 'http://' or 'https://': Web URL to crawl.
- 'file://': Local file path to process.
- 'raw:': Raw HTML content to process.
... [other existing parameters]
Returns:
List[CrawlResult]: The results of the crawling and processing.
"""
semaphore_count = kwargs.get('semaphore_count', 5) # Adjust as needed
semaphore = asyncio.Semaphore(semaphore_count)
async def crawl_with_semaphore(url):
async with semaphore:
return await self.arun(
url,
word_count_threshold=word_count_threshold,
extraction_strategy=extraction_strategy,
chunking_strategy=chunking_strategy,
bypass_cache=bypass_cache,
css_selector=css_selector,
screenshot=screenshot,
user_agent=user_agent,
verbose=verbose,
**kwargs,
)
tasks = [crawl_with_semaphore(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [result if not isinstance(result, Exception) else str(result) for result in results]
async def aprocess_html(
self,
@@ -184,13 +227,14 @@ class AsyncWebCrawler:
screenshot: str,
verbose: bool,
is_cached: bool,
async_response: Optional[AsyncCrawlResponse],
**kwargs,
) -> CrawlResult:
t = time.time()
# Extract content from HTML
try:
t1 = time.time()
scrapping_strategy = WebScrappingStrategy()
scrapping_strategy = WebScrapingStrategy()
# result = await scrapping_strategy.ascrap(
result = scrapping_strategy.scrap(
url,
@@ -245,6 +289,12 @@ class AsyncWebCrawler:
)
screenshot = None if not screenshot else screenshot
response_headers = "{}" # Default value
if async_response:
# Serialize response_headers dict to JSON string
response_headers = json.dumps(async_response.response_headers, ensure_ascii=False)
if not is_cached or kwargs.get("bypass_cache", False) or self.always_by_pass_cache:
await async_db_manager.acache_url(
@@ -258,6 +308,7 @@ class AsyncWebCrawler:
json.dumps(links),
json.dumps(metadata),
screenshot=screenshot,
response_headers=response_headers,
)
return CrawlResult(


@@ -51,3 +51,5 @@ SOCIAL_MEDIA_DOMAINS = [
# If image format is in jpg, png or webp
# If image is in the first half of the total images extracted from the page
IMAGE_SCORE_THRESHOLD = 2
MAX_METRICS_HISTORY = 1000


@@ -15,7 +15,7 @@ class ContentCleaningStrategy:
self.link_density_threshold = 0.2
self.max_dom_depth = 10 # To prevent excessive DOM traversal
def clean(self, clean_html: str) -> str:
def clean(self, clean_html: str, soup = None) -> str:
"""
Main function that takes cleaned HTML and returns super cleaned HTML.
@@ -28,18 +28,20 @@ class ContentCleaningStrategy:
try:
if not clean_html or not isinstance(clean_html, str):
return ''
soup = BeautifulSoup(clean_html, 'html.parser')
if not soup:
# soup = BeautifulSoup(clean_html, 'html.parser')
soup = BeautifulSoup(clean_html, 'lxml')
main_content = self.extract_main_content(soup)
if main_content:
super_clean_element = self.clean_element(main_content)
return str(super_clean_element)
return super_clean_element.encode_contents().decode('utf-8')
else:
return ''
except Exception:
# Handle exceptions silently or log them as needed
return ''
def extract_main_content(self, soup: BeautifulSoup) -> Optional[Tag]:
def extract_main_content(self, soup) -> Optional[Tag]:
"""
Identifies and extracts the main content element from the HTML.


@@ -1,3 +1,4 @@
import re # Point 1: Pre-Compile Regular Expressions
from abc import ABC, abstractmethod
from typing import Dict, Any
from bs4 import BeautifulSoup
@@ -105,7 +106,39 @@ class CustomHTML2Text(HTML2Text):
return
super().handle_data(data, entity_char)
class ContentScrappingStrategy(ABC):
# Pre-compile regular expressions for Open Graph and Twitter metadata
OG_REGEX = re.compile(r'^og:')
TWITTER_REGEX = re.compile(r'^twitter:')
DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")
# Function to parse image height/width value and units
def parse_dimension(dimension):
if dimension:
# match = re.match(r"(\d+)(\D*)", dimension)
match = DIMENSION_REGEX.match(dimension)
if match:
number = int(match.group(1))
unit = match.group(2) or 'px' # Default unit is 'px' if not specified
return number, unit
return None, None
# Fetch image file metadata to extract size and extension
def fetch_image_file_size(img, base_url):
#If src is relative path construct full URL, if not it may be CDN URL
img_url = urljoin(base_url,img.get('src'))
try:
response = requests.head(img_url)
if response.status_code == 200:
return response.headers.get('Content-Length',None)
else:
print(f"Failed to retrieve file size for {img_url}")
return None
except InvalidSchema as e:
return None
finally:
return
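The pre-compiled `DIMENSION_REGEX` above splits a CSS-style dimension string into its numeric value and unit, defaulting the unit to `px`. A self-contained version of that helper for reference:

```python
import re

DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")

def parse_dimension(dimension):
    # Returns (number, unit); unit defaults to 'px' when omitted.
    if dimension:
        match = DIMENSION_REGEX.match(dimension)
        if match:
            return int(match.group(1)), match.group(2) or "px"
    return None, None
```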
class ContentScrapingStrategy(ABC):
@abstractmethod
def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
pass
@@ -114,7 +147,7 @@ class ContentScrappingStrategy(ABC):
async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
pass
class WebScrappingStrategy(ContentScrappingStrategy):
class WebScrapingStrategy(ContentScrapingStrategy):
def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
return self._get_content_of_website_optimized(url, html, is_async=False, **kwargs)
@@ -126,9 +159,16 @@ class WebScrappingStrategy(ContentScrappingStrategy):
if not html:
return None
soup = BeautifulSoup(html, 'html.parser')
# soup = BeautifulSoup(html, 'html.parser')
soup = BeautifulSoup(html, 'lxml')
body = soup.body
try:
meta = extract_metadata("", soup)
except Exception as e:
print('Error extracting metadata:', str(e))
meta = {}
image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD)
@@ -187,31 +227,7 @@ class WebScrappingStrategy(ContentScrappingStrategy):
#Score an image for it's usefulness
def score_image_for_usefulness(img, base_url, index, images_count):
# Function to parse image height/width value and units
def parse_dimension(dimension):
if dimension:
match = re.match(r"(\d+)(\D*)", dimension)
if match:
number = int(match.group(1))
unit = match.group(2) or 'px' # Default unit is 'px' if not specified
return number, unit
return None, None
# Fetch image file metadata to extract size and extension
def fetch_image_file_size(img, base_url):
#If src is relative path construct full URL, if not it may be CDN URL
img_url = urljoin(base_url,img.get('src'))
try:
response = requests.head(img_url)
if response.status_code == 200:
return response.headers.get('Content-Length',None)
else:
print(f"Failed to retrieve file size for {img_url}")
return None
except InvalidSchema as e:
return None
finally:
return
image_height = img.get('height')
height_value, height_unit = parse_dimension(image_height)
@@ -294,7 +310,6 @@ class WebScrappingStrategy(ContentScrappingStrategy):
exclude_social_media_domains = SOCIAL_MEDIA_DOMAINS + kwargs.get('exclude_social_media_domains', [])
exclude_social_media_domains = list(set(exclude_social_media_domains))
try:
if element.name == 'a' and element.get('href'):
@@ -439,15 +454,7 @@ class WebScrappingStrategy(ContentScrappingStrategy):
except Exception as e:
print('Error processing element:', str(e))
return False
#process images by filtering and extracting contextual text from the page
# imgs = body.find_all('img')
# media['images'] = [
# result for result in
# (process_image(img, url, i, len(imgs)) for i, img in enumerate(imgs))
# if result is not None
# ]
process_element(body)
# Update the links dictionary with unique links
@@ -478,8 +485,9 @@ class WebScrappingStrategy(ContentScrappingStrategy):
# Replace base64 data with empty string
img['src'] = base64_pattern.sub('', src)
str_body = ""
try:
str(body)
str_body = body.encode_contents().decode('utf-8')
except Exception as e:
# Reset body to the original HTML
success = False
@@ -504,11 +512,12 @@ class WebScrappingStrategy(ContentScrappingStrategy):
# Append the error div to the body
body.body.append(error_div)
str_body = body.encode_contents().decode('utf-8')
print(f"[LOG] 😧 Error: After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.")
cleaned_html = str(body).replace('\n\n', '\n').replace(' ', ' ')
cleaned_html = str_body.replace('\n\n', '\n').replace(' ', ' ')
try:
h = CustomHTML2Text()
@@ -518,15 +527,14 @@ class WebScrappingStrategy(ContentScrappingStrategy):
markdown = h.handle(sanitize_html(cleaned_html))
markdown = markdown.replace(' ```', '```')
try:
meta = extract_metadata(html, soup)
except Exception as e:
print('Error extracting metadata:', str(e))
meta = {}
cleaner = ContentCleaningStrategy()
fit_html = cleaner.clean(cleaned_html)
fit_markdown = h.handle(fit_html)
fit_markdown = "Set flag 'fit_markdown' to True to get cleaned HTML content."
fit_html = "Set flag 'fit_markdown' to True to get cleaned HTML content."
if kwargs.get('fit_markdown', False):
cleaner = ContentCleaningStrategy()
fit_html = cleaner.clean(cleaned_html)
fit_markdown = h.handle(fit_html)
cleaned_html = sanitize_html(cleaned_html)
return {


@@ -736,46 +736,54 @@ def get_content_of_website_optimized(url: str, html: str, word_count_threshold:
'metadata': meta
}
def extract_metadata(html, soup = None):
def extract_metadata(html, soup=None):
metadata = {}
if not html:
if not html and not soup:
return {}
if not soup:
soup = BeautifulSoup(html, 'lxml')
head = soup.head
if not head:
return metadata
# Parse HTML content with BeautifulSoup
if not soup:
soup = BeautifulSoup(html, 'html.parser')
# Title
title_tag = soup.find('title')
metadata['title'] = title_tag.string if title_tag else None
title_tag = head.find('title')
metadata['title'] = title_tag.string.strip() if title_tag and title_tag.string else None
# Meta description
description_tag = soup.find('meta', attrs={'name': 'description'})
metadata['description'] = description_tag['content'] if description_tag else None
description_tag = head.find('meta', attrs={'name': 'description'})
metadata['description'] = description_tag.get('content', '').strip() if description_tag else None
# Meta keywords
keywords_tag = soup.find('meta', attrs={'name': 'keywords'})
metadata['keywords'] = keywords_tag['content'] if keywords_tag else None
keywords_tag = head.find('meta', attrs={'name': 'keywords'})
metadata['keywords'] = keywords_tag.get('content', '').strip() if keywords_tag else None
# Meta author
author_tag = soup.find('meta', attrs={'name': 'author'})
metadata['author'] = author_tag['content'] if author_tag else None
author_tag = head.find('meta', attrs={'name': 'author'})
metadata['author'] = author_tag.get('content', '').strip() if author_tag else None
# Open Graph metadata
og_tags = soup.find_all('meta', attrs={'property': lambda value: value and value.startswith('og:')})
og_tags = head.find_all('meta', attrs={'property': re.compile(r'^og:')})
for tag in og_tags:
property_name = tag['property']
metadata[property_name] = tag['content']
property_name = tag.get('property', '').strip()
content = tag.get('content', '').strip()
if property_name and content:
metadata[property_name] = content
# Twitter Card metadata
twitter_tags = soup.find_all('meta', attrs={'name': lambda value: value and value.startswith('twitter:')})
twitter_tags = head.find_all('meta', attrs={'name': re.compile(r'^twitter:')})
for tag in twitter_tags:
property_name = tag['name']
metadata[property_name] = tag['content']
property_name = tag.get('name', '').strip()
content = tag.get('content', '').strip()
if property_name and content:
metadata[property_name] = content
return metadata
def extract_xml_tags(string):
tags = re.findall(r'<(\w+)>', string)
return list(set(tags))
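The reworked `extract_metadata` above scopes its queries to `<head>`, strips whitespace, and matches `og:`/`twitter:` names with compiled patterns. A minimal sketch of the Open Graph portion (using the stdlib `html.parser` backend so it runs without `lxml`; the diff itself switches to `lxml` for the ~4x speedup noted in the commit message):

```python
import re
from bs4 import BeautifulSoup

OG_REGEX = re.compile(r"^og:")

def extract_og(html: str) -> dict:
    # Collect og:* meta tags from <head>, skipping empty names or values.
    head = BeautifulSoup(html, "html.parser").head
    if not head:
        return {}
    out = {}
    for tag in head.find_all("meta", attrs={"property": OG_REGEX}):
        name = tag.get("property", "").strip()
        content = tag.get("content", "").strip()
        if name and content:
            out[name] = content
    return out
```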


@@ -10,6 +10,7 @@ from .extraction_strategy import *
from .crawler_strategy import *
from typing import List
from concurrent.futures import ThreadPoolExecutor
from .content_scrapping_strategy import WebScrapingStrategy
from .config import *
import warnings
import json
@@ -181,7 +182,21 @@ class WebCrawler:
# Extract content from HTML
try:
t1 = time.time()
result = get_content_of_website_optimized(url, html, word_count_threshold, css_selector=css_selector, only_text=kwargs.get("only_text", False))
scrapping_strategy = WebScrapingStrategy()
extra_params = {k: v for k, v in kwargs.items() if k not in ["only_text", "image_description_min_word_threshold"]}
result = scrapping_strategy.scrap(
url,
html,
word_count_threshold=word_count_threshold,
css_selector=css_selector,
only_text=kwargs.get("only_text", False),
image_description_min_word_threshold=kwargs.get(
"image_description_min_word_threshold", IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD
),
**extra_params,
)
# result = get_content_of_website_optimized(url, html, word_count_threshold, css_selector=css_selector, only_text=kwargs.get("only_text", False))
if verbose:
print(f"[LOG] 🚀 Content extracted for {url}, success: True, time taken: {time.time() - t1:.2f} seconds")


@@ -0,0 +1,235 @@
# Prefix-Based Input Handling in Crawl4AI
This guide will walk you through using the Crawl4AI library to crawl web pages, local HTML files, and raw HTML strings. We'll demonstrate these capabilities using a Wikipedia page as an example.
## Table of Contents
- [Prefix-Based Input Handling in Crawl4AI](#prefix-based-input-handling-in-crawl4ai)
- [Table of Contents](#table-of-contents)
- [Crawling a Web URL](#crawling-a-web-url)
- [Crawling a Local HTML File](#crawling-a-local-html-file)
- [Crawling Raw HTML Content](#crawling-raw-html-content)
- [Complete Example](#complete-example)
- [**How It Works**](#how-it-works)
- [**Running the Example**](#running-the-example)
- [Conclusion](#conclusion)
---
### Crawling a Web URL
To crawl a live web page, provide the URL starting with `http://` or `https://`.
```python
import asyncio
from crawl4ai import AsyncWebCrawler
async def crawl_web():
async with AsyncWebCrawler(verbose=True) as crawler:
result = await crawler.arun(url="https://en.wikipedia.org/wiki/apple", bypass_cache=True)
if result.success:
print("Markdown Content:")
print(result.markdown)
else:
print(f"Failed to crawl: {result.error_message}")
asyncio.run(crawl_web())
```
### Crawling a Local HTML File
To crawl a local HTML file, prefix the file path with `file://`.
```python
import asyncio
from crawl4ai import AsyncWebCrawler
async def crawl_local_file():
local_file_path = "/path/to/apple.html" # Replace with your file path
file_url = f"file://{local_file_path}"
async with AsyncWebCrawler(verbose=True) as crawler:
result = await crawler.arun(url=file_url, bypass_cache=True)
if result.success:
print("Markdown Content from Local File:")
print(result.markdown)
else:
print(f"Failed to crawl local file: {result.error_message}")
asyncio.run(crawl_local_file())
```
### Crawling Raw HTML Content
To crawl raw HTML content, prefix the HTML string with `raw:`.
```python
import asyncio
from crawl4ai import AsyncWebCrawler
async def crawl_raw_html():
raw_html = "<html><body><h1>Hello, World!</h1></body></html>"
raw_html_url = f"raw:{raw_html}"
async with AsyncWebCrawler(verbose=True) as crawler:
result = await crawler.arun(url=raw_html_url, bypass_cache=True)
if result.success:
print("Markdown Content from Raw HTML:")
print(result.markdown)
else:
print(f"Failed to crawl raw HTML: {result.error_message}")
asyncio.run(crawl_raw_html())
```
---
## Complete Example
Below is a comprehensive script that:
1. **Crawls the Wikipedia page for "Apple".**
2. **Saves the HTML content to a local file (`apple.html`).**
3. **Crawls the local HTML file and verifies the markdown length matches the original crawl.**
4. **Crawls the raw HTML content from the saved file and verifies consistency.**
```python
import os
import sys
import asyncio
from pathlib import Path
# Adjust the parent directory to include the crawl4ai module
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
from crawl4ai import AsyncWebCrawler
async def main():
# Define the URL to crawl
wikipedia_url = "https://en.wikipedia.org/wiki/apple"
# Define the path to save the HTML file
# Save the file in the same directory as the script
script_dir = Path(__file__).parent
html_file_path = script_dir / "apple.html"
async with AsyncWebCrawler(verbose=True) as crawler:
print("\n=== Step 1: Crawling the Wikipedia URL ===")
# Crawl the Wikipedia URL
result = await crawler.arun(url=wikipedia_url, bypass_cache=True)
# Check if crawling was successful
if not result.success:
print(f"Failed to crawl {wikipedia_url}: {result.error_message}")
return
# Save the HTML content to a local file
with open(html_file_path, 'w', encoding='utf-8') as f:
f.write(result.html)
print(f"Saved HTML content to {html_file_path}")
# Store the length of the generated markdown
web_crawl_length = len(result.markdown)
print(f"Length of markdown from web crawl: {web_crawl_length}\n")
print("=== Step 2: Crawling from the Local HTML File ===")
# Construct the file URL with 'file://' prefix
file_url = f"file://{html_file_path.resolve()}"
# Crawl the local HTML file
local_result = await crawler.arun(url=file_url, bypass_cache=True)
# Check if crawling was successful
if not local_result.success:
print(f"Failed to crawl local file {file_url}: {local_result.error_message}")
return
# Store the length of the generated markdown from local file
local_crawl_length = len(local_result.markdown)
print(f"Length of markdown from local file crawl: {local_crawl_length}")
# Compare the lengths
assert web_crawl_length == local_crawl_length, (
f"Markdown length mismatch: Web crawl ({web_crawl_length}) != Local file crawl ({local_crawl_length})"
)
print("✅ Markdown length matches between web crawl and local file crawl.\n")
print("=== Step 3: Crawling Using Raw HTML Content ===")
# Read the HTML content from the saved file
with open(html_file_path, 'r', encoding='utf-8') as f:
raw_html_content = f.read()
# Prefix the raw HTML content with 'raw:'
raw_html_url = f"raw:{raw_html_content}"
# Crawl using the raw HTML content
raw_result = await crawler.arun(url=raw_html_url, bypass_cache=True)
# Check if crawling was successful
if not raw_result.success:
print(f"Failed to crawl raw HTML content: {raw_result.error_message}")
return
# Store the length of the generated markdown from raw HTML
raw_crawl_length = len(raw_result.markdown)
print(f"Length of markdown from raw HTML crawl: {raw_crawl_length}")
# Compare the lengths
assert web_crawl_length == raw_crawl_length, (
f"Markdown length mismatch: Web crawl ({web_crawl_length}) != Raw HTML crawl ({raw_crawl_length})"
)
print("✅ Markdown length matches between web crawl and raw HTML crawl.\n")
print("All tests passed successfully!")
# Clean up by removing the saved HTML file
if html_file_path.exists():
os.remove(html_file_path)
print(f"Removed the saved HTML file: {html_file_path}")
# Run the main function
if __name__ == "__main__":
asyncio.run(main())
```
### **How It Works**
1. **Step 1: Crawl the Web URL**
- Crawls `https://en.wikipedia.org/wiki/apple`.
- Saves the HTML content to `apple.html`.
- Records the length of the generated markdown.
2. **Step 2: Crawl from the Local HTML File**
- Uses the `file://` prefix to crawl `apple.html`.
- Ensures the markdown length matches the original web crawl.
3. **Step 3: Crawl Using Raw HTML Content**
- Reads the HTML from `apple.html`.
- Prefixes it with `raw:` and crawls.
- Verifies the markdown length matches the previous results.
4. **Cleanup**
- Deletes the `apple.html` file after testing.
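The three input forms above differ only in their prefix, so routing them is a simple string check. The helper below is an illustrative sketch of that dispatch logic — `classify_input` is a hypothetical name, not part of the Crawl4AI API:

```python
def classify_input(url: str) -> str:
    """Classify a crawl input by its prefix (illustrative sketch only)."""
    if url.startswith("raw:"):
        return "raw_html"       # inline HTML string follows the prefix
    if url.startswith("file://"):
        return "local_file"     # path to an HTML file on disk
    if url.startswith(("http://", "https://")):
        return "web_url"        # ordinary network fetch
    raise ValueError(f"Unsupported input: {url[:40]!r}")

print(classify_input("https://en.wikipedia.org/wiki/apple"))   # web_url
print(classify_input("file:///tmp/apple.html"))                # local_file
print(classify_input("raw:<html><body>Hi</body></html>"))      # raw_html
```

Because the check happens on the prefix alone, the same `arun(url=...)` entry point can serve all three cases without extra parameters.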
### **Running the Example**
1. **Save the Script:**
- Save the above code as `test_crawl4ai.py` in your project directory.
2. **Execute the Script:**
- Run the script using:
```bash
python test_crawl4ai.py
```
3. **Observe the Output:**
- The script will print logs detailing each step.
- Assertions ensure consistency across different crawling methods.
- Upon success, it confirms that all markdown lengths match.
---
## Conclusion
With the new prefix-based input handling in **Crawl4AI**, you can effortlessly crawl web URLs, local HTML files, and raw HTML strings using a unified `url` parameter. This enhancement simplifies the API usage and provides greater flexibility for diverse crawling scenarios.
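For the two local input kinds, resolution to an HTML string can be sketched with nothing but the standard library. This is illustrative only — Crawl4AI performs the equivalent step internally, and `load_local_html` is a hypothetical helper, not the library's code:

```python
from pathlib import Path
from urllib.parse import urlparse

def load_local_html(source: str) -> str:
    """Resolve a 'raw:' or 'file://' input to its HTML string (sketch)."""
    if source.startswith("raw:"):
        # Everything after the prefix is the HTML itself
        return source[len("raw:"):]
    if source.startswith("file://"):
        # Strip the scheme and read the file from disk
        return Path(urlparse(source).path).read_text(encoding="utf-8")
    raise ValueError("Web URLs require a network fetch and are not handled here")

print(load_local_html("raw:<html><body>Hello</body></html>"))
```

Only web URLs need the browser; the other two forms feed HTML straight into the scraping pipeline, which is why the example above gets identical markdown lengths from all three.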

main.py

@@ -321,7 +321,12 @@ app.add_middleware(
 # Mount the pages directory as a static directory
 app.mount("/pages", StaticFiles(directory=__location__ + "/pages"), name="pages")
-app.mount("/mkdocs", StaticFiles(directory="site", html=True), name="mkdocs")
+# Check if site directory exists
+if os.path.exists(__location__ + "/site"):
+    # Mount the site directory as a static directory
+    app.mount("/mkdocs", StaticFiles(directory="site", html=True), name="mkdocs")
 site_templates = Jinja2Templates(directory=__location__ + "/site")
 templates = Jinja2Templates(directory=__location__ + "/pages")
@@ -337,7 +342,10 @@ async def shutdown_event():
 @app.get("/")
 def read_root():
-    return RedirectResponse(url="/mkdocs")
+    if os.path.exists(__location__ + "/site"):
+        return RedirectResponse(url="/mkdocs")
+    # Return a json response
+    return {"message": "Crawl4AI API service is running"}
 @app.post("/crawl")


@@ -8,4 +8,4 @@ playwright>=1.47,<1.48
 python-dotenv~=1.0
 requests~=2.26
 beautifulsoup4~=4.12
-playwright_stealth~=1.0
+tf-playwright-stealth~=1.0


@@ -0,0 +1,219 @@
import os, sys
import unittest
import asynctest
import asyncio
import time
from typing import Dict, Any, List
from unittest.mock import AsyncMock, MagicMock, patch

# Add the parent directory to the Python path
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(parent_dir)

# Assuming all classes and imports are already available from the code above
from crawl4ai.async_webcrawler import AsyncWebCrawler
from crawl4ai.config import MAX_METRICS_HISTORY
from crawl4ai.async_executor import (
    SpeedOptimizedExecutor,
    ResourceOptimizedExecutor,
    AsyncWebCrawler,
    ExecutionMode,
    SystemMetrics,
    CallbackType
)

class TestAsyncExecutor(asynctest.TestCase):
    async def setUp(self):
        # Set up a mock crawler
        self.mock_crawler = AsyncMock(spec=AsyncWebCrawler)
        self.mock_crawler.arun = AsyncMock(side_effect=self.mock_crawl)
        # Sample URLs
        self.urls = [
            "https://www.example.com",
            "https://www.python.org",
            "https://www.asyncio.org",
            "https://www.nonexistenturl.xyz",  # This will simulate a failure
        ]
        # Set up callbacks
        self.callbacks = {
            CallbackType.PRE_EXECUTION: AsyncMock(),
            CallbackType.POST_EXECUTION: AsyncMock(),
            CallbackType.ON_ERROR: AsyncMock(),
            CallbackType.ON_RETRY: AsyncMock(),
            CallbackType.ON_BATCH_START: AsyncMock(),
            CallbackType.ON_BATCH_END: AsyncMock(),
            CallbackType.ON_COMPLETE: AsyncMock(),
        }

    async def mock_crawl(self, url: str, **kwargs):
        if "nonexistenturl" in url:
            raise Exception("Failed to fetch URL")
        return f"Mock content for {url}"

    async def test_speed_executor_basic(self):
        """Test basic functionality of SpeedOptimizedExecutor."""
        executor = SpeedOptimizedExecutor(
            crawler=self.mock_crawler,
            callbacks=self.callbacks,
            max_retries=1,
        )
        results = await executor.execute(self.urls)
        # Assertions
        self.assertEqual(len(results), len(self.urls))
        self.mock_crawler.arun.assert_awaited()
        self.callbacks[CallbackType.PRE_EXECUTION].assert_awaited()
        self.callbacks[CallbackType.POST_EXECUTION].assert_awaited()
        self.callbacks[CallbackType.ON_ERROR].assert_awaited()

    async def test_resource_executor_basic(self):
        """Test basic functionality of ResourceOptimizedExecutor."""
        executor = ResourceOptimizedExecutor(
            crawler=self.mock_crawler,
            callbacks=self.callbacks,
            max_concurrent_tasks=2,
            max_retries=1,
        )
        results = await executor.execute(self.urls)
        # Assertions
        self.assertEqual(len(results), len(self.urls))
        self.mock_crawler.arun.assert_awaited()
        self.callbacks[CallbackType.PRE_EXECUTION].assert_awaited()
        self.callbacks[CallbackType.POST_EXECUTION].assert_awaited()
        self.callbacks[CallbackType.ON_ERROR].assert_awaited()

    async def test_pause_and_resume(self):
        """Test the pause and resume functionality."""
        executor = SpeedOptimizedExecutor(
            crawler=self.mock_crawler,
            callbacks=self.callbacks,
            max_retries=1,
        )
        execution_task = asyncio.create_task(executor.execute(self.urls))
        await asyncio.sleep(0.1)
        await executor.control.pause()
        self.assertTrue(await executor.control.is_paused())
        # Ensure that execution is paused
        await asyncio.sleep(0.5)
        await executor.control.resume()
        self.assertFalse(await executor.control.is_paused())
        results = await execution_task
        # Assertions
        self.assertEqual(len(results), len(self.urls))

    async def test_cancellation(self):
        """Test the cancellation functionality."""
        executor = SpeedOptimizedExecutor(
            crawler=self.mock_crawler,
            callbacks=self.callbacks,
            max_retries=1,
        )
        execution_task = asyncio.create_task(executor.execute(self.urls))
        await asyncio.sleep(0.1)
        await executor.control.cancel()
        self.assertTrue(await executor.control.is_cancelled())
        with self.assertRaises(asyncio.CancelledError):
            await execution_task

    async def test_max_retries(self):
        """Test that the executor respects the max_retries setting."""
        executor = SpeedOptimizedExecutor(
            crawler=self.mock_crawler,
            callbacks=self.callbacks,
            max_retries=2,
        )
        results = await executor.execute(self.urls)
        # The failing URL should have been retried
        self.assertEqual(self.mock_crawler.arun.call_count, len(self.urls) + 2)
        self.assertEqual(executor.metrics.total_retries, 2)

    async def test_callbacks_invoked(self):
        """Test that all callbacks are invoked appropriately."""
        executor = SpeedOptimizedExecutor(
            crawler=self.mock_crawler,
            callbacks=self.callbacks,
            max_retries=1,
        )
        await executor.execute(self.urls)
        # Check that callbacks were called the correct number of times
        self.assertEqual(
            self.callbacks[CallbackType.PRE_EXECUTION].call_count,
            len(self.urls) * (1 + executor.metrics.total_retries),
        )
        self.assertEqual(
            self.callbacks[CallbackType.POST_EXECUTION].call_count,
            executor.metrics.completed_urls,
        )
        self.assertEqual(
            self.callbacks[CallbackType.ON_ERROR].call_count,
            executor.metrics.failed_urls * (1 + executor.metrics.total_retries),
        )
        self.callbacks[CallbackType.ON_COMPLETE].assert_awaited_once()

    async def test_resource_limits(self):
        """Test that the ResourceOptimizedExecutor respects resource limits."""
        with patch('psutil.cpu_percent', return_value=95), \
             patch('psutil.virtual_memory', return_value=MagicMock(percent=85, available=1000)):
            executor = ResourceOptimizedExecutor(
                crawler=self.mock_crawler,
                callbacks=self.callbacks,
                max_concurrent_tasks=2,
                max_retries=1,
            )
            results = await executor.execute(self.urls)
            # Assertions
            self.assertEqual(len(results), len(self.urls))
            # Since resources are over threshold, batch size should be minimized
            batch_sizes = [executor.resource_monitor.get_optimal_batch_size(len(self.urls))]
            self.assertTrue(all(size == 1 for size in batch_sizes))

    async def test_system_metrics_limit(self):
        """Test that the system_metrics list does not grow indefinitely."""
        executor = SpeedOptimizedExecutor(
            crawler=self.mock_crawler,
            callbacks=self.callbacks,
            max_retries=1,
        )
        # Simulate many batches to exceed MAX_METRICS_HISTORY
        original_max_history = MAX_METRICS_HISTORY
        try:
            # Temporarily reduce MAX_METRICS_HISTORY for the test
            globals()['MAX_METRICS_HISTORY'] = 5
            # Mock capture_system_metrics to increase system_metrics length
            with patch.object(executor.metrics, 'capture_system_metrics') as mock_capture:
                def side_effect():
                    executor.metrics.system_metrics.append(SystemMetrics(0, 0, 0, time.time()))
                    if len(executor.metrics.system_metrics) > MAX_METRICS_HISTORY:
                        executor.metrics.system_metrics.pop(0)
                mock_capture.side_effect = side_effect
                await executor.execute(self.urls * 3)  # Multiply URLs to create more batches
                # Assertions
                self.assertLessEqual(len(executor.metrics.system_metrics), MAX_METRICS_HISTORY)
        finally:
            # Restore original MAX_METRICS_HISTORY
            globals()['MAX_METRICS_HISTORY'] = original_max_history

if __name__ == "__main__":
    unittest.main()
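A side note on the suite above: it depends on the third-party `asynctest` package for its async `TestCase`. On Python 3.8+ the standard library offers an equivalent base class, `unittest.IsolatedAsyncioTestCase`, which works with the same `AsyncMock` pattern. A minimal sketch (not part of the committed test file):

```python
import unittest
from unittest.mock import AsyncMock

class MockCrawlTest(unittest.IsolatedAsyncioTestCase):
    async def test_mock_crawl(self):
        # AsyncMock lets the test await a stubbed crawler without a real browser
        crawler = AsyncMock()
        crawler.arun.return_value = "Mock content for https://www.example.com"
        result = await crawler.arun(url="https://www.example.com")
        self.assertEqual(result, "Mock content for https://www.example.com")
        crawler.arun.assert_awaited_once()
```

Each test method runs in its own fresh event loop, so no `asyncio.run` boilerplate is needed.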


@@ -0,0 +1,162 @@
import asyncio
from bs4 import BeautifulSoup
from typing import Dict, Any
import os
import sys
import time
import csv
from tabulate import tabulate
from dataclasses import dataclass
from typing import List, Dict

parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(parent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))

from crawl4ai.content_scrapping_strategy import WebScrapingStrategy
from crawl4ai.content_scrapping_strategy import WebScrapingStrategy as WebScrapingStrategyCurrent
# from crawl4ai.content_scrapping_strategy_current import WebScrapingStrategy as WebScrapingStrategyCurrent

@dataclass
class TestResult:
    name: str
    success: bool
    images: int
    internal_links: int
    external_links: int
    markdown_length: int
    execution_time: float

class StrategyTester:
    def __init__(self):
        self.new_scraper = WebScrapingStrategy()
        self.current_scraper = WebScrapingStrategyCurrent()
        with open(__location__ + '/sample_wikipedia.html', 'r', encoding='utf-8') as f:
            self.WIKI_HTML = f.read()
        self.results = {'new': [], 'current': []}

    def run_test(self, name: str, **kwargs) -> tuple[TestResult, TestResult]:
        results = []
        for scraper in [self.new_scraper, self.current_scraper]:
            start_time = time.time()
            result = scraper._get_content_of_website_optimized(
                url="https://en.wikipedia.org/wiki/Test",
                html=self.WIKI_HTML,
                **kwargs
            )
            execution_time = time.time() - start_time
            test_result = TestResult(
                name=name,
                success=result['success'],
                images=len(result['media']['images']),
                internal_links=len(result['links']['internal']),
                external_links=len(result['links']['external']),
                markdown_length=len(result['markdown']),
                execution_time=execution_time
            )
            results.append(test_result)
        return results[0], results[1]  # new, current

    def run_all_tests(self):
        test_cases = [
            ("Basic Extraction", {}),
            ("Exclude Tags", {'excluded_tags': ['table', 'div.infobox', 'div.navbox']}),
            ("Word Threshold", {'word_count_threshold': 50}),
            ("CSS Selector", {'css_selector': 'div.mw-parser-output > p'}),
            ("Link Exclusions", {
                'exclude_external_links': True,
                'exclude_social_media_links': True,
                'exclude_domains': ['facebook.com', 'twitter.com']
            }),
            ("Media Handling", {
                'exclude_external_images': True,
                'image_description_min_word_threshold': 20
            }),
            ("Text Only", {
                'only_text': True,
                'remove_forms': True
            }),
            ("HTML Cleaning", {
                'clean_html': True,
                'keep_data_attributes': True
            }),
            ("HTML2Text Options", {
                'html2text': {
                    'skip_internal_links': True,
                    'single_line_break': True,
                    'mark_code': True,
                    'preserve_tags': ['pre', 'code']
                }
            })
        ]
        all_results = []
        for name, kwargs in test_cases:
            try:
                new_result, current_result = self.run_test(name, **kwargs)
                all_results.append((name, new_result, current_result))
            except Exception as e:
                print(f"Error in {name}: {str(e)}")
        self.save_results_to_csv(all_results)
        self.print_comparison_table(all_results)

    def save_results_to_csv(self, all_results: List[tuple]):
        csv_file = os.path.join(__location__, 'strategy_comparison_results.csv')
        with open(csv_file, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['Test Name', 'Strategy', 'Success', 'Images', 'Internal Links',
                             'External Links', 'Markdown Length', 'Execution Time'])
            for name, new_result, current_result in all_results:
                writer.writerow([name, 'New', new_result.success, new_result.images,
                                 new_result.internal_links, new_result.external_links,
                                 new_result.markdown_length, f"{new_result.execution_time:.3f}"])
                writer.writerow([name, 'Current', current_result.success, current_result.images,
                                 current_result.internal_links, current_result.external_links,
                                 current_result.markdown_length, f"{current_result.execution_time:.3f}"])

    def print_comparison_table(self, all_results: List[tuple]):
        table_data = []
        headers = ['Test Name', 'Strategy', 'Success', 'Images', 'Internal Links',
                   'External Links', 'Markdown Length', 'Time (s)']
        for name, new_result, current_result in all_results:
            # Check for differences
            differences = []
            if new_result.images != current_result.images: differences.append('images')
            if new_result.internal_links != current_result.internal_links: differences.append('internal_links')
            if new_result.external_links != current_result.external_links: differences.append('external_links')
            if new_result.markdown_length != current_result.markdown_length: differences.append('markdown')
            # Add row for new strategy
            new_row = [
                name, 'New', new_result.success, new_result.images,
                new_result.internal_links, new_result.external_links,
                new_result.markdown_length, f"{new_result.execution_time:.3f}"
            ]
            table_data.append(new_row)
            # Add row for current strategy
            current_row = [
                '', 'Current', current_result.success, current_result.images,
                current_result.internal_links, current_result.external_links,
                current_result.markdown_length, f"{current_result.execution_time:.3f}"
            ]
            table_data.append(current_row)
            # Add difference summary if any
            if differences:
                table_data.append(['', '⚠️ Differences', ', '.join(differences), '', '', '', '', ''])
            # Add empty row for better readability
            table_data.append([''] * len(headers))
        print("\nStrategy Comparison Results:")
        print(tabulate(table_data, headers=headers, tablefmt='grid'))

if __name__ == "__main__":
    tester = StrategyTester()
    tester.run_all_tests()