Compare commits
5 Commits
unclecode-
...
0.3.75
| Author | SHA1 | Date |
|---|---|---|
| | 3a2cb7dacf | |
| | c38ac29edb | |
| | 61b93ebf36 | |
| | bf91adf3f8 | |
| | b6d6631b12 | |
1
.gitignore
vendored
@@ -199,6 +199,7 @@ test_env/
**/.DS_Store

todo.md
todo_executor.md
git_changes.py
git_changes.md
pypi_build.sh

40
CHANGELOG.md
@@ -1,6 +1,36 @@
# Changelog

# CHANGELOG
# Changelog - November 13, 2024

### Added
- Support for raw HTML and local file crawling via URL prefixes ('raw:', 'file://')
- Browser process monitoring for managed browser instances
- Screenshot capability for raw HTML and local file content
- Response headers storage in cache database
- New `fit_markdown` flag for optional markdown generation

### Changed
- Switched HTML parser from 'html.parser' to 'lxml' for ~4x performance improvement
- Optimized BeautifulSoup text conversion and element selection
- Pre-compiled regular expressions for better performance
- Improved metadata extraction efficiency
- Response headers now stored alongside HTML in cache
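
The pre-compiled regular expression item in the list above can be sketched as follows. This is a minimal illustration only: the pattern and the function name `normalize_whitespace` are hypothetical and not taken from the crawl4ai source.

```python
import re

# Hypothetical pattern, compiled once at import time instead of on every call.
WHITESPACE_RE = re.compile(r"\s+")


def normalize_whitespace(text: str) -> str:
    """Collapse each run of whitespace into one space and trim both ends."""
    return WHITESPACE_RE.sub(" ", text).strip()
```

Python's `re` module already caches recently compiled patterns, so the gain from this change is likely to come from skipping the cache lookup in hot loops rather than from avoiding recompilation outright.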

### Removed
- `__del__` method from AsyncPlaywrightCrawlerStrategy to prevent async cleanup issues

### Fixed
- Issue #256: Added support for crawling raw HTML content
- Issue #253: Implemented file:// protocol handling
- Missing response headers in cached results
- Memory leaks from improper async cleanup
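
The prefix rules behind the 'raw:' and 'file://' entries above can be sketched in isolation. The helper name `classify_source` is hypothetical; the branching and the error message follow the `crawl` method added in this comparison, but the real method goes on to read the file or render the HTML rather than return a tuple.

```python
from typing import Tuple


def classify_source(url: str) -> Tuple[str, str]:
    """Return (kind, payload) for a crawl target based on its prefix."""
    if url.startswith(("http://", "https://")):
        return "web", url  # crawled over the network
    if url.startswith("file://"):
        return "file", url[len("file://"):]  # local file path
    if url.startswith("raw:"):
        return "raw", url[len("raw:"):]  # inline HTML content
    raise ValueError("URL must start with 'http://', 'https://', 'file://', or 'raw:'")
```

For example, `classify_source("raw:<p>hi</p>")` yields the kind `"raw"` with the HTML as payload, and an unprefixed string raises `ValueError`.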

## [v0.3.731] - 2024-11-13 Changelog for Issue 256 Fix
- Fixed: Browser context unexpectedly closing in Docker environment during crawl operations.
- Removed: `__del__` method from AsyncPlaywrightCrawlerStrategy to prevent unreliable asynchronous cleanup, ensuring the browser context is closed explicitly within context managers.
- Added: Monitoring for ManagedBrowser subprocess to detect and log unexpected terminations.
- Updated: Dockerfile configurations to expose debugging port (9222) and allocate additional shared memory for improved browser stability.
- Improved: Error handling and resource cleanup processes for browser lifecycle management within the Docker environment.
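
The explicit cleanup described in the entries above can be sketched with a plain async context manager. The `Resource` class is a hypothetical stand-in for a strategy object that owns a browser; it only shows why closing in `__aexit__` is more dependable than closing in `__del__`, which may run when no event loop is available.

```python
import asyncio


class Resource:
    """Hypothetical stand-in for a crawler strategy that owns a browser."""

    def __init__(self) -> None:
        self.open = False

    async def start(self) -> None:
        self.open = True

    async def close(self) -> None:
        self.open = False

    async def __aenter__(self) -> "Resource":
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Cleanup is awaited here, inside a running event loop.
        await self.close()


async def main() -> bool:
    async with Resource() as res:
        assert res.open  # the resource is usable inside the block
    return res.open  # already closed once the block exits


still_open = asyncio.run(main())
```

With this pattern the caller controls when the resource is released, and no finalizer has to call `run_until_complete` on a loop that may already be closed.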

## [v0.3.73] - 2024-11-05

@@ -180,7 +210,7 @@ This commit introduces several key enhancements, including improved error handli
|
||||
## [v0.3.72] - 2024-10-20
|
||||
|
||||
### Fixed
|
||||
- Added support for parsing Base64 encoded images in WebScrappingStrategy
|
||||
- Added support for parsing Base64 encoded images in WebScrapingStrategy
|
||||
|
||||
### Added
|
||||
- Forked and integrated a customized version of the html2text library for more control over Markdown generation
|
||||
@@ -203,7 +233,7 @@ This commit introduces several key enhancements, including improved error handli
|
||||
### Developer Notes
|
||||
- The customized html2text library is now located within the crawl4ai package
|
||||
- New configuration options are available in the `config.py` file for external content handling
|
||||
- The `WebScrappingStrategy` class has been updated to accommodate new external content exclusion options
|
||||
- The `WebScrapingStrategy` class has been updated to accommodate new external content exclusion options
|
||||
|
||||
## [v0.3.71] - 2024-10-19
|
||||
|
||||
@@ -280,7 +310,7 @@ These updates aim to provide more flexibility in text processing, improve perfor
|
||||
|
||||
### Improvements
|
||||
1. **Better Error Handling**:
|
||||
- Enhanced error reporting in WebScrappingStrategy with detailed error messages and suggestions.
|
||||
- Enhanced error reporting in WebScrapingStrategy with detailed error messages and suggestions.
|
||||
- Added console message and error logging for better debugging.
|
||||
|
||||
2. **Image Processing Enhancements**:
|
||||
@@ -345,7 +375,7 @@ These updates aim to provide more flexibility in text processing, improve perfor
|
||||
- Allows for more customized setups.
|
||||
|
||||
### 2. Image Processing Optimization
|
||||
- Enhanced image handling in WebScrappingStrategy.
|
||||
- Enhanced image handling in WebScrapingStrategy.
|
||||
- Added filtering for small, invisible, or irrelevant images.
|
||||
- Improved image scoring system for better content relevance.
|
||||
- Implemented JavaScript-based image dimension updating for more accurate representation.
|
||||
|
||||
@@ -115,7 +115,12 @@ HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
|
||||
CMD curl -f http://localhost:8000/health || exit 1
|
||||
|
||||
# Expose port
|
||||
EXPOSE 8000
|
||||
EXPOSE 8000 11235 9222 8080
|
||||
|
||||
# Optional: Increase shared memory size to prevent browser crashes
|
||||
# when loading heavy pages
|
||||
RUN mkdir -p /dev/shm
|
||||
VOLUME /dev/shm
|
||||
|
||||
# Start the FastAPI server
|
||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "11235"]
|
||||
@@ -1,4 +1,4 @@
|
||||
# 🔥🕷️ Crawl4AI: LLM Friendly Web Crawler & Scrapper
|
||||
# 🔥🕷️ Crawl4AI: LLM Friendly Web Crawler & Scraper
|
||||
|
||||
<a href="https://trendshift.io/repositories/11716" target="_blank"><img src="https://trendshift.io/api/badge/repositories/11716" alt="unclecode%2Fcrawl4ai | Trendshift" style="width: 250px; height: 55px;" width="250" height="55"/></a>
|
||||
|
||||
@@ -127,6 +127,9 @@ docker pull unclecode/crawl4ai:gpu # GPU-enabled version
|
||||
|
||||
# Run the container
|
||||
docker run -p 11235:11235 unclecode/crawl4ai:basic # Replace 'basic' with your chosen version
|
||||
|
||||
# To allocate more shared memory for the container
|
||||
docker run --shm-size=2gb -p 11235:11235 unclecode/crawl4ai:basic
|
||||
```
|
||||
|
||||
#### Option 2: Build from Repository
|
||||
|
||||
@@ -26,5 +26,5 @@ if is_sync_version_installed():
|
||||
print("Warning: Failed to import WebCrawler even though selenium is installed. This might be due to other missing dependencies.")
|
||||
else:
|
||||
WebCrawler = None
|
||||
import warnings
|
||||
print("Warning: Synchronous WebCrawler is not available. Install crawl4ai[sync] for synchronous support. However, please note that the synchronous version will be deprecated soon.")
|
||||
# import warnings
|
||||
# print("Warning: Synchronous WebCrawler is not available. Install crawl4ai[sync] for synchronous support. However, please note that the synchronous version will be deprecated soon.")
|
||||
@@ -1,2 +1,2 @@
|
||||
# crawl4ai/_version.py
|
||||
__version__ = "0.3.73"
|
||||
__version__ = "0.3.731"
|
||||
@@ -64,12 +64,27 @@ class ManagedBrowser:
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE
|
||||
)
|
||||
# Monitor browser process output for errors
|
||||
asyncio.create_task(self._monitor_browser_process())
|
||||
await asyncio.sleep(2) # Give browser time to start
|
||||
return f"http://localhost:{self.debugging_port}"
|
||||
except Exception as e:
|
||||
await self.cleanup()
|
||||
raise Exception(f"Failed to start browser: {e}")
|
||||
|
||||
async def _monitor_browser_process(self):
|
||||
"""Monitor the browser process for unexpected termination."""
|
||||
if self.browser_process:
|
||||
stdout, stderr = await asyncio.gather(
|
||||
asyncio.to_thread(self.browser_process.stdout.read),
|
||||
asyncio.to_thread(self.browser_process.stderr.read)
|
||||
)
|
||||
if self.browser_process.poll() is not None:
|
||||
print(f"Browser process terminated unexpectedly with code {self.browser_process.returncode}")
|
||||
print(f"STDOUT: {stdout.decode()}")
|
||||
print(f"STDERR: {stderr.decode()}")
|
||||
await self.cleanup()
|
||||
|
||||
def _get_browser_path(self) -> str:
|
||||
"""Returns the browser executable path based on OS and browser type"""
|
||||
if sys.platform == "darwin": # macOS
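
The `_monitor_browser_process` method in the hunk above reads the child's stdout and stderr in worker threads and reports the exit code. A comparable sketch is given below, with one deliberate substitution: it uses asyncio's own subprocess API instead of `subprocess.Popen` with `asyncio.to_thread`. The function name `watch_process` and the demo command are assumptions made for illustration.

```python
import asyncio
import sys
from typing import List, Tuple


async def watch_process(cmd: List[str]) -> Tuple[int, str, str]:
    """Run cmd, wait for it to exit, and return (returncode, stdout, stderr)."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # communicate() drains both pipes and waits for the process to finish.
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout.decode(), stderr.decode()


# Demo: a child process that writes to stderr and exits with code 3.
code, out, err = asyncio.run(
    watch_process(
        [sys.executable, "-c", "import sys; sys.stderr.write('boom'); sys.exit(3)"]
    )
)
```

Either approach only learns about a failure once the pipes close, so a long-lived browser is reported when it exits rather than while it is still running.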
|
||||
@@ -186,6 +201,8 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
self.sleep_on_close = kwargs.get("sleep_on_close", False)
|
||||
self.use_managed_browser = kwargs.get("use_managed_browser", False)
|
||||
self.user_data_dir = kwargs.get("user_data_dir", None)
|
||||
self.use_persistent_context = kwargs.get("use_persistent_context", False)
|
||||
self.chrome_channel = kwargs.get("chrome_channel", "chrome")
|
||||
self.managed_browser = None
|
||||
self.default_context = None
|
||||
self.hooks = {
|
||||
@@ -197,6 +214,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
'before_return_html': None,
|
||||
'before_retrieve_html': None
|
||||
}
|
||||
self.extra_args = kwargs.get("extra_args", [])
|
||||
|
||||
async def __aenter__(self):
|
||||
await self.start()
|
||||
@@ -238,36 +256,71 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
"User-Agent": self.user_agent
|
||||
})
|
||||
else:
|
||||
# Base browser arguments
|
||||
browser_args = {
|
||||
"headless": self.headless,
|
||||
"args": [
|
||||
"--disable-gpu",
|
||||
"--no-sandbox",
|
||||
"--disable-dev-shm-usage",
|
||||
"--disable-blink-features=AutomationControlled",
|
||||
"--no-first-run",
|
||||
"--no-default-browser-check",
|
||||
"--disable-infobars",
|
||||
"--window-position=0,0",
|
||||
"--ignore-certificate-errors",
|
||||
"--ignore-certificate-errors-spki-list",
|
||||
# "--headless=new", # Use the new headless mode
|
||||
]
|
||||
}
|
||||
|
||||
# Add channel if specified (try Chrome first)
|
||||
if self.chrome_channel:
|
||||
browser_args["channel"] = self.chrome_channel
|
||||
|
||||
# Add extra args if provided
|
||||
if self.extra_args:
|
||||
browser_args["args"].extend(self.extra_args)
|
||||
|
||||
# Add proxy settings if a proxy is specified
|
||||
if self.proxy:
|
||||
proxy_settings = ProxySettings(server=self.proxy)
|
||||
browser_args["proxy"] = proxy_settings
|
||||
elif self.proxy_config:
|
||||
proxy_settings = ProxySettings(server=self.proxy_config.get("server"), username=self.proxy_config.get("username"), password=self.proxy_config.get("password"))
|
||||
proxy_settings = ProxySettings(
|
||||
server=self.proxy_config.get("server"),
|
||||
username=self.proxy_config.get("username"),
|
||||
password=self.proxy_config.get("password")
|
||||
)
|
||||
browser_args["proxy"] = proxy_settings
|
||||
|
||||
# Select the appropriate browser based on the browser_type
|
||||
if self.browser_type == "firefox":
|
||||
self.browser = await self.playwright.firefox.launch(**browser_args)
|
||||
elif self.browser_type == "webkit":
|
||||
self.browser = await self.playwright.webkit.launch(**browser_args)
|
||||
else:
|
||||
self.browser = await self.playwright.chromium.launch(**browser_args)
|
||||
try:
|
||||
# Select the appropriate browser based on the browser_type
|
||||
if self.browser_type == "firefox":
|
||||
self.browser = await self.playwright.firefox.launch(**browser_args)
|
||||
elif self.browser_type == "webkit":
|
||||
self.browser = await self.playwright.webkit.launch(**browser_args)
|
||||
else:
|
||||
if self.use_persistent_context and self.user_data_dir:
|
||||
self.browser = await self.playwright.chromium.launch_persistent_context(
|
||||
user_data_dir=self.user_data_dir,
|
||||
**browser_args
|
||||
)
|
||||
self.default_context = self.browser
|
||||
else:
|
||||
self.browser = await self.playwright.chromium.launch(**browser_args)
|
||||
|
||||
except Exception as e:
|
||||
# Fallback to chromium if Chrome channel fails
|
||||
if "chrome" in str(e) and browser_args.get("channel") == "chrome":
|
||||
browser_args["channel"] = "chromium"
|
||||
if self.use_persistent_context and self.user_data_dir:
|
||||
self.browser = await self.playwright.chromium.launch_persistent_context(
|
||||
user_data_dir=self.user_data_dir,
|
||||
**browser_args
|
||||
)
|
||||
self.default_context = self.browser
|
||||
else:
|
||||
self.browser = await self.playwright.chromium.launch(**browser_args)
|
||||
else:
|
||||
raise
|
||||
|
||||
await self.execute_hook('on_browser_created', self.browser)
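
The channel fallback in the hunk above can be reduced to a small retry pattern. In this sketch `launch_with_fallback` and `fake_launch` are hypothetical names, and the fake launcher stands in for Playwright so that the snippet runs without a browser installed.

```python
import asyncio


async def launch_with_fallback(launch, options: dict) -> str:
    """Try the requested channel; retry with 'chromium' if Chrome fails."""
    try:
        return await launch(**options)
    except Exception as exc:
        # Same check as the diff: the error mentions chrome and chrome was requested.
        if "chrome" in str(exc) and options.get("channel") == "chrome":
            return await launch(**{**options, "channel": "chromium"})
        raise


async def fake_launch(channel: str = "chromium", **_: object) -> str:
    # Hypothetical launcher that pretends the Chrome channel is not installed.
    if channel == "chrome":
        raise RuntimeError("chrome distribution not found")
    return f"launched:{channel}"


result = asyncio.run(
    launch_with_fallback(fake_launch, {"channel": "chrome", "headless": True})
)
```

Matching on the text of the exception is fragile, since an unrelated error that happens to mention "chrome" would also trigger the retry; a narrower exception type would be safer where one is available.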
|
||||
|
||||
@@ -292,9 +345,10 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
await self.playwright.stop()
|
||||
self.playwright = None
|
||||
|
||||
def __del__(self):
|
||||
if self.browser or self.playwright:
|
||||
asyncio.get_event_loop().run_until_complete(self.close())
|
||||
# Issue #256: Remove __del__ method to avoid potential issues with async cleanup
|
||||
# def __del__(self):
|
||||
# if self.browser or self.playwright:
|
||||
# asyncio.get_event_loop().run_until_complete(self.close())
|
||||
|
||||
def set_hook(self, hook_type: str, hook: Callable):
|
||||
if hook_type in self.hooks:
|
||||
@@ -439,6 +493,75 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
return page
|
||||
|
||||
async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
|
||||
"""
|
||||
Crawls a given URL or processes raw HTML/local file content based on the URL prefix.
|
||||
|
||||
Args:
|
||||
url (str): The URL to crawl. Supported prefixes:
|
||||
- 'http://' or 'https://': Web URL to crawl.
|
||||
- 'file://': Local file path to process.
|
||||
- 'raw:': Raw HTML content to process.
|
||||
**kwargs: Additional parameters:
|
||||
- 'screenshot' (bool): Whether to take a screenshot.
|
||||
- ... [other existing parameters]
|
||||
|
||||
Returns:
|
||||
AsyncCrawlResponse: The response containing HTML, headers, status code, and optional screenshot.
|
||||
"""
|
||||
response_headers = {}
|
||||
status_code = 200 # Default to 200 for local/raw HTML
|
||||
screenshot_requested = kwargs.get('screenshot', False)
|
||||
screenshot_data = None
|
||||
|
||||
if url.startswith(('http://', 'https://')):
|
||||
# Proceed with standard web crawling
|
||||
return await self._crawl_web(url, **kwargs)
|
||||
|
||||
elif url.startswith('file://'):
|
||||
# Process local file
|
||||
local_file_path = url[7:] # Remove 'file://' prefix
|
||||
if not os.path.exists(local_file_path):
|
||||
raise FileNotFoundError(f"Local file not found: {local_file_path}")
|
||||
with open(local_file_path, 'r', encoding='utf-8') as f:
|
||||
html = f.read()
|
||||
if screenshot_requested:
|
||||
screenshot_data = await self._generate_screenshot_from_html(html)
|
||||
return AsyncCrawlResponse(
|
||||
html=html,
|
||||
response_headers=response_headers,
|
||||
status_code=status_code,
|
||||
screenshot=screenshot_data,
|
||||
get_delayed_content=None
|
||||
)
|
||||
|
||||
elif url.startswith('raw:'):
|
||||
# Process raw HTML content
|
||||
raw_html = url[4:] # Remove 'raw:' prefix
|
||||
html = raw_html
|
||||
if screenshot_requested:
|
||||
screenshot_data = await self._generate_screenshot_from_html(html)
|
||||
return AsyncCrawlResponse(
|
||||
html=html,
|
||||
response_headers=response_headers,
|
||||
status_code=status_code,
|
||||
screenshot=screenshot_data,
|
||||
get_delayed_content=None
|
||||
)
|
||||
else:
|
||||
raise ValueError("URL must start with 'http://', 'https://', 'file://', or 'raw:'")
|
||||
|
||||
|
||||
async def _crawl_web(self, url: str, **kwargs) -> AsyncCrawlResponse:
|
||||
"""
|
||||
Existing web crawling logic remains unchanged.
|
||||
|
||||
Args:
|
||||
url (str): The web URL to crawl.
|
||||
**kwargs: Additional parameters.
|
||||
|
||||
Returns:
|
||||
AsyncCrawlResponse: The response containing HTML, headers, status code, and optional screenshot.
|
||||
"""
|
||||
response_headers = {}
|
||||
status_code = None
|
||||
|
||||
@@ -461,24 +584,35 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
if session_id:
|
||||
context, page, _ = self.sessions.get(session_id, (None, None, None))
|
||||
if not context:
|
||||
if self.use_persistent_context and self.browser_type in ["chrome", "chromium"]:
|
||||
# In persistent context, browser is the context
|
||||
context = self.browser
|
||||
page = await context.new_page()
|
||||
else:
|
||||
# Normal context creation for non-persistent or non-Chrome browsers
|
||||
context = await self.browser.new_context(
|
||||
user_agent=self.user_agent,
|
||||
viewport={"width": 1920, "height": 1080},
|
||||
proxy={"server": self.proxy} if self.proxy else None,
|
||||
accept_downloads=True,
|
||||
java_script_enabled=True
|
||||
)
|
||||
await context.add_cookies([{"name": "cookiesEnabled", "value": "true", "url": url}])
|
||||
await context.set_extra_http_headers(self.headers)
|
||||
page = await context.new_page()
|
||||
self.sessions[session_id] = (context, page, time.time())
|
||||
else:
|
||||
if self.use_persistent_context and self.browser_type in ["chrome", "chromium"]:
|
||||
# In persistent context, browser is the context
|
||||
context = self.browser
|
||||
else:
|
||||
# Normal context creation
|
||||
context = await self.browser.new_context(
|
||||
user_agent=self.user_agent,
|
||||
viewport={"width": 1920, "height": 1080},
|
||||
proxy={"server": self.proxy} if self.proxy else None,
|
||||
accept_downloads=True,
|
||||
java_script_enabled=True
|
||||
proxy={"server": self.proxy} if self.proxy else None
|
||||
)
|
||||
await context.add_cookies([{"name": "cookiesEnabled", "value": "true", "url": url}])
|
||||
await context.set_extra_http_headers(self.headers)
|
||||
page = await context.new_page()
|
||||
self.sessions[session_id] = (context, page, time.time())
|
||||
else:
|
||||
context = await self.browser.new_context(
|
||||
user_agent=self.user_agent,
|
||||
viewport={"width": 1920, "height": 1080},
|
||||
proxy={"server": self.proxy} if self.proxy else None
|
||||
)
|
||||
await context.set_extra_http_headers(self.headers)
|
||||
|
||||
if kwargs.get("override_navigator", False) or kwargs.get("simulate_user", False) or kwargs.get("magic", False):
|
||||
# Inject scripts to override navigator properties
|
||||
@@ -512,7 +646,8 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
""")
|
||||
|
||||
page = await context.new_page()
|
||||
# await stealth_async(page) #, stealth_config)
|
||||
if kwargs.get("magic", False):
|
||||
await stealth_async(page, stealth_config)
|
||||
|
||||
# Add console message and error logging
|
||||
if kwargs.get("log_console", False):
|
||||
@@ -544,8 +679,12 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
if not kwargs.get("js_only", False):
|
||||
await self.execute_hook('before_goto', page)
|
||||
|
||||
|
||||
response = await page.goto(
|
||||
url, wait_until="domcontentloaded", timeout=kwargs.get("page_timeout", 60000)
|
||||
url,
|
||||
# wait_until=kwargs.get("wait_until", ["domcontentloaded", "networkidle"]),
|
||||
wait_until=kwargs.get("wait_until", "domcontentloaded"),
|
||||
timeout=kwargs.get("page_timeout", 60000)
|
||||
)
|
||||
|
||||
# response = await page.goto("about:blank")
|
||||
@@ -722,7 +861,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
|
||||
if self.verbose:
|
||||
print(f"[LOG] ✅ Crawled {url} successfully!")
|
||||
|
||||
|
||||
if self.use_cached_html:
|
||||
cache_file_path = os.path.join(
|
||||
Path.home(), ".crawl4ai", "cache", hashlib.md5(url.encode()).hexdigest()
|
||||
@@ -902,6 +1041,15 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
print(f"Warning: Failed to remove overlay elements: {str(e)}")
|
||||
|
||||
async def take_screenshot(self, page: Page) -> str:
|
||||
"""
|
||||
Takes a screenshot of the current page.
|
||||
|
||||
Args:
|
||||
page (Page): The Playwright page instance
|
||||
|
||||
Returns:
|
||||
str: Base64-encoded screenshot image
|
||||
"""
|
||||
try:
|
||||
# The page is already loaded, just take the screenshot
|
||||
screenshot = await page.screenshot(full_page=True)
|
||||
@@ -921,4 +1069,36 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
return base64.b64encode(buffered.getvalue()).decode('utf-8')
|
||||
finally:
|
||||
await page.close()
|
||||
|
||||
async def _generate_screenshot_from_html(self, html: str) -> Optional[str]:
|
||||
"""
|
||||
Generates a screenshot from raw HTML content.
|
||||
|
||||
Args:
|
||||
html (str): The HTML content to render and capture.
|
||||
|
||||
Returns:
|
||||
Optional[str]: Base64-encoded screenshot image or an error image if failed.
|
||||
"""
|
||||
try:
|
||||
if not self.browser:
|
||||
await self.start()
|
||||
page = await self.browser.new_page()
|
||||
await page.set_content(html, wait_until='networkidle')
|
||||
screenshot = await page.screenshot(full_page=True)
|
||||
await page.close()
|
||||
return base64.b64encode(screenshot).decode('utf-8')
|
||||
except Exception as e:
|
||||
error_message = f"Failed to take screenshot: {str(e)}"
|
||||
print(error_message)
|
||||
|
||||
# Generate an error image
|
||||
img = Image.new('RGB', (800, 600), color='black')
|
||||
draw = ImageDraw.Draw(img)
|
||||
font = ImageFont.load_default()
|
||||
draw.text((10, 10), error_message, fill=(255, 255, 255), font=font)
|
||||
|
||||
buffered = BytesIO()
|
||||
img.save(buffered, format="JPEG")
|
||||
return base64.b64encode(buffered.getvalue()).decode('utf-8')
|
||||
|
||||
|
||||
965
crawl4ai/async_crawler_strategy_0.3.73.py
Normal file
@@ -0,0 +1,965 @@
|
||||
import asyncio
|
||||
import base64
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Callable, Dict, Any, List, Optional, Awaitable
|
||||
import os, sys, shutil
|
||||
import tempfile, subprocess
|
||||
from playwright.async_api import async_playwright, Page, Browser, Error
|
||||
from io import BytesIO
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
from pathlib import Path
|
||||
from playwright.async_api import ProxySettings
|
||||
from pydantic import BaseModel
|
||||
import hashlib
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from playwright_stealth import StealthConfig, stealth_async
|
||||
|
||||
stealth_config = StealthConfig(
|
||||
webdriver=True,
|
||||
chrome_app=True,
|
||||
chrome_csi=True,
|
||||
chrome_load_times=True,
|
||||
chrome_runtime=True,
|
||||
navigator_languages=True,
|
||||
navigator_plugins=True,
|
||||
navigator_permissions=True,
|
||||
webgl_vendor=True,
|
||||
outerdimensions=True,
|
||||
navigator_hardware_concurrency=True,
|
||||
media_codecs=True,
|
||||
)
|
||||
|
||||
|
||||
class ManagedBrowser:
|
||||
def __init__(self, browser_type: str = "chromium", user_data_dir: Optional[str] = None, headless: bool = False):
|
||||
self.browser_type = browser_type
|
||||
self.user_data_dir = user_data_dir
|
||||
self.headless = headless
|
||||
self.browser_process = None
|
||||
self.temp_dir = None
|
||||
self.debugging_port = 9222
|
||||
|
||||
async def start(self) -> str:
|
||||
"""
|
||||
Starts the browser process and returns the CDP endpoint URL.
|
||||
If user_data_dir is not provided, creates a temporary directory.
|
||||
"""
|
||||
|
||||
# Create temp dir if needed
|
||||
if not self.user_data_dir:
|
||||
self.temp_dir = tempfile.mkdtemp(prefix="browser-profile-")
|
||||
self.user_data_dir = self.temp_dir
|
||||
|
||||
# Get browser path and args based on OS and browser type
|
||||
browser_path = self._get_browser_path()
|
||||
args = self._get_browser_args()
|
||||
|
||||
# Start browser process
|
||||
try:
|
||||
self.browser_process = subprocess.Popen(
|
||||
args,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE
|
||||
)
|
||||
await asyncio.sleep(2) # Give browser time to start
|
||||
return f"http://localhost:{self.debugging_port}"
|
||||
except Exception as e:
|
||||
await self.cleanup()
|
||||
raise Exception(f"Failed to start browser: {e}")
|
||||
|
||||
def _get_browser_path(self) -> str:
|
||||
"""Returns the browser executable path based on OS and browser type"""
|
||||
if sys.platform == "darwin": # macOS
|
||||
paths = {
|
||||
"chromium": "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
|
||||
"firefox": "/Applications/Firefox.app/Contents/MacOS/firefox",
|
||||
"webkit": "/Applications/Safari.app/Contents/MacOS/Safari"
|
||||
}
|
||||
elif sys.platform == "win32": # Windows
|
||||
paths = {
|
||||
"chromium": "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe",
|
||||
"firefox": "C:\\Program Files\\Mozilla Firefox\\firefox.exe",
|
||||
"webkit": None # WebKit not supported on Windows
|
||||
}
|
||||
else: # Linux
|
||||
paths = {
|
||||
"chromium": "google-chrome",
|
||||
"firefox": "firefox",
|
||||
"webkit": None # WebKit not supported on Linux
|
||||
}
|
||||
|
||||
return paths.get(self.browser_type)
|
||||
|
||||
def _get_browser_args(self) -> List[str]:
|
||||
"""Returns browser-specific command line arguments"""
|
||||
base_args = [self._get_browser_path()]
|
||||
|
||||
if self.browser_type == "chromium":
|
||||
args = [
|
||||
f"--remote-debugging-port={self.debugging_port}",
|
||||
f"--user-data-dir={self.user_data_dir}",
|
||||
]
|
||||
if self.headless:
|
||||
args.append("--headless=new")
|
||||
elif self.browser_type == "firefox":
|
||||
args = [
|
||||
"--remote-debugging-port", str(self.debugging_port),
|
||||
"--profile", self.user_data_dir,
|
||||
]
|
||||
if self.headless:
|
||||
args.append("--headless")
|
||||
else:
|
||||
raise NotImplementedError(f"Browser type {self.browser_type} not supported")
|
||||
|
||||
return base_args + args
|
||||
|
||||
async def cleanup(self):
|
||||
"""Cleanup browser process and temporary directory"""
|
||||
if self.browser_process:
|
||||
try:
|
||||
self.browser_process.terminate()
|
||||
await asyncio.sleep(1)
|
||||
if self.browser_process.poll() is None:
|
||||
self.browser_process.kill()
|
||||
except Exception as e:
|
||||
print(f"Error terminating browser: {e}")
|
||||
|
||||
if self.temp_dir and os.path.exists(self.temp_dir):
|
||||
try:
|
||||
shutil.rmtree(self.temp_dir)
|
||||
except Exception as e:
|
||||
print(f"Error removing temporary directory: {e}")
|
||||
|
||||
class AsyncCrawlResponse(BaseModel):
|
||||
html: str
|
||||
response_headers: Dict[str, str]
|
||||
status_code: int
|
||||
screenshot: Optional[str] = None
|
||||
get_delayed_content: Optional[Callable[[Optional[float]], Awaitable[str]]] = None
|
||||
|
||||
class Config:
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
class AsyncCrawlerStrategy(ABC):
|
||||
@abstractmethod
|
||||
async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def crawl_many(self, urls: List[str], **kwargs) -> List[AsyncCrawlResponse]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def take_screenshot(self, **kwargs) -> str:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def update_user_agent(self, user_agent: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def set_hook(self, hook_type: str, hook: Callable):
|
||||
pass
|
||||
|
||||
class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
def __init__(self, use_cached_html=False, js_code=None, **kwargs):
|
||||
self.use_cached_html = use_cached_html
|
||||
self.user_agent = kwargs.get(
|
||||
"user_agent",
|
||||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
|
||||
"(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
|
||||
)
|
||||
self.proxy = kwargs.get("proxy")
|
||||
self.proxy_config = kwargs.get("proxy_config")
|
||||
self.headless = kwargs.get("headless", True)
|
||||
self.browser_type = kwargs.get("browser_type", "chromium")
|
||||
self.headers = kwargs.get("headers", {})
|
||||
self.sessions = {}
|
||||
self.session_ttl = 1800
|
||||
self.js_code = js_code
|
||||
self.verbose = kwargs.get("verbose", False)
|
||||
self.playwright = None
|
||||
self.browser = None
|
||||
self.sleep_on_close = kwargs.get("sleep_on_close", False)
|
||||
self.use_managed_browser = kwargs.get("use_managed_browser", False)
|
||||
self.user_data_dir = kwargs.get("user_data_dir", None)
|
||||
self.use_persistent_context = kwargs.get("use_persistent_context", False)
|
||||
self.chrome_channel = kwargs.get("chrome_channel", "chrome")
|
||||
self.managed_browser = None
|
||||
self.default_context = None
|
||||
self.hooks = {
|
||||
'on_browser_created': None,
|
||||
'on_user_agent_updated': None,
|
||||
'on_execution_started': None,
|
||||
'before_goto': None,
|
||||
'after_goto': None,
|
||||
'before_return_html': None,
|
||||
'before_retrieve_html': None
|
||||
}
|
||||
self.extra_args = kwargs.get("extra_args", [])
|
||||
|
||||
async def __aenter__(self):
|
||||
await self.start()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
await self.close()
|
||||
|
||||
async def start(self):
|
||||
if self.playwright is None:
|
||||
self.playwright = await async_playwright().start()
|
||||
if self.browser is None:
|
||||
if self.use_managed_browser:
|
||||
# Use managed browser approach
|
||||
self.managed_browser = ManagedBrowser(
|
||||
browser_type=self.browser_type,
|
||||
user_data_dir=self.user_data_dir,
|
||||
headless=self.headless
|
||||
)
|
||||
cdp_url = await self.managed_browser.start()
|
||||
self.browser = await self.playwright.chromium.connect_over_cdp(cdp_url)
|
||||
|
||||
# Get the default context that maintains the user profile
|
||||
contexts = self.browser.contexts
|
||||
if contexts:
|
||||
self.default_context = contexts[0]
|
||||
else:
|
||||
# If no default context exists, create one
|
||||
self.default_context = await self.browser.new_context(
|
||||
viewport={"width": 1920, "height": 1080}
|
||||
)
|
||||
|
||||
# Set up the default context
|
||||
if self.default_context:
|
||||
await self.default_context.set_extra_http_headers(self.headers)
|
||||
|
||||
if self.user_agent:
|
||||
await self.default_context.set_extra_http_headers({
|
||||
"User-Agent": self.user_agent
|
||||
})
|
||||
else:
|
||||
browser_args = {
|
||||
"headless": self.headless,
|
||||
"args": [
|
||||
"--disable-gpu",
|
||||
"--no-sandbox",
|
||||
"--disable-dev-shm-usage",
|
||||
"--disable-blink-features=AutomationControlled",
|
||||
"--disable-infobars",
|
||||
"--window-position=0,0",
|
||||
"--ignore-certificate-errors",
|
||||
"--ignore-certificate-errors-spki-list",
|
||||
# "--disable-http2",
|
||||
# "--headless=new", # Use the new headless mode
|
||||
]
|
||||
}
|
||||
|
||||
# Add extra args if provided
|
||||
if self.extra_args:
|
||||
browser_args["args"].extend(self.extra_args)
|
||||
|
||||
# Add proxy settings if a proxy is specified
|
||||
if self.proxy:
|
||||
proxy_settings = ProxySettings(server=self.proxy)
|
||||
browser_args["proxy"] = proxy_settings
|
||||
elif self.proxy_config:
|
||||
proxy_settings = ProxySettings(server=self.proxy_config.get("server"), username=self.proxy_config.get("username"), password=self.proxy_config.get("password"))
|
||||
browser_args["proxy"] = proxy_settings
|
||||
|
||||
# Select the appropriate browser based on the browser_type
|
||||
if self.browser_type == "firefox":
|
||||
self.browser = await self.playwright.firefox.launch(**browser_args)
|
||||
elif self.browser_type == "webkit":
|
||||
self.browser = await self.playwright.webkit.launch(**browser_args)
|
||||
else:
|
||||
self.browser = await self.playwright.chromium.launch(**browser_args)
|
||||
|
||||
# Update the headless configuration
|
||||
if self.headless:
|
||||
# Use the new headless mode explicitly
|
||||
browser_args["args"].append("--headless=new")
|
||||
|
||||
await self.execute_hook('on_browser_created', self.browser)
|
||||
|
||||
    async def close(self):
        if self.sleep_on_close:
            await asyncio.sleep(0.5)

        # Close all active sessions
        session_ids = list(self.sessions.keys())
        for session_id in session_ids:
            await self.kill_session(session_id)

        if self.browser:
            await self.browser.close()
            self.browser = None

        if self.managed_browser:
            await self.managed_browser.cleanup()
            self.managed_browser = None

        if self.playwright:
            await self.playwright.stop()
            self.playwright = None

    def __del__(self):
        if self.browser or self.playwright:
            asyncio.get_event_loop().run_until_complete(self.close())

    def set_hook(self, hook_type: str, hook: Callable):
        if hook_type in self.hooks:
            self.hooks[hook_type] = hook
        else:
            raise ValueError(f"Invalid hook type: {hook_type}")

    async def execute_hook(self, hook_type: str, *args):
        hook = self.hooks.get(hook_type)
        if hook:
            if asyncio.iscoroutinefunction(hook):
                return await hook(*args)
            else:
                return hook(*args)
        return args[0] if args else None

    def update_user_agent(self, user_agent: str):
        self.user_agent = user_agent

    def set_custom_headers(self, headers: Dict[str, str]):
        self.headers = headers

    async def kill_session(self, session_id: str):
        if session_id in self.sessions:
            context, page, _ = self.sessions[session_id]
            await page.close()
            if not self.use_managed_browser:
                await context.close()
            del self.sessions[session_id]

    def _cleanup_expired_sessions(self):
        current_time = time.time()
        expired_sessions = [
            sid for sid, (_, _, last_used) in self.sessions.items()
            if current_time - last_used > self.session_ttl
        ]
        for sid in expired_sessions:
            asyncio.create_task(self.kill_session(sid))
|
||||
|
||||
    async def smart_wait(self, page: Page, wait_for: str, timeout: float = 30000):
        wait_for = wait_for.strip()

        if wait_for.startswith('js:'):
            # Explicitly specified JavaScript
            js_code = wait_for[3:].strip()
            return await self.csp_compliant_wait(page, js_code, timeout)
        elif wait_for.startswith('css:'):
            # Explicitly specified CSS selector
            css_selector = wait_for[4:].strip()
            try:
                await page.wait_for_selector(css_selector, timeout=timeout)
            except Error as e:
                if 'Timeout' in str(e):
                    raise TimeoutError(f"Timeout after {timeout}ms waiting for selector '{css_selector}'")
                else:
                    raise ValueError(f"Invalid CSS selector: '{css_selector}'")
        else:
            # Auto-detect based on content
            if wait_for.startswith('()') or wait_for.startswith('function'):
                # It's likely a JavaScript function
                return await self.csp_compliant_wait(page, wait_for, timeout)
            else:
                # Assume it's a CSS selector first
                try:
                    await page.wait_for_selector(wait_for, timeout=timeout)
                except Error as e:
                    if 'Timeout' in str(e):
                        raise TimeoutError(f"Timeout after {timeout}ms waiting for selector '{wait_for}'")
                    else:
                        # If it's not a timeout error, it might be an invalid selector
                        # Let's try to evaluate it as a JavaScript function as a fallback
                        try:
                            return await self.csp_compliant_wait(page, f"() => {{{wait_for}}}", timeout)
                        except Error:
                            raise ValueError(f"Invalid wait_for parameter: '{wait_for}'. "
                                             "It should be either a valid CSS selector, a JavaScript function, "
                                             "or explicitly prefixed with 'js:' or 'css:'.")

    async def csp_compliant_wait(self, page: Page, user_wait_function: str, timeout: float = 30000):
        wrapper_js = f"""
        async () => {{
            const userFunction = {user_wait_function};
            const startTime = Date.now();
            while (true) {{
                if (await userFunction()) {{
                    return true;
                }}
                if (Date.now() - startTime > {timeout}) {{
                    throw new Error('Timeout waiting for condition');
                }}
                await new Promise(resolve => setTimeout(resolve, 100));
            }}
        }}
        """

        try:
            await page.evaluate(wrapper_js)
        except TimeoutError:
            raise TimeoutError(f"Timeout after {timeout}ms waiting for condition")
        except Exception as e:
            raise RuntimeError(f"Error in wait condition: {str(e)}")
|
||||
|
||||
    async def process_iframes(self, page):
        # Find all iframes
        iframes = await page.query_selector_all('iframe')

        for i, iframe in enumerate(iframes):
            try:
                # Add a unique identifier to the iframe
                await iframe.evaluate(f'(element) => element.id = "iframe-{i}"')

                # Get the frame associated with this iframe
                frame = await iframe.content_frame()

                if frame:
                    # Wait for the frame to load
                    await frame.wait_for_load_state('load', timeout=30000)  # 30 seconds timeout

                    # Extract the content of the iframe's body
                    iframe_content = await frame.evaluate('() => document.body.innerHTML')

                    # Generate a unique class name for this iframe
                    class_name = f'extracted-iframe-content-{i}'

                    # Replace the iframe with a div containing the extracted content
                    _iframe = iframe_content.replace('`', '\\`')
                    await page.evaluate(f"""
                        () => {{
                            const iframe = document.getElementById('iframe-{i}');
                            const div = document.createElement('div');
                            div.innerHTML = `{_iframe}`;
                            div.className = '{class_name}';
                            iframe.replaceWith(div);
                        }}
                    """)
                else:
                    print(f"Warning: Could not access content frame for iframe {i}")
            except Exception as e:
                print(f"Error processing iframe {i}: {str(e)}")

        # Return the page object
        return page
|
||||
|
||||
async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
|
||||
response_headers = {}
|
||||
status_code = None
|
||||
|
||||
self._cleanup_expired_sessions()
|
||||
session_id = kwargs.get("session_id")
|
||||
|
||||
# Handle page creation differently for managed browser
|
||||
if self.use_managed_browser:
|
||||
if session_id:
|
||||
# Reuse existing session if available
|
||||
context, page, _ = self.sessions.get(session_id, (None, None, None))
|
||||
if not page:
|
||||
# Create new page in default context if session doesn't exist
|
||||
page = await self.default_context.new_page()
|
||||
self.sessions[session_id] = (self.default_context, page, time.time())
|
||||
else:
|
||||
# Create new page in default context for non-session requests
|
||||
page = await self.default_context.new_page()
|
||||
else:
|
||||
if session_id:
|
||||
context, page, _ = self.sessions.get(session_id, (None, None, None))
|
||||
if not context:
|
||||
context = await self.browser.new_context(
|
||||
user_agent=self.user_agent,
|
||||
viewport={"width": 1920, "height": 1080},
|
||||
proxy={"server": self.proxy} if self.proxy else None,
|
||||
accept_downloads=True,
|
||||
java_script_enabled=True
|
||||
)
|
||||
await context.add_cookies([{"name": "cookiesEnabled", "value": "true", "url": url}])
|
||||
await context.set_extra_http_headers(self.headers)
|
||||
page = await context.new_page()
|
||||
self.sessions[session_id] = (context, page, time.time())
|
||||
else:
|
||||
context = await self.browser.new_context(
|
||||
user_agent=self.user_agent,
|
||||
viewport={"width": 1920, "height": 1080},
|
||||
proxy={"server": self.proxy} if self.proxy else None
|
||||
)
|
||||
await context.set_extra_http_headers(self.headers)
|
||||
|
||||
if kwargs.get("override_navigator", False) or kwargs.get("simulate_user", False) or kwargs.get("magic", False):
|
||||
# Inject scripts to override navigator properties
|
||||
await context.add_init_script("""
|
||||
// Pass the Permissions Test.
|
||||
const originalQuery = window.navigator.permissions.query;
|
||||
window.navigator.permissions.query = (parameters) => (
|
||||
parameters.name === 'notifications' ?
|
||||
Promise.resolve({ state: Notification.permission }) :
|
||||
originalQuery(parameters)
|
||||
);
|
||||
Object.defineProperty(navigator, 'webdriver', {
|
||||
get: () => undefined
|
||||
});
|
||||
window.navigator.chrome = {
|
||||
runtime: {},
|
||||
// Add other properties if necessary
|
||||
};
|
||||
Object.defineProperty(navigator, 'plugins', {
|
||||
get: () => [1, 2, 3, 4, 5],
|
||||
});
|
||||
Object.defineProperty(navigator, 'languages', {
|
||||
get: () => ['en-US', 'en'],
|
||||
});
|
||||
Object.defineProperty(document, 'hidden', {
|
||||
get: () => false
|
||||
});
|
||||
Object.defineProperty(document, 'visibilityState', {
|
||||
get: () => 'visible'
|
||||
});
|
||||
""")
|
||||
|
||||
page = await context.new_page()
|
||||
if kwargs.get("magic", False):
|
||||
await stealth_async(page, stealth_config)
|
||||
|
||||
# Add console message and error logging
|
||||
if kwargs.get("log_console", False):
|
||||
page.on("console", lambda msg: print(f"Console: {msg.text}"))
|
||||
page.on("pageerror", lambda exc: print(f"Page Error: {exc}"))
|
||||
|
||||
try:
|
||||
if self.verbose:
|
||||
print(f"[LOG] 🕸️ Crawling {url} using AsyncPlaywrightCrawlerStrategy...")
|
||||
|
||||
if self.use_cached_html:
|
||||
cache_file_path = os.path.join(
|
||||
Path.home(), ".crawl4ai", "cache", hashlib.md5(url.encode()).hexdigest()
|
||||
)
|
||||
if os.path.exists(cache_file_path):
|
||||
html = ""
|
||||
with open(cache_file_path, "r") as f:
|
||||
html = f.read()
|
||||
# retrieve response headers and status code from cache
|
||||
with open(cache_file_path + ".meta", "r") as f:
|
||||
meta = json.load(f)
|
||||
response_headers = meta.get("response_headers", {})
|
||||
status_code = meta.get("status_code")
|
||||
response = AsyncCrawlResponse(
|
||||
html=html, response_headers=response_headers, status_code=status_code
|
||||
)
|
||||
return response
|
||||
|
||||
if not kwargs.get("js_only", False):
|
||||
await self.execute_hook('before_goto', page)
|
||||
|
||||
# response = await page.goto(
|
||||
# url, wait_until="domcontentloaded", timeout=kwargs.get("page_timeout", 60000)
|
||||
# )
|
||||
|
||||
# Add retry logic for HTTP2 errors
|
||||
max_retries = kwargs.get("max_retries", 3)
|
||||
current_try = 0
|
||||
|
||||
while current_try < max_retries:
|
||||
try:
|
||||
response = await page.goto(
|
||||
url,
|
||||
# wait_until=kwargs.get("wait_until", ["domcontentloaded", "networkidle"]),
|
||||
wait_until=kwargs.get("wait_until", "networkidle"),
|
||||
timeout=kwargs.get("page_timeout", 60000)
|
||||
)
|
||||
break
|
||||
except Exception as e:
|
||||
current_try += 1
|
||||
if "ERR_HTTP2_PROTOCOL_ERROR" in str(e):
|
||||
if current_try < max_retries:
|
||||
# Add exponential backoff
|
||||
await asyncio.sleep(2 ** current_try)
|
||||
# Try with different protocol
|
||||
if 'args' not in kwargs:
|
||||
kwargs['args'] = []
|
||||
kwargs['args'].extend(['--disable-http2'])
|
||||
continue
|
||||
if current_try == max_retries:
|
||||
raise
|
||||
|
||||
# response = await page.goto("about:blank")
|
||||
# await page.evaluate(f"window.location.href = '{url}'")
|
||||
|
||||
await self.execute_hook('after_goto', page)
|
||||
|
||||
# Get status code and headers
|
||||
status_code = response.status
|
||||
response_headers = response.headers
|
||||
else:
|
||||
status_code = 200
|
||||
response_headers = {}
|
||||
|
||||
# Wait for the body with a more robust check than a bare wait_for_selector call:
|
||||
try:
|
||||
# First wait for body to exist, regardless of visibility
|
||||
await page.wait_for_selector('body', state='attached', timeout=30000)
|
||||
|
||||
# Then wait for it to become visible by checking CSS
|
||||
await page.wait_for_function("""
|
||||
() => {
|
||||
const body = document.body;
|
||||
const style = window.getComputedStyle(body);
|
||||
return style.display !== 'none' &&
|
||||
style.visibility !== 'hidden' &&
|
||||
style.opacity !== '0';
|
||||
}
|
||||
""", timeout=30000)
|
||||
|
||||
except Error as e:
|
||||
# If waiting fails, let's try to diagnose the issue
|
||||
visibility_info = await page.evaluate("""
|
||||
() => {
|
||||
const body = document.body;
|
||||
const style = window.getComputedStyle(body);
|
||||
return {
|
||||
display: style.display,
|
||||
visibility: style.visibility,
|
||||
opacity: style.opacity,
|
||||
hasContent: body.innerHTML.length,
|
||||
classList: Array.from(body.classList)
|
||||
}
|
||||
}
|
||||
""")
|
||||
|
||||
if self.verbose:
|
||||
print(f"Body visibility debug info: {visibility_info}")
|
||||
|
||||
# Even if body is hidden, we might still want to proceed
|
||||
if kwargs.get('ignore_body_visibility', True):
|
||||
if self.verbose:
|
||||
print("Proceeding despite hidden body...")
|
||||
pass
|
||||
else:
|
||||
raise Error(f"Body element is hidden: {visibility_info}")
|
||||
|
||||
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
|
||||
|
||||
js_code = kwargs.get("js_code", kwargs.get("js", self.js_code))
|
||||
if js_code:
|
||||
if isinstance(js_code, str):
|
||||
await page.evaluate(js_code)
|
||||
elif isinstance(js_code, list):
|
||||
for js in js_code:
|
||||
await page.evaluate(js)
|
||||
|
||||
await page.wait_for_load_state('networkidle')
|
||||
# Fire the on_execution_started hook
|
||||
await self.execute_hook('on_execution_started', page)
|
||||
|
||||
if kwargs.get("simulate_user", False) or kwargs.get("magic", False):
|
||||
# Simulate user interactions
|
||||
await page.mouse.move(100, 100)
|
||||
await page.mouse.down()
|
||||
await page.mouse.up()
|
||||
await page.keyboard.press('ArrowDown')
|
||||
|
||||
# Handle the wait_for parameter
|
||||
wait_for = kwargs.get("wait_for")
|
||||
if wait_for:
|
||||
try:
|
||||
await self.smart_wait(page, wait_for, timeout=kwargs.get("page_timeout", 60000))
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Wait condition failed: {str(e)}")
|
||||
|
||||
# Update image dimensions
|
||||
update_image_dimensions_js = """
|
||||
() => {
|
||||
return new Promise((resolve) => {
|
||||
const filterImage = (img) => {
|
||||
// Filter out images that are too small
|
||||
if (img.width < 100 && img.height < 100) return false;
|
||||
|
||||
// Filter out images that are not visible
|
||||
const rect = img.getBoundingClientRect();
|
||||
if (rect.width === 0 || rect.height === 0) return false;
|
||||
|
||||
// Filter out images with certain class names (e.g., icons, thumbnails)
|
||||
if (img.classList.contains('icon') || img.classList.contains('thumbnail')) return false;
|
||||
|
||||
// Filter out images with certain patterns in their src (e.g., placeholder images)
|
||||
if (img.src.includes('placeholder') || img.src.includes('icon')) return false;
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
const images = Array.from(document.querySelectorAll('img')).filter(filterImage);
|
||||
let imagesLeft = images.length;
|
||||
|
||||
if (imagesLeft === 0) {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
|
||||
const checkImage = (img) => {
|
||||
if (img.complete && img.naturalWidth !== 0) {
|
||||
img.setAttribute('width', img.naturalWidth);
|
||||
img.setAttribute('height', img.naturalHeight);
|
||||
imagesLeft--;
|
||||
if (imagesLeft === 0) resolve();
|
||||
}
|
||||
};
|
||||
|
||||
images.forEach(img => {
|
||||
checkImage(img);
|
||||
if (!img.complete) {
|
||||
img.onload = () => {
|
||||
checkImage(img);
|
||||
};
|
||||
img.onerror = () => {
|
||||
imagesLeft--;
|
||||
if (imagesLeft === 0) resolve();
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
// The 5-second fallback timeout is disabled; resolve immediately instead
|
||||
// setTimeout(() => resolve(), 5000);
|
||||
resolve();
|
||||
});
|
||||
}
|
||||
"""
|
||||
await page.evaluate(update_image_dimensions_js)
|
||||
|
||||
# Wait a bit for any onload events to complete
|
||||
await page.wait_for_timeout(100)
|
||||
|
||||
# Process iframes
|
||||
if kwargs.get("process_iframes", False):
|
||||
page = await self.process_iframes(page)
|
||||
|
||||
await self.execute_hook('before_retrieve_html', page)
|
||||
# If delay_before_return_html is set, wait that many seconds first
|
||||
delay_before_return_html = kwargs.get("delay_before_return_html")
|
||||
if delay_before_return_html:
|
||||
await asyncio.sleep(delay_before_return_html)
|
||||
|
||||
# Check for remove_overlay_elements parameter
|
||||
if kwargs.get("remove_overlay_elements", False):
|
||||
await self.remove_overlay_elements(page)
|
||||
|
||||
html = await page.content()
|
||||
await self.execute_hook('before_return_html', page, html)
|
||||
|
||||
# Take a screenshot if screenshot=True was passed in kwargs
|
||||
screenshot_data = None
|
||||
if kwargs.get("screenshot"):
|
||||
# If screenshot_wait_for is set, wait that many seconds before capturing
|
||||
screenshot_wait_for = kwargs.get("screenshot_wait_for")
|
||||
if screenshot_wait_for:
|
||||
await asyncio.sleep(screenshot_wait_for)
|
||||
screenshot_data = await self.take_screenshot(page)
|
||||
|
||||
if self.verbose:
|
||||
print(f"[LOG] ✅ Crawled {url} successfully!")
|
||||
|
||||
if self.use_cached_html:
|
||||
cache_file_path = os.path.join(
|
||||
Path.home(), ".crawl4ai", "cache", hashlib.md5(url.encode()).hexdigest()
|
||||
)
|
||||
with open(cache_file_path, "w", encoding="utf-8") as f:
|
||||
f.write(html)
|
||||
# store response headers and status code in cache
|
||||
with open(cache_file_path + ".meta", "w", encoding="utf-8") as f:
|
||||
json.dump({
|
||||
"response_headers": response_headers,
|
||||
"status_code": status_code
|
||||
}, f)
|
||||
|
||||
async def get_delayed_content(delay: float = 5.0) -> str:
|
||||
if self.verbose:
|
||||
print(f"[LOG] Waiting for {delay} seconds before retrieving content for {url}")
|
||||
await asyncio.sleep(delay)
|
||||
return await page.content()
|
||||
|
||||
response = AsyncCrawlResponse(
|
||||
html=html,
|
||||
response_headers=response_headers,
|
||||
status_code=status_code,
|
||||
screenshot=screenshot_data,
|
||||
get_delayed_content=get_delayed_content
|
||||
)
|
||||
return response
|
||||
except Error as e:
|
||||
raise Error(f"[ERROR] 🚫 crawl(): Failed to crawl {url}: {str(e)}")
|
||||
# finally:
|
||||
# if not session_id:
|
||||
# await page.close()
|
||||
# await context.close()
|
||||
|
||||
    async def crawl_many(self, urls: List[str], **kwargs) -> List[AsyncCrawlResponse]:
        semaphore_count = kwargs.get('semaphore_count', 5)  # Adjust as needed
        semaphore = asyncio.Semaphore(semaphore_count)

        async def crawl_with_semaphore(url):
            async with semaphore:
                return await self.crawl(url, **kwargs)

        tasks = [crawl_with_semaphore(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [result if not isinstance(result, Exception) else str(result) for result in results]
|
||||
|
||||
async def remove_overlay_elements(self, page: Page) -> None:
|
||||
"""
|
||||
Removes popup overlays, modals, cookie notices, and other intrusive elements from the page.
|
||||
|
||||
Args:
|
||||
page (Page): The Playwright page instance
|
||||
"""
|
||||
remove_overlays_js = """
|
||||
async () => {
|
||||
// Function to check if element is visible
|
||||
const isVisible = (elem) => {
|
||||
const style = window.getComputedStyle(elem);
|
||||
return style.display !== 'none' &&
|
||||
style.visibility !== 'hidden' &&
|
||||
style.opacity !== '0';
|
||||
};
|
||||
|
||||
// Common selectors for popups and overlays
|
||||
const commonSelectors = [
|
||||
// Close buttons first
|
||||
'button[class*="close" i]', 'button[class*="dismiss" i]',
|
||||
'button[aria-label*="close" i]', 'button[title*="close" i]',
|
||||
'a[class*="close" i]', 'span[class*="close" i]',
|
||||
|
||||
// Cookie notices
|
||||
'[class*="cookie-banner" i]', '[id*="cookie-banner" i]',
|
||||
'[class*="cookie-consent" i]', '[id*="cookie-consent" i]',
|
||||
|
||||
// Newsletter/subscription dialogs
|
||||
'[class*="newsletter" i]', '[class*="subscribe" i]',
|
||||
|
||||
// Generic popups/modals
|
||||
'[class*="popup" i]', '[class*="modal" i]',
|
||||
'[class*="overlay" i]', '[class*="dialog" i]',
|
||||
'[role="dialog"]', '[role="alertdialog"]'
|
||||
];
|
||||
|
||||
// Try to click close buttons first
|
||||
for (const selector of commonSelectors.slice(0, 6)) {
|
||||
const closeButtons = document.querySelectorAll(selector);
|
||||
for (const button of closeButtons) {
|
||||
if (isVisible(button)) {
|
||||
try {
|
||||
button.click();
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
} catch (e) {
|
||||
console.log('Error clicking button:', e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove remaining overlay elements
|
||||
const removeOverlays = () => {
|
||||
// Find elements with high z-index
|
||||
const allElements = document.querySelectorAll('*');
|
||||
for (const elem of allElements) {
|
||||
const style = window.getComputedStyle(elem);
|
||||
const zIndex = parseInt(style.zIndex);
|
||||
const position = style.position;
|
||||
|
||||
if (
|
||||
isVisible(elem) &&
|
||||
(zIndex > 999 || position === 'fixed' || position === 'absolute') &&
|
||||
(
|
||||
elem.offsetWidth > window.innerWidth * 0.5 ||
|
||||
elem.offsetHeight > window.innerHeight * 0.5 ||
|
||||
style.backgroundColor.includes('rgba') ||
|
||||
parseFloat(style.opacity) < 1
|
||||
)
|
||||
) {
|
||||
elem.remove();
|
||||
}
|
||||
}
|
||||
|
||||
// Remove elements matching common selectors
|
||||
for (const selector of commonSelectors) {
|
||||
const elements = document.querySelectorAll(selector);
|
||||
elements.forEach(elem => {
|
||||
if (isVisible(elem)) {
|
||||
elem.remove();
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// Remove overlay elements
|
||||
removeOverlays();
|
||||
|
||||
// Remove any fixed/sticky position elements at the top/bottom
|
||||
const removeFixedElements = () => {
|
||||
const elements = document.querySelectorAll('*');
|
||||
elements.forEach(elem => {
|
||||
const style = window.getComputedStyle(elem);
|
||||
if (
|
||||
(style.position === 'fixed' || style.position === 'sticky') &&
|
||||
isVisible(elem)
|
||||
) {
|
||||
elem.remove();
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
removeFixedElements();
|
||||
|
||||
// Remove empty block elements such as div, p, span, etc.
|
||||
const removeEmptyBlockElements = () => {
|
||||
const blockElements = document.querySelectorAll('div, p, span, section, article, header, footer, aside, nav, main, ul, ol, li, dl, dt, dd, h1, h2, h3, h4, h5, h6');
|
||||
blockElements.forEach(elem => {
|
||||
if (elem.innerText.trim() === '') {
|
||||
elem.remove();
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
// Remove margin-right and padding-right from body (often added by modal scripts)
|
||||
document.body.style.marginRight = '0px';
|
||||
document.body.style.paddingRight = '0px';
|
||||
document.body.style.overflow = 'auto';
|
||||
|
||||
// Wait a bit for any animations to complete
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
}
|
||||
"""
|
||||
|
||||
try:
|
||||
await page.evaluate(remove_overlays_js)
|
||||
await page.wait_for_timeout(500) # Wait for any animations to complete
|
||||
except Exception as e:
|
||||
if self.verbose:
|
||||
print(f"Warning: Failed to remove overlay elements: {str(e)}")
|
||||
|
||||
    async def take_screenshot(self, page: Page) -> str:
        try:
            # The page is already loaded, just take the screenshot
            screenshot = await page.screenshot(full_page=True)
            return base64.b64encode(screenshot).decode('utf-8')
        except Exception as e:
            error_message = f"Failed to take screenshot: {str(e)}"
            print(error_message)

            # Generate an error image
            img = Image.new('RGB', (800, 600), color='black')
            draw = ImageDraw.Draw(img)
            font = ImageFont.load_default()
            draw.text((10, 10), error_message, fill=(255, 255, 255), font=font)

            buffered = BytesIO()
            img.save(buffered, format="JPEG")
            return base64.b64encode(buffered.getvalue()).decode('utf-8')
        finally:
            await page.close()
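The prefix handling in `smart_wait` earlier in this file can be looked at in isolation. The following is a minimal sketch, not the project's code: `classify_wait_for` is a hypothetical helper name, and it only reproduces the string dispatch (explicit `js:` / `css:` prefixes, then auto-detection). It leaves out the Playwright calls and the fallback that retries an invalid selector as JavaScript.

```python
from typing import Tuple


def classify_wait_for(wait_for: str) -> Tuple[str, str]:
    """Return ("js", code) or ("css", selector) for a wait_for string.

    Hypothetical helper mirroring the branch order used by smart_wait.
    """
    wait_for = wait_for.strip()

    if wait_for.startswith("js:"):
        # Explicit JavaScript: drop the 3-character prefix
        return "js", wait_for[3:].strip()
    if wait_for.startswith("css:"):
        # Explicit CSS selector: drop the 4-character prefix
        return "css", wait_for[4:].strip()
    if wait_for.startswith("()") or wait_for.startswith("function"):
        # Looks like a JavaScript function expression
        return "js", wait_for
    # Everything else is treated as a CSS selector first
    return "css", wait_for


if __name__ == "__main__":
    for value in ("js: () => window.ready", "css: .article", "() => true", "div#main"):
        print(value, "->", classify_wait_for(value))
```

Keeping the classification separate from the waiting makes the branch order easy to check without launching a browser.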
|
||||
|
||||
@@ -5,6 +5,7 @@ import asyncio
|
||||
from typing import Optional, Tuple, Dict
|
||||
from contextlib import asynccontextmanager
|
||||
import logging
|
||||
import json # Added for serialization/deserialization
|
||||
|
||||
# Set up logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
@@ -89,7 +90,8 @@ class AsyncDatabaseManager:
|
||||
media TEXT DEFAULT "{}",
|
||||
links TEXT DEFAULT "{}",
|
||||
metadata TEXT DEFAULT "{}",
|
||||
screenshot TEXT DEFAULT ""
|
||||
screenshot TEXT DEFAULT "",
|
||||
response_headers TEXT DEFAULT "{}" -- New column added
|
||||
)
|
||||
''')
|
||||
|
||||
@@ -105,26 +107,51 @@ class AsyncDatabaseManager:
|
||||
|
||||
column_names = await self.execute_with_retry(_check_columns)
|
||||
|
||||
for column in ['media', 'links', 'metadata', 'screenshot']:
|
||||
# List of new columns to add
|
||||
new_columns = ['media', 'links', 'metadata', 'screenshot', 'response_headers']
|
||||
|
||||
for column in new_columns:
|
||||
if column not in column_names:
|
||||
await self.aalter_db_add_column(column)
|
||||
|
||||
async def aalter_db_add_column(self, new_column: str):
|
||||
"""Add new column to the database"""
|
||||
async def _alter(db):
|
||||
await db.execute(f'ALTER TABLE crawled_data ADD COLUMN {new_column} TEXT DEFAULT ""')
|
||||
if new_column == 'response_headers':
|
||||
await db.execute(f'ALTER TABLE crawled_data ADD COLUMN {new_column} TEXT DEFAULT "{{}}"')
|
||||
else:
|
||||
await db.execute(f'ALTER TABLE crawled_data ADD COLUMN {new_column} TEXT DEFAULT ""')
|
||||
logger.info(f"Added column '{new_column}' to the database.")
|
||||
|
||||
await self.execute_with_retry(_alter)
|
||||
|
||||
async def aget_cached_url(self, url: str) -> Optional[Tuple[str, str, str, str, str, str, str, bool, str]]:
|
||||
async def aget_cached_url(self, url: str) -> Optional[Tuple[str, str, str, str, str, bool, str, str, str, str]]:
|
||||
"""Retrieve cached URL data"""
|
||||
async def _get(db):
|
||||
async with db.execute(
|
||||
'SELECT url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot FROM crawled_data WHERE url = ?',
|
||||
'''
|
||||
SELECT url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot, response_headers
|
||||
FROM crawled_data WHERE url = ?
|
||||
''',
|
||||
(url,)
|
||||
) as cursor:
|
||||
return await cursor.fetchone()
|
||||
row = await cursor.fetchone()
|
||||
if row:
|
||||
# Deserialize JSON fields
|
||||
return (
|
||||
row[0], # url
|
||||
row[1], # html
|
||||
row[2], # cleaned_html
|
||||
row[3], # markdown
|
||||
row[4], # extracted_content
|
||||
row[5], # success
|
||||
json.loads(row[6] or '{}'), # media
|
||||
json.loads(row[7] or '{}'), # links
|
||||
json.loads(row[8] or '{}'), # metadata
|
||||
row[9], # screenshot
|
||||
json.loads(row[10] or '{}') # response_headers
|
||||
)
|
||||
return None
|
||||
|
||||
try:
|
||||
return await self.execute_with_retry(_get)
|
||||
@@ -132,12 +159,27 @@ class AsyncDatabaseManager:
|
||||
logger.error(f"Error retrieving cached URL: {e}")
|
||||
return None
|
||||
|
||||
async def acache_url(self, url: str, html: str, cleaned_html: str, markdown: str, extracted_content: str, success: bool, media: str = "{}", links: str = "{}", metadata: str = "{}", screenshot: str = ""):
|
||||
async def acache_url(
|
||||
self,
|
||||
url: str,
|
||||
html: str,
|
||||
cleaned_html: str,
|
||||
markdown: str,
|
||||
extracted_content: str,
|
||||
success: bool,
|
||||
media: str = "{}",
|
||||
links: str = "{}",
|
||||
metadata: str = "{}",
|
||||
screenshot: str = "",
|
||||
response_headers: str = "{}" # New parameter added
|
||||
):
|
||||
"""Cache URL data with retry logic"""
|
||||
async def _cache(db):
|
||||
await db.execute('''
|
||||
INSERT INTO crawled_data (url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
INSERT INTO crawled_data (
|
||||
url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot, response_headers
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(url) DO UPDATE SET
|
||||
html = excluded.html,
|
||||
cleaned_html = excluded.cleaned_html,
|
||||
@@ -147,8 +189,9 @@ class AsyncDatabaseManager:
|
||||
media = excluded.media,
|
||||
links = excluded.links,
|
||||
metadata = excluded.metadata,
|
||||
screenshot = excluded.screenshot
|
||||
''', (url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot))
|
||||
screenshot = excluded.screenshot,
|
||||
response_headers = excluded.response_headers -- Update response_headers
|
||||
''', (url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot, response_headers))
|
||||
|
||||
try:
|
||||
await self.execute_with_retry(_cache)
|
||||
@@ -189,4 +232,4 @@ class AsyncDatabaseManager:
|
||||
logger.error(f"Error flushing database: {e}")
|
||||
|
||||
# Create a singleton instance
|
||||
async_db_manager = AsyncDatabaseManager()
|
||||
async_db_manager = AsyncDatabaseManager()
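The `response_headers` change above stores headers as JSON text and updates them on conflict. The sketch below shows that round trip under several assumptions: it uses the synchronous standard-library `sqlite3` module instead of `aiosqlite`, a table reduced to three columns, and hypothetical function names (`init_db`, `cache_url`, `get_cached_url`). Unlike `acache_url`, which receives an already serialized string, this version serializes the dict itself.

```python
import json
import sqlite3
from typing import Dict, Optional, Tuple


def init_db(conn: sqlite3.Connection) -> None:
    # Reduced schema for illustration; url must be unique for ON CONFLICT to apply
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS crawled_data (
            url TEXT PRIMARY KEY,
            html TEXT,
            response_headers TEXT DEFAULT '{}'
        )
        """
    )


def cache_url(conn: sqlite3.Connection, url: str, html: str,
              response_headers: Dict[str, str]) -> None:
    # Insert a new row, or overwrite html and headers if the URL is already cached
    conn.execute(
        """
        INSERT INTO crawled_data (url, html, response_headers)
        VALUES (?, ?, ?)
        ON CONFLICT(url) DO UPDATE SET
            html = excluded.html,
            response_headers = excluded.response_headers
        """,
        (url, html, json.dumps(response_headers)),
    )
    conn.commit()


def get_cached_url(conn: sqlite3.Connection,
                   url: str) -> Optional[Tuple[str, str, Dict[str, str]]]:
    row = conn.execute(
        "SELECT url, html, response_headers FROM crawled_data WHERE url = ?",
        (url,),
    ).fetchone()
    if row is None:
        return None
    # Deserialize the JSON column, treating NULL or empty text as no headers
    return row[0], row[1], json.loads(row[2] or "{}")


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    init_db(conn)
    cache_url(conn, "https://example.com", "<p>v1</p>", {"content-type": "text/html"})
    cache_url(conn, "https://example.com", "<p>v2</p>", {"content-type": "text/plain"})
    print(get_cached_url(conn, "https://example.com"))
```

The `ON CONFLICT ... DO UPDATE` clause needs SQLite 3.24 or newer, which is what recent Python builds normally bundle.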
|
||||
|
||||
741
crawl4ai/async_executor.py
Normal file
@@ -0,0 +1,741 @@
|
||||
from __future__ import annotations
|
||||
import asyncio
|
||||
import psutil
|
||||
import logging
|
||||
import time
|
||||
import sqlite3
|
||||
import aiosqlite
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Dict, List, Optional, Callable, Any, Set, Type
|
||||
from typing import Awaitable
|
||||
from pathlib import Path
|
||||
import json
|
||||
from datetime import datetime
|
||||
from typing import ClassVar, Type, Union
|
||||
import inspect
|
||||
|
||||
# Imports from your crawler package
|
||||
from .async_crawler_strategy import AsyncCrawlerStrategy, AsyncPlaywrightCrawlerStrategy
|
||||
from .chunking_strategy import ChunkingStrategy, RegexChunking
|
||||
from .extraction_strategy import ExtractionStrategy
|
||||
from .models import CrawlResult
|
||||
from .config import MIN_WORD_THRESHOLD
|
||||
from .async_webcrawler import AsyncWebCrawler
|
||||
from .config import MAX_METRICS_HISTORY
|
||||
|
||||
import logging
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s [%(levelname)s] %(name)s - %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S'
|
||||
)
|
||||
# self.logger.error(f"Executor {self.__class__.__name__}: Error message", exc_info=True)
|
||||
# self.logger.info(f"Executor {self.__class__.__name__}: Info message")
|
||||
# self.logger.warning(f"Executor {self.__class__.__name__}: Warning message")
|
||||
|
||||
|
||||
# Enums and Constants
class ExecutionMode(Enum):
    """Execution mode for the crawler executor."""
    SPEED = "speed"
    RESOURCE = "resource"

class TaskState(Enum):
    """Possible states for a crawling task."""
    PENDING = "pending"
    SCHEDULED = "scheduled"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"

# Types of callbacks we should support
class CallbackType(Enum):
    PRE_EXECUTION = "pre_execution"      # Before processing a URL
    POST_EXECUTION = "post_execution"    # After successful processing
    ON_ERROR = "on_error"                # When an error occurs
    ON_RETRY = "on_retry"                # Before retrying a failed URL
    ON_BATCH_START = "on_batch_start"    # Before starting a batch
    ON_BATCH_END = "on_batch_end"        # After completing a batch
    ON_COMPLETE = "on_complete"          # After all URLs are processed
|
||||
|
||||
|
||||
@dataclass
|
||||
class SystemMetrics:
|
||||
"""System resource metrics."""
|
||||
cpu_percent: float
|
||||
memory_percent: float
|
||||
available_memory: int
|
||||
timestamp: float
|
||||
|
||||
@classmethod
|
||||
def capture(cls) -> 'SystemMetrics':
|
||||
"""Capture current system metrics."""
|
||||
return cls(
|
||||
cpu_percent=psutil.cpu_percent(),
|
||||
memory_percent=psutil.virtual_memory().percent,
|
||||
available_memory=psutil.virtual_memory().available,
|
||||
timestamp=time.time()
|
||||
)
|
||||
|
||||
@dataclass
|
||||
class TaskMetadata:
|
||||
"""Metadata for a crawling task."""
|
||||
url: str
|
||||
state: TaskState
|
||||
attempts: int = 0
|
||||
last_attempt: Optional[float] = None
|
||||
error: Optional[str] = None
|
||||
result: Optional[Any] = None
|
||||
|
||||
@dataclass
|
||||
class ExecutorMetrics:
|
||||
"""Performance and resource metrics for the executor."""
|
||||
# Performance metrics
|
||||
total_urls: int = 0
|
||||
completed_urls: int = 0
|
||||
failed_urls: int = 0
|
||||
start_time: Optional[float] = None
|
||||
total_retries: int = 0
|
||||
response_times: List[float] = field(default_factory=list)
|
||||
|
||||
# Resource metrics
|
||||
system_metrics: List[SystemMetrics] = field(default_factory=list)
|
||||
active_connections: int = 0
|
||||
|
||||
def capture_system_metrics(self):
|
||||
"""Capture system metrics and enforce history size limit."""
|
||||
metrics = SystemMetrics.capture()
|
||||
self.system_metrics.append(metrics)
|
||||
if len(self.system_metrics) > MAX_METRICS_HISTORY:
|
||||
self.system_metrics.pop(0) # Remove the oldest metric
|
||||
|
||||
@property
|
||||
def urls_per_second(self) -> float:
|
||||
"""Calculate URLs processed per second."""
|
||||
if not self.start_time or not self.completed_urls:
|
||||
return 0.0
|
||||
duration = time.time() - self.start_time
|
||||
return self.completed_urls / duration if duration > 0 else 0.0
|
||||
|
||||
@property
|
||||
def success_rate(self) -> float:
|
||||
"""Calculate success rate as percentage."""
|
||||
if not self.total_urls:
|
||||
return 0.0
|
||||
return (self.completed_urls / self.total_urls) * 100
|
||||
|
||||
@property
|
||||
def retry_rate(self) -> float:
|
||||
"""Calculate retry rate as percentage."""
|
||||
if not self.total_urls:
|
||||
return 0.0
|
||||
return (self.total_retries / self.total_urls) * 100
|
||||
|
||||
@property
|
||||
def average_response_time(self) -> float:
|
||||
"""Calculate average response time in seconds."""
|
||||
if not self.response_times:
|
||||
return 0.0
|
||||
return sum(self.response_times) / len(self.response_times)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert metrics to dictionary format."""
|
||||
return {
|
||||
"performance": {
|
||||
"urls_per_second": self.urls_per_second,
|
||||
"success_rate": self.success_rate,
|
||||
"retry_rate": self.retry_rate,
|
||||
"average_response_time": self.average_response_time,
|
||||
"total_urls": self.total_urls,
|
||||
"completed_urls": self.completed_urls,
|
||||
"failed_urls": self.failed_urls
|
||||
},
|
||||
"resources": {
|
||||
"cpu_utilization": self.system_metrics[-1].cpu_percent if self.system_metrics else 0,
|
||||
"memory_usage": self.system_metrics[-1].memory_percent if self.system_metrics else 0,
|
||||
"active_connections": self.active_connections
|
||||
}
|
||||
}
|
||||
|
||||
class ResourceMonitor:
|
||||
"""Monitors and manages system resources."""
|
||||
|
||||
def __init__(self, mode: ExecutionMode):
|
||||
self.mode = mode
|
||||
self.metrics_history: List[SystemMetrics] = []
|
||||
self._setup_thresholds()
|
||||
|
||||
def _setup_thresholds(self):
|
||||
"""Set up resource thresholds based on execution mode."""
|
||||
if self.mode == ExecutionMode.SPEED:
|
||||
self.memory_threshold = 80 # 80% memory usage limit
|
||||
self.cpu_threshold = 90 # 90% CPU usage limit
|
||||
else:
|
||||
self.memory_threshold = 40 # 40% memory usage limit
|
||||
self.cpu_threshold = 30 # 30% CPU usage limit
|
||||
|
||||
async def check_resources(self) -> bool:
|
||||
"""Check if system resources are within acceptable limits."""
|
||||
metrics = SystemMetrics.capture()
|
||||
self.metrics_history.append(metrics)
|
||||
|
||||
# Keep only last hour of metrics
|
||||
cutoff_time = time.time() - 3600
|
||||
self.metrics_history = [m for m in self.metrics_history if m.timestamp > cutoff_time]
|
||||
|
||||
return (metrics.cpu_percent < self.cpu_threshold and
|
||||
metrics.memory_percent < self.memory_threshold)
|
||||
|
||||
def get_optimal_batch_size(self, total_urls: int) -> int:
|
||||
metrics = SystemMetrics.capture()
|
||||
if self.mode == ExecutionMode.SPEED:
|
||||
base_size = min(1000, total_urls)
|
||||
|
||||
# Adjust based on resource usage
|
||||
cpu_factor = max(0.0, (self.cpu_threshold - metrics.cpu_percent) / self.cpu_threshold)
|
||||
mem_factor = max(0.0, (self.memory_threshold - metrics.memory_percent) / self.memory_threshold)
|
||||
|
||||
min_factor = min(cpu_factor, mem_factor)
|
||||
adjusted_size = max(1, int(base_size * min_factor))
|
||||
return min(total_urls, adjusted_size)
|
||||
else:
|
||||
# For resource optimization, use a conservative batch size based on resource usage
|
||||
cpu_factor = max(0.1, (self.cpu_threshold - metrics.cpu_percent) / self.cpu_threshold)
|
||||
mem_factor = max(0.1, (self.memory_threshold - metrics.memory_percent) / self.memory_threshold)
|
||||
|
||||
min_factor = min(cpu_factor, mem_factor)
|
||||
adjusted_size = max(1, int(50 * min_factor))
|
||||
return min(total_urls, adjusted_size)
|
||||
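Review note: the batch sizing in `get_optimal_batch_size` can be restated as a pure function, which makes the scaling easier to check by hand. The sketch below is illustrative only. The name `optimal_batch_size` and its `floor` and `base_size` parameters are assumptions introduced here; the thresholds are passed in rather than read from `psutil`.

```python
def optimal_batch_size(total_urls, cpu_percent, memory_percent,
                       cpu_threshold, memory_threshold,
                       base_size, floor=0.0):
    """Scale base_size by the tighter of the CPU and memory headroom factors.

    Each factor is the fraction of the threshold still unused, clamped to
    `floor` so an overloaded machine never produces a negative size.
    """
    cpu_factor = max(floor, (cpu_threshold - cpu_percent) / cpu_threshold)
    mem_factor = max(floor, (memory_threshold - memory_percent) / memory_threshold)
    # The scarcer resource decides; always allow at least one URL per batch.
    adjusted = max(1, int(base_size * min(cpu_factor, mem_factor)))
    return min(total_urls, adjusted)


if __name__ == "__main__":
    # SPEED-style call: thresholds 90% CPU / 80% memory, half the headroom used.
    print(optimal_batch_size(500, 45, 40, 90, 80, base_size=500))
    # RESOURCE-style call: thresholds 30% CPU / 40% memory, CPU already over limit.
    print(optimal_batch_size(500, 60, 20, 30, 40, base_size=50, floor=0.1))
```

With half of both headrooms consumed, the batch is half of `base_size`; once either resource is over its threshold, the size drops to the floor (or to 1 when the floor is 0).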
|
||||
class ExecutorControl:
|
||||
"""Control interface for the executor."""
|
||||
|
||||
def __init__(self):
|
||||
self._paused = False
|
||||
self._cancelled = False
|
||||
self._pause_event = asyncio.Event()
|
||||
self._pause_event.set() # Not paused initially
|
||||
self._lock = asyncio.Lock() # Lock to protect shared state
|
||||
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
|
||||
|
||||
async def pause(self):
|
||||
"""Pause the execution."""
|
||||
async with self._lock:
|
||||
self._paused = True
|
||||
self._pause_event.clear()
|
||||
|
||||
async def resume(self):
|
||||
"""Resume the execution."""
|
||||
async with self._lock:
|
||||
self._paused = False
|
||||
self._pause_event.set()
|
||||
|
||||
async def cancel(self):
|
||||
"""Cancel all pending operations."""
|
||||
async with self._lock:
|
||||
self._cancelled = True
|
||||
self._pause_event.set() # Release any paused operations
|
||||
|
||||
async def is_paused(self) -> bool:
|
||||
"""Check if execution is paused."""
|
||||
async with self._lock:
|
||||
return self._paused
|
||||
|
||||
async def is_cancelled(self) -> bool:
|
||||
"""Check if execution is cancelled."""
|
||||
async with self._lock:
|
||||
return self._cancelled
|
||||
|
||||
async def wait_if_paused(self, timeout: Optional[float] = None):
|
||||
"""Wait if execution is paused, with an optional timeout."""
|
||||
try:
|
||||
await asyncio.wait_for(self._pause_event.wait(), timeout=timeout)
|
||||
except asyncio.TimeoutError:
|
||||
# Timeout occurred, handle as needed
|
||||
async with self._lock:
|
||||
self._paused = False # Optionally reset the paused state
|
||||
self._pause_event.set()
|
||||
# Optionally log a warning
|
||||
self.logger.warning(f"ExecutorControl: wait_if_paused() timed out after {timeout} seconds. Proceeding with execution.")
|
||||
|
||||
async def reset(self):
|
||||
"""Reset control state."""
|
||||
async with self._lock:
|
||||
self._paused = False
|
||||
self._cancelled = False
|
||||
self._pause_event.set()
|
||||
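Review note: the pause/resume mechanism in `ExecutorControl` rests on a single `asyncio.Event` that is set while running and cleared while paused. A minimal sketch of that pattern follows. `PauseGate` and `demo` are hypothetical names, and unlike the class in this diff the sketch reports a timeout to the caller instead of force-resuming.

```python
import asyncio


class PauseGate:
    """Event-based gate: set means 'running', cleared means 'paused'."""

    def __init__(self):
        self._event = asyncio.Event()
        self._event.set()  # not paused initially

    def pause(self):
        self._event.clear()

    def resume(self):
        self._event.set()

    async def wait_if_paused(self, timeout=None):
        """Return True once running, or False if still paused after `timeout`."""
        try:
            await asyncio.wait_for(self._event.wait(), timeout=timeout)
            return True
        except asyncio.TimeoutError:
            return False


async def demo():
    gate = PauseGate()
    log = []

    async def worker():
        for i in range(3):
            await gate.wait_if_paused()  # blocks here while the gate is paused
            log.append(i)
            await asyncio.sleep(0)

    gate.pause()
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.01)
    blocked = list(log)          # snapshot taken while the worker is parked
    gate.resume()
    await task

    gate.pause()
    timed_out = not await gate.wait_if_paused(timeout=0.01)
    return blocked, log, timed_out


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Whether a timed-out wait should silently resume, as `wait_if_paused` does in this diff, or surface the timeout to the caller is a design choice worth settling before merge.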
|
||||
|
||||
class ExecutorStrategy(ABC):
|
||||
"""Abstract Base class for executor strategies.
|
||||
|
||||
Callbacks:
|
||||
- PRE_EXECUTION: Callable[[str, Dict[str, Any]], None]
|
||||
- POST_EXECUTION: Callable[[str, Any, Dict[str, Any]], None]
|
||||
- ON_ERROR: Callable[[str, Exception, Dict[str, Any]], None]
|
||||
- ON_RETRY: Callable[[str, int, Dict[str, Any]], None]
|
||||
- ON_BATCH_START: Callable[[List[str], Dict[str, Any]], None]
|
||||
- ON_BATCH_END: Callable[[List[str], Dict[str, Any]], None]
|
||||
- ON_COMPLETE: Callable[[Dict[str, Any], Dict[str, Any]], None]
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
crawler: AsyncWebCrawler,
|
||||
mode: ExecutionMode,
|
||||
# callbacks: Optional[Dict[CallbackType, Callable]] = None,
|
||||
callbacks: Optional[Dict[CallbackType, Callable[[Any], Union[Awaitable[None], None]]]] = None,
|
||||
persistence_path: Optional[Path] = None,
|
||||
**crawl_config_kwargs
|
||||
):
|
||||
self.crawler = crawler
|
||||
self.mode = mode
|
||||
self.callbacks = callbacks or {}
|
||||
self.resource_monitor = ResourceMonitor(mode)
|
||||
self.tasks: Dict[str, TaskMetadata] = {}
|
||||
self.active_tasks: Set[str] = set()
|
||||
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
|
||||
self.metrics = ExecutorMetrics()
|
||||
self.control = ExecutorControl()
|
||||
self.crawl_config_kwargs = crawl_config_kwargs # Store parameters for arun
|
||||
|
||||
async def get_status(self) -> Dict[str, Any]:
|
||||
"""Get current executor status and metrics."""
|
||||
return {
|
||||
"status": {
|
||||
"paused": await self.control.is_paused(),
|
||||
"cancelled": await self.control.is_cancelled(),
|
||||
"active_tasks": len(self.active_tasks)
|
||||
},
|
||||
"metrics": self.metrics.to_dict()
|
||||
}
|
||||
|
||||
async def clear_state(self):
|
||||
"""Reset executor state."""
|
||||
self.tasks.clear()
|
||||
self.active_tasks.clear()
|
||||
self.metrics = ExecutorMetrics()
|
||||
await self.control.reset()
|
||||
# self.persistence is never assigned in __init__, so guard the call
if getattr(self, "persistence", None) is not None:
    await self.persistence.clear()
|
||||
|
||||
async def _execute_callback(
|
||||
self,
|
||||
callback_type: CallbackType,
|
||||
*args,
|
||||
**kwargs
|
||||
):
|
||||
"""Execute callback if it exists."""
|
||||
if callback := self.callbacks.get(callback_type):
|
||||
try:
|
||||
if inspect.iscoroutinefunction(callback):
|
||||
await callback(*args, **kwargs)
|
||||
else:
|
||||
callback(*args, **kwargs)
|
||||
except Exception as e:
|
||||
# self.logger.error(f"Callback {callback_type} failed: {e}")
|
||||
self.logger.error(f"Executor {self.__class__.__name__}: Callback {callback_type.value} failed: {e}", exc_info=True)
|
||||
|
||||
    async def _process_url(self, url: str) -> CrawlResult:
        """Process a single URL using the crawler, retrying on failure."""
        max_retries = self.crawl_config_kwargs.get('max_retries', 3)
        backoff_factor = self.crawl_config_kwargs.get('backoff_factor', 1)
        attempts = 0

        while attempts <= max_retries:
            # Invoke PRE_EXECUTION callback
            await self._execute_callback(CallbackType.PRE_EXECUTION, url, self.metrics.to_dict())

            # Wait if execution is paused
            await self.control.wait_if_paused(timeout=300)

            # Check if cancelled
            if await self.control.is_cancelled():
                raise asyncio.CancelledError("Execution was cancelled")

            start_time = time.time()
            self.metrics.active_connections += 1

            try:
                result = await self.crawler.arun(url, **self.crawl_config_kwargs)
                self.metrics.completed_urls += 1
                self.metrics.response_times.append(time.time() - start_time)
                # Invoke POST_EXECUTION callback
                await self._execute_callback(CallbackType.POST_EXECUTION, url, result, self.metrics.to_dict())
                return result

            except Exception as e:
                attempts += 1
                self.logger.error(f"Executor {self.__class__.__name__}: Error processing URL {url}: {e}", exc_info=True)
                # Invoke ON_ERROR callback
                await self._execute_callback(CallbackType.ON_ERROR, url, e, self.metrics.to_dict())

                if attempts > max_retries:
                    # Retries exhausted: count the URL as failed once, then re-raise
                    self.metrics.failed_urls += 1
                    raise

                # Count the retry so retry_rate is meaningful, then back off linearly
                self.metrics.total_retries += 1
                # Invoke ON_RETRY callback
                await self._execute_callback(CallbackType.ON_RETRY, url, attempts, self.metrics.to_dict())
                await asyncio.sleep(backoff_factor * attempts)

            finally:
                self.metrics.active_connections -= 1
                # INFO: Uncomment this line to capture system metrics after each URL; it causes a performance hit
                # self.metrics.system_metrics.append(SystemMetrics.capture())
|
||||
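Review note: the retry loop in `_process_url` is a linear backoff, sleeping `backoff_factor * attempts` seconds before each retry. The standalone sketch below isolates that policy so it can be tested without a crawler. `retry_with_backoff`, `run_demo`, and the injectable `sleep` parameter are assumptions made for illustration and are not part of this diff.

```python
import asyncio


async def retry_with_backoff(operation, max_retries=3, backoff_factor=1.0,
                             sleep=asyncio.sleep):
    """Await operation(); on failure retry up to max_retries times.

    The delay before retry n is backoff_factor * n (linear backoff).
    The last exception is re-raised once retries are exhausted.
    """
    attempts = 0
    while True:
        try:
            return await operation()
        except Exception:
            attempts += 1
            if attempts > max_retries:
                raise
            await sleep(backoff_factor * attempts)


def run_demo():
    calls = {"n": 0}
    delays = []

    async def flaky():
        # Fails on the first two calls, succeeds on the third.
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("transient failure")
        return "ok"

    async def fake_sleep(seconds):
        # Record the requested delay instead of actually sleeping.
        delays.append(seconds)

    result = asyncio.run(
        retry_with_backoff(flaky, max_retries=3, backoff_factor=2, sleep=fake_sleep)
    )
    return result, calls["n"], delays


if __name__ == "__main__":
    print(run_demo())
```

Injecting `sleep` keeps the policy testable in milliseconds; an exponential schedule would only change the delay expression.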
|
||||
async def execute(self, urls: List[str]) -> Dict[str, Any]:
|
||||
"""Execute crawling tasks."""
|
||||
# Initialize metrics
|
||||
self.metrics.total_urls = len(urls)
|
||||
self.metrics.start_time = time.time()
|
||||
|
||||
# Create context with metrics (used for callbacks)
|
||||
context = {
|
||||
"mode": self.mode,
|
||||
"start_time": self.metrics.start_time,
|
||||
"total_urls": self.metrics.total_urls
|
||||
}
|
||||
|
||||
# Invoke ON_BATCH_START callback
|
||||
await self._execute_callback(CallbackType.ON_BATCH_START, urls, context)
|
||||
|
||||
results = {}
|
||||
batch_errors = []
|
||||
|
||||
# Use the crawler within an async context manager
|
||||
async with self.crawler:
|
||||
# Check for cancellation before starting
|
||||
if await self.control.is_cancelled():
|
||||
raise asyncio.CancelledError("Execution was cancelled")
|
||||
|
||||
# Wait if paused
|
||||
await self.control.wait_if_paused(timeout=300)
|
||||
|
||||
# Prepare list of batches
|
||||
batches = []
|
||||
total_urls_remaining = len(urls)
|
||||
index = 0
|
||||
|
||||
while index < len(urls):
|
||||
batch_size = self.resource_monitor.get_optimal_batch_size(total_urls_remaining)
|
||||
batch_urls = urls[index:index + batch_size]
|
||||
batches.append(batch_urls)
|
||||
index += batch_size
|
||||
total_urls_remaining -= batch_size
|
||||
|
||||
# Process each batch
|
||||
for batch_urls in batches:
|
||||
# Check for cancellation
|
||||
if await self.control.is_cancelled():
|
||||
raise asyncio.CancelledError("Execution was cancelled")
|
||||
|
||||
# Wait if paused
|
||||
await self.control.wait_if_paused(timeout=300)
|
||||
|
||||
try:
|
||||
# Process the batch
|
||||
batch_results = await self.process_batch(batch_urls)
|
||||
# Update results
|
||||
results.update(batch_results)
|
||||
# Capture system metrics after each batch
|
||||
self.metrics.capture_system_metrics()
|
||||
# Update system metrics after each batch
|
||||
# self.metrics.system_metrics.append(SystemMetrics.capture()) # Has memory leak issue
|
||||
# Invoke ON_BATCH_END callback
|
||||
await self._execute_callback(CallbackType.ON_BATCH_END, batch_urls, context)
|
||||
except Exception as e:
|
||||
# Handle batch-level exceptions
|
||||
self.logger.error(f"Executor {self.__class__.__name__}: Error processing batch: {e}", exc_info=True)
|
||||
await self._execute_callback(CallbackType.ON_ERROR, "batch", e, context)
|
||||
# Collect the error
|
||||
batch_errors.append((batch_urls, e))
|
||||
# Continue to next batch instead of raising
|
||||
continue
|
||||
|
||||
# Execution complete
|
||||
await self._execute_callback(CallbackType.ON_COMPLETE, results, context)
|
||||
|
||||
# Log final metrics and batch errors if any
|
||||
final_status = await self.get_status()
|
||||
# self.logger.info(f"Execution completed. Metrics: {final_status}")
|
||||
self.logger.info(f"Executor {self.__class__.__name__}: Execution completed. Metrics: {final_status}")
|
||||
|
||||
if batch_errors:
|
||||
# self.logger.warning(f"Execution completed with errors in {len(batch_errors)} batches.")
|
||||
self.logger.warning(f"Executor {self.__class__.__name__}: Execution completed with errors in {len(batch_errors)} batches.")
|
||||
|
||||
return results
|
||||
|
||||
|
||||
@abstractmethod
|
||||
async def process_batch(self, batch_urls: List[str]) -> Dict[str, Any]:
|
||||
"""Process a batch of URLs."""
|
||||
pass
|
||||
|
||||
class SpeedOptimizedExecutor(ExecutorStrategy):
|
||||
"""Executor optimized for speed."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
crawler: AsyncWebCrawler,
|
||||
callbacks: Optional[Dict[CallbackType, Callable]] = None,
|
||||
persistence_path: Optional[Path] = None,
|
||||
word_count_threshold=MIN_WORD_THRESHOLD,
|
||||
extraction_strategy: ExtractionStrategy = None,
|
||||
chunking_strategy: ChunkingStrategy = None,
|
||||
bypass_cache: bool = False,
|
||||
css_selector: str = None,
|
||||
screenshot: bool = False,
|
||||
user_agent: str = None,
|
||||
verbose=True,
|
||||
connection_pool_size: int = 1000,
|
||||
dns_cache_size: int = 10000,
|
||||
backoff_factor: int = 1,
|
||||
**kwargs
|
||||
):
|
||||
if chunking_strategy is None:
|
||||
chunking_strategy = RegexChunking()
|
||||
|
||||
super().__init__(
|
||||
crawler=crawler,
|
||||
mode=ExecutionMode.SPEED,
|
||||
callbacks=callbacks,
|
||||
persistence_path=persistence_path,
|
||||
word_count_threshold=word_count_threshold,
|
||||
extraction_strategy=extraction_strategy,
|
||||
chunking_strategy=chunking_strategy,
|
||||
bypass_cache=bypass_cache,
|
||||
css_selector=css_selector,
|
||||
screenshot=screenshot,
|
||||
user_agent=user_agent,
|
||||
verbose=verbose,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
self.connection_pool_size = connection_pool_size
|
||||
self.dns_cache_size = dns_cache_size
|
||||
self.backoff_factor = backoff_factor
|
||||
|
||||
self.logger.info(
|
||||
# "Initialized speed-optimized executor with:"
|
||||
f"Executor {self.__class__.__name__}: Initialized with:"
|
||||
f" connection_pool_size={self.connection_pool_size},"
|
||||
f" dns_cache_size={self.dns_cache_size}"
|
||||
)
|
||||
|
||||
async def process_batch(self, batch_urls: List[str]) -> Dict[str, Any]:
|
||||
"""Process a batch of URLs concurrently."""
|
||||
batch_tasks = [self._process_url(url) for url in batch_urls]
|
||||
|
||||
# Execute batch with concurrency control
|
||||
batch_results_list = await asyncio.gather(*batch_tasks, return_exceptions=True)
|
||||
|
||||
batch_results = {}
|
||||
for url, result in zip(batch_urls, batch_results_list):
|
||||
if isinstance(result, BaseException):  # also catches asyncio.CancelledError, which is not an Exception subclass on Python 3.8+
|
||||
batch_results[url] = {"success": False, "error": str(result)}
|
||||
else:
|
||||
batch_results[url] = {"success": True, "result": result}
|
||||
|
||||
return batch_results
|
||||
|
||||
class ResourceOptimizedExecutor(ExecutorStrategy):
|
||||
"""Executor optimized for resource usage."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
crawler: AsyncWebCrawler,
|
||||
callbacks: Optional[Dict[CallbackType, Callable]] = None,
|
||||
persistence_path: Optional[Path] = None,
|
||||
word_count_threshold=MIN_WORD_THRESHOLD,
|
||||
extraction_strategy: ExtractionStrategy = None,
|
||||
chunking_strategy: ChunkingStrategy = None,
|
||||
bypass_cache: bool = False,
|
||||
css_selector: str = None,
|
||||
screenshot: bool = False,
|
||||
user_agent: str = None,
|
||||
verbose=True,
|
||||
connection_pool_size: int = 50,
|
||||
dns_cache_size: int = 1000,
|
||||
backoff_factor: int = 5,
|
||||
max_concurrent_tasks: int = 5,
|
||||
**kwargs
|
||||
):
|
||||
if chunking_strategy is None:
|
||||
chunking_strategy = RegexChunking()
|
||||
|
||||
super().__init__(
|
||||
crawler=crawler,
|
||||
mode=ExecutionMode.RESOURCE,
|
||||
callbacks=callbacks,
|
||||
persistence_path=persistence_path,
|
||||
word_count_threshold=word_count_threshold,
|
||||
extraction_strategy=extraction_strategy,
|
||||
chunking_strategy=chunking_strategy,
|
||||
bypass_cache=bypass_cache,
|
||||
css_selector=css_selector,
|
||||
screenshot=screenshot,
|
||||
user_agent=user_agent,
|
||||
verbose=verbose,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
self.connection_pool_size = connection_pool_size
|
||||
self.dns_cache_size = dns_cache_size
|
||||
self.backoff_factor = backoff_factor
|
||||
self.max_concurrent_tasks = max_concurrent_tasks
|
||||
|
||||
self.logger.info(
|
||||
# "Initialized resource-optimized executor with:"
|
||||
f"Executor {self.__class__.__name__}: Initialized with:"
|
||||
f" connection_pool_size={self.connection_pool_size},"
|
||||
f" dns_cache_size={self.dns_cache_size},"
|
||||
f" max_concurrent_tasks={self.max_concurrent_tasks}"
|
||||
)
|
||||
|
||||
async def process_batch(self, batch_urls: List[str]) -> Dict[str, Any]:
|
||||
"""Process a batch of URLs with resource optimization."""
|
||||
batch_results = {}
|
||||
semaphore = asyncio.Semaphore(self.max_concurrent_tasks)
|
||||
|
||||
# Wait until resources are available before processing batch
|
||||
while not await self.resource_monitor.check_resources():
|
||||
# self.logger.warning("Resource limits reached, waiting...")
|
||||
self.logger.warning(f"Executor {self.__class__.__name__}: Resource limits reached, waiting...")
|
||||
await asyncio.sleep(self.backoff_factor)
|
||||
# Check for cancellation
|
||||
if await self.control.is_cancelled():
|
||||
raise asyncio.CancelledError("Execution was cancelled")
|
||||
|
||||
async def process_url_with_semaphore(url):
|
||||
async with semaphore:
|
||||
# Check for cancellation
|
||||
if await self.control.is_cancelled():
|
||||
raise asyncio.CancelledError("Execution was cancelled")
|
||||
# Wait if paused
|
||||
await self.control.wait_if_paused(timeout=300)
|
||||
|
||||
try:
|
||||
result = await self._process_url(url)
|
||||
batch_results[url] = {"success": True, "result": result}
|
||||
except Exception as e:
|
||||
batch_results[url] = {"success": False, "error": str(e)}
|
||||
finally:
|
||||
# Update system metrics after each URL
|
||||
# INFO: Uncomment this line if you want to capture system metrics after each URL, but it causes a performance hit
|
||||
# self.metrics.system_metrics.append(SystemMetrics.capture())
|
||||
# Controlled delay between URLs
|
||||
await asyncio.sleep(0.1) # Small delay for resource management
|
||||
|
||||
tasks = [process_url_with_semaphore(url) for url in batch_urls]
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
return batch_results
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
async def main():
|
||||
# Sample callback functions
|
||||
async def pre_execution_callback(url: str, context: Dict[str, Any]):
|
||||
print(f"Pre-execution callback: About to process URL {url}")
|
||||
|
||||
async def post_execution_callback(url: str, result: Any, context: Dict[str, Any]):
|
||||
print(f"Post-execution callback: Successfully processed URL {url}")
|
||||
|
||||
async def on_error_callback(url: str, error: Exception, context: Dict[str, Any]):
|
||||
print(f"Error callback: Error processing URL {url}: {error}")
|
||||
|
||||
async def on_retry_callback(url: str, attempt: int, context: Dict[str, Any]):
|
||||
print(f"Retry callback: Retrying URL {url}, attempt {attempt}")
|
||||
|
||||
async def on_batch_start_callback(urls: List[str], context: Dict[str, Any]):
|
||||
print(f"Batch start callback: Starting batch with {len(urls)} URLs")
|
||||
|
||||
async def on_batch_end_callback(urls: List[str], context: Dict[str, Any]):
|
||||
print(f"Batch end callback: Completed batch with {len(urls)} URLs")
|
||||
|
||||
async def on_complete_callback(results: Dict[str, Any], context: Dict[str, Any]):
|
||||
print(f"Complete callback: Execution completed with {len(results)} results")
|
||||
|
||||
# Sample URLs to crawl
|
||||
urls = [
|
||||
"https://www.example.com",
|
||||
"https://www.python.org",
|
||||
"https://www.asyncio.org",
|
||||
# Add more URLs as needed
|
||||
]
|
||||
|
||||
# Instantiate the crawler
|
||||
crawler = AsyncWebCrawler()
|
||||
|
||||
# Set up callbacks
|
||||
callbacks = {
|
||||
CallbackType.PRE_EXECUTION: pre_execution_callback,
|
||||
CallbackType.POST_EXECUTION: post_execution_callback,
|
||||
CallbackType.ON_ERROR: on_error_callback,
|
||||
CallbackType.ON_RETRY: on_retry_callback,
|
||||
CallbackType.ON_BATCH_START: on_batch_start_callback,
|
||||
CallbackType.ON_BATCH_END: on_batch_end_callback,
|
||||
CallbackType.ON_COMPLETE: on_complete_callback,
|
||||
}
|
||||
|
||||
# Instantiate the executors
|
||||
speed_executor = SpeedOptimizedExecutor(
|
||||
crawler=crawler,
|
||||
callbacks=callbacks,
|
||||
max_retries=2, # Example additional config
|
||||
)
|
||||
|
||||
resource_executor = ResourceOptimizedExecutor(
|
||||
crawler=crawler,
|
||||
callbacks=callbacks,
|
||||
max_concurrent_tasks=3, # Limit concurrency
|
||||
max_retries=2, # Example additional config
|
||||
)
|
||||
|
||||
# Choose which executor to use
|
||||
executor = speed_executor # Or resource_executor
|
||||
|
||||
# Start the execution in a background task
|
||||
execution_task = asyncio.create_task(executor.execute(urls))
|
||||
|
||||
# Simulate control operations
|
||||
await asyncio.sleep(2) # Let it run for a bit
|
||||
print("Pausing execution...")
|
||||
await executor.control.pause()
|
||||
await asyncio.sleep(2) # Wait while paused
|
||||
print("Resuming execution...")
|
||||
await executor.control.resume()
|
||||
|
||||
# Wait for execution to complete
|
||||
results = await execution_task
|
||||
|
||||
# Print the results
|
||||
print("Execution results:")
|
||||
for url, result in results.items():
|
||||
print(f"{url}: {result}")
|
||||
|
||||
# Get and print final metrics
|
||||
final_status = await executor.get_status()
|
||||
print("Final executor status and metrics:")
|
||||
print(final_status)
|
||||
|
||||
# Run the main function
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
||||
@@ -9,7 +9,7 @@ from .async_database import async_db_manager
|
||||
from .chunking_strategy import *
|
||||
from .extraction_strategy import *
|
||||
from .async_crawler_strategy import AsyncCrawlerStrategy, AsyncPlaywrightCrawlerStrategy, AsyncCrawlResponse
|
||||
from .content_scrapping_strategy import WebScrappingStrategy
|
||||
from .content_scrapping_strategy import WebScrapingStrategy
|
||||
from .config import MIN_WORD_THRESHOLD, IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD
|
||||
from .utils import (
|
||||
sanitize_input_encode,
|
||||
@@ -75,6 +75,19 @@ class AsyncWebCrawler:
|
||||
verbose=True,
|
||||
**kwargs,
|
||||
) -> CrawlResult:
|
||||
"""
|
||||
Runs the crawler for a single source: URL (web, local file, or raw HTML).
|
||||
|
||||
Args:
|
||||
url (str): The URL to crawl. Supported prefixes:
|
||||
- 'http://' or 'https://': Web URL to crawl.
|
||||
- 'file://': Local file path to process.
|
||||
- 'raw:': Raw HTML content to process.
|
||||
... [other existing parameters]
|
||||
|
||||
Returns:
|
||||
CrawlResult: The result of the crawling and processing.
|
||||
"""
|
||||
try:
|
||||
extraction_strategy = extraction_strategy or NoExtractionStrategy()
|
||||
extraction_strategy.verbose = verbose
|
||||
@@ -89,8 +102,13 @@ class AsyncWebCrawler:
|
||||
cached = None
|
||||
screenshot_data = None
|
||||
extracted_content = None
|
||||
if not bypass_cache and not self.always_by_pass_cache:
|
||||
|
||||
is_web_url = url.startswith(('http://', 'https://'))
|
||||
if is_web_url and not bypass_cache and not self.always_by_pass_cache:
|
||||
cached = await async_db_manager.aget_cached_url(url)
|
||||
|
||||
# if not bypass_cache and not self.always_by_pass_cache:
|
||||
# cached = await async_db_manager.aget_cached_url(url)
|
||||
|
||||
if kwargs.get("warmup", True) and not self.ready:
|
||||
return None
|
||||
@@ -117,25 +135,32 @@ class AsyncWebCrawler:
|
||||
)
|
||||
|
||||
crawl_result = await self.aprocess_html(
|
||||
url,
|
||||
html,
|
||||
extracted_content,
|
||||
word_count_threshold,
|
||||
extraction_strategy,
|
||||
chunking_strategy,
|
||||
css_selector,
|
||||
screenshot_data,
|
||||
verbose,
|
||||
bool(cached),
|
||||
url=url,
|
||||
html=html,
|
||||
extracted_content=extracted_content,
|
||||
word_count_threshold=word_count_threshold,
|
||||
extraction_strategy=extraction_strategy,
|
||||
chunking_strategy=chunking_strategy,
|
||||
css_selector=css_selector,
|
||||
screenshot=screenshot_data,
|
||||
verbose=verbose,
|
||||
is_cached=bool(cached),
|
||||
async_response=async_response,
|
||||
bypass_cache=bypass_cache,
|
||||
**kwargs,
|
||||
)
|
||||
crawl_result.status_code = async_response.status_code if async_response else 200
|
||||
crawl_result.response_headers = async_response.response_headers if async_response else {}
|
||||
|
||||
if async_response:
|
||||
crawl_result.status_code = async_response.status_code
|
||||
crawl_result.response_headers = async_response.response_headers
|
||||
else:
|
||||
crawl_result.status_code = 200
|
||||
crawl_result.response_headers = cached[10]
|
||||
|
||||
crawl_result.success = bool(html)
|
||||
crawl_result.session_id = kwargs.get("session_id", None)
|
||||
return crawl_result
|
||||
|
||||
except Exception as e:
|
||||
if not hasattr(e, "msg"):
|
||||
e.msg = str(e)
|
||||
@@ -155,22 +180,40 @@ class AsyncWebCrawler:
|
||||
verbose=True,
|
||||
**kwargs,
|
||||
) -> List[CrawlResult]:
|
||||
tasks = [
|
||||
self.arun(
|
||||
url,
|
||||
word_count_threshold,
|
||||
extraction_strategy,
|
||||
chunking_strategy,
|
||||
bypass_cache,
|
||||
css_selector,
|
||||
screenshot,
|
||||
user_agent,
|
||||
verbose,
|
||||
**kwargs
|
||||
)
|
||||
for url in urls
|
||||
]
|
||||
return await asyncio.gather(*tasks)
|
||||
"""
|
||||
Runs the crawler for multiple sources: URLs (web, local files, or raw HTML).
|
||||
|
||||
Args:
|
||||
urls (List[str]): A list of URLs with supported prefixes:
|
||||
- 'http://' or 'https://': Web URL to crawl.
|
||||
- 'file://': Local file path to process.
|
||||
- 'raw:': Raw HTML content to process.
|
||||
... [other existing parameters]
|
||||
|
||||
Returns:
|
||||
List[CrawlResult]: The results of the crawling and processing.
|
||||
"""
|
||||
semaphore_count = kwargs.get('semaphore_count', 5) # Adjust as needed
|
||||
semaphore = asyncio.Semaphore(semaphore_count)
|
||||
|
||||
async def crawl_with_semaphore(url):
|
||||
async with semaphore:
|
||||
return await self.arun(
|
||||
url,
|
||||
word_count_threshold=word_count_threshold,
|
||||
extraction_strategy=extraction_strategy,
|
||||
chunking_strategy=chunking_strategy,
|
||||
bypass_cache=bypass_cache,
|
||||
css_selector=css_selector,
|
||||
screenshot=screenshot,
|
||||
user_agent=user_agent,
|
||||
verbose=verbose,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
tasks = [crawl_with_semaphore(url) for url in urls]
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
return [result if not isinstance(result, Exception) else str(result) for result in results]
|
||||
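Review note: the new `arun_many` bounds concurrency with a semaphore and collects failures via `return_exceptions=True`. The sketch below shows the same pattern in isolation. `gather_bounded`, `run_demo`, and the toy `worker` are hypothetical stand-ins for `arun`, used only to make the concurrency cap observable.

```python
import asyncio


async def gather_bounded(items, worker, limit=5):
    """Run worker(item) for every item with at most `limit` running at once.

    Exceptions are returned in place of results, in input order.
    """
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item):
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(*(run_one(i) for i in items),
                                return_exceptions=True)


def run_demo():
    state = {"active": 0, "peak": 0}

    async def worker(n):
        # Track how many workers are inside the semaphore at the same time.
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)
        state["active"] -= 1
        if n == 3:
            raise ValueError("boom")
        return n * 2

    results = asyncio.run(gather_bounded(range(6), worker, limit=2))
    return results, state["peak"]


if __name__ == "__main__":
    results, peak = run_demo()
    print(results, peak)
```

Note that the diff converts failures to `str(result)`, so callers of `arun_many` receive a mixed list of `CrawlResult` objects and strings; returning the exception objects, as this sketch does, keeps the failure type available.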
|
||||
async def aprocess_html(
|
||||
self,
|
||||
@@ -184,13 +227,14 @@ class AsyncWebCrawler:
|
||||
screenshot: str,
|
||||
verbose: bool,
|
||||
is_cached: bool,
|
||||
async_response: Optional[AsyncCrawlResponse],
|
||||
**kwargs,
|
||||
) -> CrawlResult:
|
||||
t = time.time()
|
||||
# Extract content from HTML
|
||||
try:
|
||||
t1 = time.time()
|
||||
scrapping_strategy = WebScrappingStrategy()
|
||||
scrapping_strategy = WebScrapingStrategy()
|
||||
# result = await scrapping_strategy.ascrap(
|
||||
result = scrapping_strategy.scrap(
|
||||
url,
|
||||
@@ -245,6 +289,12 @@ class AsyncWebCrawler:
|
||||
)
|
||||
|
||||
screenshot = None if not screenshot else screenshot
|
||||
|
||||
response_headers = "{}" # Default value
|
||||
if async_response:
|
||||
# Serialize response_headers dict to JSON string
|
||||
response_headers = json.dumps(async_response.response_headers, ensure_ascii=False)
|
||||
|
||||
|
||||
if not is_cached or kwargs.get("bypass_cache", False) or self.always_by_pass_cache:
|
||||
await async_db_manager.acache_url(
|
||||
@@ -258,6 +308,7 @@ class AsyncWebCrawler:
|
||||
json.dumps(links),
|
||||
json.dumps(metadata),
|
||||
screenshot=screenshot,
|
||||
response_headers=response_headers,
|
||||
)
|
||||
|
||||
return CrawlResult(
|
||||
|
||||
@@ -51,3 +51,5 @@ SOCIAL_MEDIA_DOMAINS = [
|
||||
# If image format is in jpg, png or webp
|
||||
# If image is in the first half of the total images extracted from the page
|
||||
IMAGE_SCORE_THRESHOLD = 2
|
||||
|
||||
MAX_METRICS_HISTORY = 1000
|
||||
@@ -15,7 +15,7 @@ class ContentCleaningStrategy:
|
||||
self.link_density_threshold = 0.2
|
||||
self.max_dom_depth = 10 # To prevent excessive DOM traversal
|
||||
|
||||
def clean(self, clean_html: str) -> str:
|
||||
def clean(self, clean_html: str, soup = None) -> str:
|
||||
"""
|
||||
Main function that takes cleaned HTML and returns super cleaned HTML.
|
||||
|
||||
@@ -28,18 +28,20 @@ class ContentCleaningStrategy:
|
||||
try:
|
||||
if not clean_html or not isinstance(clean_html, str):
|
||||
return ''
|
||||
soup = BeautifulSoup(clean_html, 'html.parser')
|
||||
if not soup:
|
||||
# soup = BeautifulSoup(clean_html, 'html.parser')
|
||||
soup = BeautifulSoup(clean_html, 'lxml')
|
||||
main_content = self.extract_main_content(soup)
|
||||
if main_content:
|
||||
super_clean_element = self.clean_element(main_content)
|
||||
return str(super_clean_element)
|
||||
return super_clean_element.encode_contents().decode('utf-8')
|
||||
else:
|
||||
return ''
|
||||
except Exception:
|
||||
# Handle exceptions silently or log them as needed
|
||||
return ''
|
||||
|
||||
def extract_main_content(self, soup: BeautifulSoup) -> Optional[Tag]:
|
||||
def extract_main_content(self, soup) -> Optional[Tag]:
|
||||
"""
|
||||
Identifies and extracts the main content element from the HTML.
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import re # Point 1: Pre-Compile Regular Expressions
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Dict, Any
|
||||
from bs4 import BeautifulSoup
|
||||
@@ -105,7 +106,39 @@ class CustomHTML2Text(HTML2Text):
|
||||
return
|
||||
super().handle_data(data, entity_char)
|
||||
|
||||
class ContentScrappingStrategy(ABC):
|
||||
# Pre-compile regular expressions for Open Graph and Twitter metadata
|
||||
OG_REGEX = re.compile(r'^og:')
|
||||
TWITTER_REGEX = re.compile(r'^twitter:')
|
||||
DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")
|
||||
|
||||
# Function to parse image height/width value and units
|
||||
def parse_dimension(dimension):
|
||||
if dimension:
|
||||
# match = re.match(r"(\d+)(\D*)", dimension)
|
||||
match = DIMENSION_REGEX.match(dimension)
|
||||
if match:
|
||||
number = int(match.group(1))
|
||||
unit = match.group(2) or 'px' # Default unit is 'px' if not specified
|
||||
return number, unit
|
||||
return None, None
|
||||
|
||||
# Fetch image file metadata to extract size and extension
|
||||
def fetch_image_file_size(img, base_url):
|
||||
#If src is relative path construct full URL, if not it may be CDN URL
|
||||
img_url = urljoin(base_url,img.get('src'))
|
||||
try:
|
||||
response = requests.head(img_url)
|
||||
if response.status_code == 200:
|
||||
return response.headers.get('Content-Length',None)
|
||||
else:
|
||||
print(f"Failed to retrieve file size for {img_url}")
|
||||
return None
|
||||
except InvalidSchema as e:
|
||||
return None
|
||||
finally:
|
||||
return
|
||||
|
||||
class ContentScrapingStrategy(ABC):
|
||||
@abstractmethod
|
||||
def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
|
||||
pass
|
||||
@@ -114,7 +147,7 @@ class ContentScrappingStrategy(ABC):
|
||||
async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
|
||||
pass
|
||||
|
||||
class WebScrappingStrategy(ContentScrappingStrategy):
|
||||
class WebScrapingStrategy(ContentScrapingStrategy):
|
||||
def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
|
||||
return self._get_content_of_website_optimized(url, html, is_async=False, **kwargs)
|
||||
|
||||
@@ -126,9 +159,16 @@ class WebScrappingStrategy(ContentScrappingStrategy):
|
||||
if not html:
|
||||
return None
|
||||
|
||||
soup = BeautifulSoup(html, 'html.parser')
|
||||
# soup = BeautifulSoup(html, 'html.parser')
|
||||
soup = BeautifulSoup(html, 'lxml')
|
||||
body = soup.body
|
||||
|
||||
try:
|
||||
meta = extract_metadata("", soup)
|
||||
except Exception as e:
|
||||
print('Error extracting metadata:', str(e))
|
||||
meta = {}
|
||||
|
||||
|
||||
image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD)
|
||||
|
||||
@@ -187,31 +227,7 @@ class WebScrappingStrategy(ContentScrappingStrategy):
|
||||
|
||||
#Score an image for it's usefulness
|
||||
def score_image_for_usefulness(img, base_url, index, images_count):
|
||||
# Function to parse image height/width value and units
|
||||
def parse_dimension(dimension):
|
||||
if dimension:
|
||||
match = re.match(r"(\d+)(\D*)", dimension)
|
||||
if match:
|
||||
number = int(match.group(1))
|
||||
unit = match.group(2) or 'px' # Default unit is 'px' if not specified
|
||||
return number, unit
|
||||
return None, None
|
||||
|
||||
# Fetch image file metadata to extract size and extension
|
||||
def fetch_image_file_size(img, base_url):
|
||||
#If src is relative path construct full URL, if not it may be CDN URL
|
||||
img_url = urljoin(base_url,img.get('src'))
|
||||
try:
|
||||
response = requests.head(img_url)
|
||||
if response.status_code == 200:
|
||||
return response.headers.get('Content-Length',None)
|
||||
else:
|
||||
print(f"Failed to retrieve file size for {img_url}")
|
||||
return None
|
||||
except InvalidSchema as e:
|
||||
return None
|
||||
finally:
|
||||
return
|
||||
|
||||
image_height = img.get('height')
|
||||
height_value, height_unit = parse_dimension(image_height)
|
||||
@@ -294,7 +310,6 @@ class WebScrappingStrategy(ContentScrappingStrategy):
|
||||
|
||||
exclude_social_media_domains = SOCIAL_MEDIA_DOMAINS + kwargs.get('exclude_social_media_domains', [])
|
||||
exclude_social_media_domains = list(set(exclude_social_media_domains))
|
||||
|
||||
|
||||
try:
|
||||
if element.name == 'a' and element.get('href'):
|
||||
@@ -439,15 +454,7 @@ class WebScrappingStrategy(ContentScrappingStrategy):
|
||||
except Exception as e:
|
||||
print('Error processing element:', str(e))
|
||||
return False
|
||||
|
||||
#process images by filtering and extracting contextual text from the page
|
||||
# imgs = body.find_all('img')
|
||||
# media['images'] = [
|
||||
# result for result in
|
||||
# (process_image(img, url, i, len(imgs)) for i, img in enumerate(imgs))
|
||||
# if result is not None
|
||||
# ]
|
||||
|
||||
|
||||
process_element(body)
|
||||
|
||||
# Update the links dictionary with unique links
|
||||
@@ -478,8 +485,9 @@ class WebScrappingStrategy(ContentScrappingStrategy):
|
||||
# Replace base64 data with empty string
|
||||
img['src'] = base64_pattern.sub('', src)
|
||||
|
||||
str_body = ""
|
||||
try:
|
||||
str(body)
|
||||
str_body = body.encode_contents().decode('utf-8')
|
||||
except Exception as e:
|
||||
# Reset body to the original HTML
|
||||
success = False
|
||||
@@ -504,11 +512,12 @@ class WebScrappingStrategy(ContentScrappingStrategy):
|
||||
|
||||
# Append the error div to the body
|
||||
body.body.append(error_div)
|
||||
str_body = body.encode_contents().decode('utf-8')
|
||||
|
||||
print(f"[LOG] 😧 Error: After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.")
|
||||
|
||||
|
||||
cleaned_html = str(body).replace('\n\n', '\n').replace(' ', ' ')
|
||||
cleaned_html = str_body.replace('\n\n', '\n').replace(' ', ' ')
|
||||
|
||||
try:
|
||||
h = CustomHTML2Text()
|
||||
@@ -518,15 +527,14 @@ class WebScrappingStrategy(ContentScrappingStrategy):
|
||||
markdown = h.handle(sanitize_html(cleaned_html))
|
||||
markdown = markdown.replace(' ```', '```')
|
||||
|
||||
try:
|
||||
meta = extract_metadata(html, soup)
|
||||
except Exception as e:
|
||||
print('Error extracting metadata:', str(e))
|
||||
meta = {}
|
||||
|
||||
|
||||
cleaner = ContentCleaningStrategy()
|
||||
fit_html = cleaner.clean(cleaned_html)
|
||||
fit_markdown = h.handle(fit_html)
|
||||
fit_markdown = "Set flag 'fit_markdown' to True to get cleaned HTML content."
|
||||
fit_html = "Set flag 'fit_markdown' to True to get cleaned HTML content."
|
||||
if kwargs.get('fit_markdown', False):
|
||||
cleaner = ContentCleaningStrategy()
|
||||
fit_html = cleaner.clean(cleaned_html)
|
||||
fit_markdown = h.handle(fit_html)
|
||||
|
||||
cleaned_html = sanitize_html(cleaned_html)
|
||||
return {
|
||||
|
||||
@@ -736,46 +736,54 @@ def get_content_of_website_optimized(url: str, html: str, word_count_threshold:
|
||||
'metadata': meta
|
||||
}
|
||||
|
||||
def extract_metadata(html, soup = None):
|
||||
def extract_metadata(html, soup=None):
|
||||
metadata = {}
|
||||
|
||||
if not html:
|
||||
if not html and not soup:
|
||||
return {}
|
||||
|
||||
if not soup:
|
||||
soup = BeautifulSoup(html, 'lxml')
|
||||
|
||||
head = soup.head
|
||||
if not head:
|
||||
return metadata
|
||||
|
||||
# Parse HTML content with BeautifulSoup
|
||||
if not soup:
|
||||
soup = BeautifulSoup(html, 'html.parser')
|
||||
|
||||
# Title
|
||||
title_tag = soup.find('title')
|
||||
metadata['title'] = title_tag.string if title_tag else None
|
||||
title_tag = head.find('title')
|
||||
metadata['title'] = title_tag.string.strip() if title_tag and title_tag.string else None
|
||||
|
||||
# Meta description
|
||||
description_tag = soup.find('meta', attrs={'name': 'description'})
|
||||
metadata['description'] = description_tag['content'] if description_tag else None
|
||||
description_tag = head.find('meta', attrs={'name': 'description'})
|
||||
metadata['description'] = description_tag.get('content', '').strip() if description_tag else None
|
||||
|
||||
# Meta keywords
|
||||
keywords_tag = soup.find('meta', attrs={'name': 'keywords'})
|
||||
metadata['keywords'] = keywords_tag['content'] if keywords_tag else None
|
||||
keywords_tag = head.find('meta', attrs={'name': 'keywords'})
|
||||
metadata['keywords'] = keywords_tag.get('content', '').strip() if keywords_tag else None
|
||||
|
||||
# Meta author
|
||||
author_tag = soup.find('meta', attrs={'name': 'author'})
|
||||
metadata['author'] = author_tag['content'] if author_tag else None
|
||||
author_tag = head.find('meta', attrs={'name': 'author'})
|
||||
metadata['author'] = author_tag.get('content', '').strip() if author_tag else None
|
||||
|
||||
# Open Graph metadata
|
||||
og_tags = soup.find_all('meta', attrs={'property': lambda value: value and value.startswith('og:')})
|
||||
og_tags = head.find_all('meta', attrs={'property': re.compile(r'^og:')})
|
||||
for tag in og_tags:
|
||||
property_name = tag['property']
|
||||
metadata[property_name] = tag['content']
|
||||
property_name = tag.get('property', '').strip()
|
||||
content = tag.get('content', '').strip()
|
||||
if property_name and content:
|
||||
metadata[property_name] = content
|
||||
|
||||
# Twitter Card metadata
|
||||
twitter_tags = soup.find_all('meta', attrs={'name': lambda value: value and value.startswith('twitter:')})
|
||||
twitter_tags = head.find_all('meta', attrs={'name': re.compile(r'^twitter:')})
|
||||
for tag in twitter_tags:
|
||||
property_name = tag['name']
|
||||
metadata[property_name] = tag['content']
|
||||
|
||||
property_name = tag.get('name', '').strip()
|
||||
content = tag.get('content', '').strip()
|
||||
if property_name and content:
|
||||
metadata[property_name] = content
|
||||
|
||||
return metadata
|
||||
|
||||
|
||||
def extract_xml_tags(string):
|
||||
tags = re.findall(r'<(\w+)>', string)
|
||||
return list(set(tags))
|
||||
|
||||
@@ -10,6 +10,7 @@ from .extraction_strategy import *
|
||||
from .crawler_strategy import *
|
||||
from typing import List
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from .content_scrapping_strategy import WebScrapingStrategy
|
||||
from .config import *
|
||||
import warnings
|
||||
import json
|
||||
@@ -181,7 +182,21 @@ class WebCrawler:
|
||||
# Extract content from HTML
|
||||
try:
|
||||
t1 = time.time()
|
||||
result = get_content_of_website_optimized(url, html, word_count_threshold, css_selector=css_selector, only_text=kwargs.get("only_text", False))
|
||||
scrapping_strategy = WebScrapingStrategy()
|
||||
extra_params = {k: v for k, v in kwargs.items() if k not in ["only_text", "image_description_min_word_threshold"]}
|
||||
result = scrapping_strategy.scrap(
|
||||
url,
|
||||
html,
|
||||
word_count_threshold=word_count_threshold,
|
||||
css_selector=css_selector,
|
||||
only_text=kwargs.get("only_text", False),
|
||||
image_description_min_word_threshold=kwargs.get(
|
||||
"image_description_min_word_threshold", IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD
|
||||
),
|
||||
**extra_params,
|
||||
)
|
||||
|
||||
# result = get_content_of_website_optimized(url, html, word_count_threshold, css_selector=css_selector, only_text=kwargs.get("only_text", False))
|
||||
if verbose:
|
||||
print(f"[LOG] 🚀 Content extracted for {url}, success: True, time taken: {time.time() - t1:.2f} seconds")
|
||||
|
||||
|
||||
235
docs/md_v2/basic/prefix-based-input.md
Normal file
@@ -0,0 +1,235 @@
|
||||
# Prefix-Based Input Handling in Crawl4AI
|
||||
|
||||
This guide shows how to use Crawl4AI to crawl web pages, local HTML files, and raw HTML strings through the same `url` parameter. A Wikipedia page serves as the running example.
|
||||
|
||||
## Table of Contents
|
||||
- [Prefix-Based Input Handling in Crawl4AI](#prefix-based-input-handling-in-crawl4ai)
|
||||
- [Table of Contents](#table-of-contents)
|
||||
- [Crawling a Web URL](#crawling-a-web-url)
|
||||
- [Crawling a Local HTML File](#crawling-a-local-html-file)
|
||||
- [Crawling Raw HTML Content](#crawling-raw-html-content)
|
||||
- [Complete Example](#complete-example)
|
||||
- [**How It Works**](#how-it-works)
|
||||
- [**Running the Example**](#running-the-example)
|
||||
- [Conclusion](#conclusion)
|
||||
|
||||
---
|
||||
|
||||
|
||||
### Crawling a Web URL
|
||||
|
||||
To crawl a live web page, provide the URL starting with `http://` or `https://`.
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
from crawl4ai import AsyncWebCrawler
|
||||
|
||||
async def crawl_web():
|
||||
async with AsyncWebCrawler(verbose=True) as crawler:
|
||||
result = await crawler.arun(url="https://en.wikipedia.org/wiki/apple", bypass_cache=True)
|
||||
if result.success:
|
||||
print("Markdown Content:")
|
||||
print(result.markdown)
|
||||
else:
|
||||
print(f"Failed to crawl: {result.error_message}")
|
||||
|
||||
asyncio.run(crawl_web())
|
||||
```
|
||||
|
||||
### Crawling a Local HTML File
|
||||
|
||||
To crawl a local HTML file, prefix the file path with `file://`.
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
from crawl4ai import AsyncWebCrawler
|
||||
|
||||
async def crawl_local_file():
|
||||
local_file_path = "/path/to/apple.html" # Replace with your file path
|
||||
file_url = f"file://{local_file_path}"
|
||||
|
||||
async with AsyncWebCrawler(verbose=True) as crawler:
|
||||
result = await crawler.arun(url=file_url, bypass_cache=True)
|
||||
if result.success:
|
||||
print("Markdown Content from Local File:")
|
||||
print(result.markdown)
|
||||
else:
|
||||
print(f"Failed to crawl local file: {result.error_message}")
|
||||
|
||||
asyncio.run(crawl_local_file())
|
||||
```
|
||||
|
||||
### Crawling Raw HTML Content
|
||||
|
||||
To crawl raw HTML content, prefix the HTML string with `raw:`.
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
from crawl4ai import AsyncWebCrawler
|
||||
|
||||
async def crawl_raw_html():
|
||||
raw_html = "<html><body><h1>Hello, World!</h1></body></html>"
|
||||
raw_html_url = f"raw:{raw_html}"
|
||||
|
||||
async with AsyncWebCrawler(verbose=True) as crawler:
|
||||
result = await crawler.arun(url=raw_html_url, bypass_cache=True)
|
||||
if result.success:
|
||||
print("Markdown Content from Raw HTML:")
|
||||
print(result.markdown)
|
||||
else:
|
||||
print(f"Failed to crawl raw HTML: {result.error_message}")
|
||||
|
||||
asyncio.run(crawl_raw_html())
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Complete Example
|
||||
|
||||
Below is a comprehensive script that:
|
||||
1. **Crawls the Wikipedia page for "Apple".**
|
||||
2. **Saves the HTML content to a local file (`apple.html`).**
|
||||
3. **Crawls the local HTML file and verifies the markdown length matches the original crawl.**
|
||||
4. **Crawls the raw HTML content from the saved file and verifies consistency.**
|
||||
|
||||
```python
|
||||
import os
|
||||
import sys
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
|
||||
# Adjust the parent directory to include the crawl4ai module
|
||||
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
sys.path.append(parent_dir)
|
||||
|
||||
from crawl4ai import AsyncWebCrawler
|
||||
|
||||
async def main():
|
||||
# Define the URL to crawl
|
||||
wikipedia_url = "https://en.wikipedia.org/wiki/apple"
|
||||
|
||||
# Define the path to save the HTML file
|
||||
# Save the file in the same directory as the script
|
||||
script_dir = Path(__file__).parent
|
||||
html_file_path = script_dir / "apple.html"
|
||||
|
||||
async with AsyncWebCrawler(verbose=True) as crawler:
|
||||
print("\n=== Step 1: Crawling the Wikipedia URL ===")
|
||||
# Crawl the Wikipedia URL
|
||||
result = await crawler.arun(url=wikipedia_url, bypass_cache=True)
|
||||
|
||||
# Check if crawling was successful
|
||||
if not result.success:
|
||||
print(f"Failed to crawl {wikipedia_url}: {result.error_message}")
|
||||
return
|
||||
|
||||
# Save the HTML content to a local file
|
||||
with open(html_file_path, 'w', encoding='utf-8') as f:
|
||||
f.write(result.html)
|
||||
print(f"Saved HTML content to {html_file_path}")
|
||||
|
||||
# Store the length of the generated markdown
|
||||
web_crawl_length = len(result.markdown)
|
||||
print(f"Length of markdown from web crawl: {web_crawl_length}\n")
|
||||
|
||||
print("=== Step 2: Crawling from the Local HTML File ===")
|
||||
# Construct the file URL with 'file://' prefix
|
||||
file_url = f"file://{html_file_path.resolve()}"
|
||||
|
||||
# Crawl the local HTML file
|
||||
local_result = await crawler.arun(url=file_url, bypass_cache=True)
|
||||
|
||||
# Check if crawling was successful
|
||||
if not local_result.success:
|
||||
print(f"Failed to crawl local file {file_url}: {local_result.error_message}")
|
||||
return
|
||||
|
||||
# Store the length of the generated markdown from local file
|
||||
local_crawl_length = len(local_result.markdown)
|
||||
print(f"Length of markdown from local file crawl: {local_crawl_length}")
|
||||
|
||||
# Compare the lengths
|
||||
assert web_crawl_length == local_crawl_length, (
|
||||
f"Markdown length mismatch: Web crawl ({web_crawl_length}) != Local file crawl ({local_crawl_length})"
|
||||
)
|
||||
print("✅ Markdown length matches between web crawl and local file crawl.\n")
|
||||
|
||||
print("=== Step 3: Crawling Using Raw HTML Content ===")
|
||||
# Read the HTML content from the saved file
|
||||
with open(html_file_path, 'r', encoding='utf-8') as f:
|
||||
raw_html_content = f.read()
|
||||
|
||||
# Prefix the raw HTML content with 'raw:'
|
||||
raw_html_url = f"raw:{raw_html_content}"
|
||||
|
||||
# Crawl using the raw HTML content
|
||||
raw_result = await crawler.arun(url=raw_html_url, bypass_cache=True)
|
||||
|
||||
# Check if crawling was successful
|
||||
if not raw_result.success:
|
||||
print(f"Failed to crawl raw HTML content: {raw_result.error_message}")
|
||||
return
|
||||
|
||||
# Store the length of the generated markdown from raw HTML
|
||||
raw_crawl_length = len(raw_result.markdown)
|
||||
print(f"Length of markdown from raw HTML crawl: {raw_crawl_length}")
|
||||
|
||||
# Compare the lengths
|
||||
assert web_crawl_length == raw_crawl_length, (
|
||||
f"Markdown length mismatch: Web crawl ({web_crawl_length}) != Raw HTML crawl ({raw_crawl_length})"
|
||||
)
|
||||
print("✅ Markdown length matches between web crawl and raw HTML crawl.\n")
|
||||
|
||||
print("All tests passed successfully!")
|
||||
|
||||
# Clean up by removing the saved HTML file
|
||||
if html_file_path.exists():
|
||||
os.remove(html_file_path)
|
||||
print(f"Removed the saved HTML file: {html_file_path}")
|
||||
|
||||
# Run the main function
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
```
|
||||
|
||||
### **How It Works**
|
||||
|
||||
1. **Step 1: Crawl the Web URL**
|
||||
- Crawls `https://en.wikipedia.org/wiki/apple`.
|
||||
- Saves the HTML content to `apple.html`.
|
||||
- Records the length of the generated markdown.
|
||||
|
||||
2. **Step 2: Crawl from the Local HTML File**
|
||||
- Uses the `file://` prefix to crawl `apple.html`.
|
||||
- Ensures the markdown length matches the original web crawl.
|
||||
|
||||
3. **Step 3: Crawl Using Raw HTML Content**
|
||||
- Reads the HTML from `apple.html`.
|
||||
- Prefixes it with `raw:` and crawls.
|
||||
- Verifies the markdown length matches the previous results.
|
||||
|
||||
4. **Cleanup**
|
||||
- Deletes the `apple.html` file after testing.
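
The script above compares markdown lengths only, so two outputs with the same length but different text would still pass. A stricter check can compare content digests instead. The sketch below is a minimal illustration, assuming the markdown outputs are available as plain `str` values; `markdown_digest` and `same_markdown` are hypothetical helper names and not part of Crawl4AI. Whether the three crawls yield byte-identical markdown in practice is not something this guide states, so treat the stricter check as optional.

```python
import hashlib


def markdown_digest(markdown: str) -> str:
    """Return the SHA-256 hex digest of a markdown string (hypothetical helper)."""
    return hashlib.sha256(markdown.encode("utf-8")).hexdigest()


def same_markdown(*outputs: str) -> bool:
    """Return True when every markdown string has identical content."""
    # Identical strings hash to one digest, so the set holds at most one entry.
    return len({markdown_digest(text) for text in outputs}) <= 1
```

In the complete example, a call such as `same_markdown(result.markdown, local_result.markdown, raw_result.markdown)` could stand in for the two length assertions.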
|
||||
|
||||
### **Running the Example**
|
||||
|
||||
1. **Save the Script:**
|
||||
- Save the above code as `test_crawl4ai.py` in your project directory.
|
||||
|
||||
2. **Execute the Script:**
|
||||
- Run the script using:
|
||||
```bash
|
||||
python test_crawl4ai.py
|
||||
```
|
||||
|
||||
3. **Observe the Output:**
|
||||
- The script will print logs detailing each step.
|
||||
- Assertions ensure consistency across different crawling methods.
|
||||
- Upon success, it confirms that all markdown lengths match.
|
||||
|
||||
---
|
||||
|
||||
## Conclusion
|
||||
|
||||
With prefix-based input handling in **Crawl4AI**, you can crawl web URLs, local HTML files, and raw HTML strings through a single `url` parameter. The prefix (`http://` or `https://`, `file://`, or `raw:`) selects the input type, so the calling code stays the same across all three cases.
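
To make the prefix convention concrete, here is a minimal sketch of how a `url` value could be classified by its prefix. It is an illustration only and not Crawl4AI's actual implementation; `classify_input` is a hypothetical helper, and the assumption is that the payload is whatever follows the prefix.

```python
from typing import Tuple


def classify_input(url: str) -> Tuple[str, str]:
    """Split a crawl input into (kind, payload) by prefix (hypothetical helper)."""
    if url.startswith(("http://", "https://")):
        # Live web page: the payload is the URL itself.
        return "web", url
    if url.startswith("file://"):
        # Local file: the payload is the path after the prefix.
        return "file", url[len("file://"):]
    if url.startswith("raw:"):
        # Raw HTML: the payload is the HTML string after the prefix.
        return "raw", url[len("raw:"):]
    raise ValueError(f"Unsupported input prefix: {url[:20]!r}")
```

For example, `classify_input("raw:<h1>Hi</h1>")` yields the kind `"raw"` and the HTML string without its prefix.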
|
||||
|
||||
12
main.py
@@ -321,7 +321,12 @@ app.add_middleware(
|
||||
|
||||
# Mount the pages directory as a static directory
|
||||
app.mount("/pages", StaticFiles(directory=__location__ + "/pages"), name="pages")
|
||||
app.mount("/mkdocs", StaticFiles(directory="site", html=True), name="mkdocs")
|
||||
|
||||
# Check if site directory exists
|
||||
if os.path.exists(__location__ + "/site"):
|
||||
# Mount the site directory as a static directory
|
||||
app.mount("/mkdocs", StaticFiles(directory="site", html=True), name="mkdocs")
|
||||
|
||||
site_templates = Jinja2Templates(directory=__location__ + "/site")
|
||||
templates = Jinja2Templates(directory=__location__ + "/pages")
|
||||
|
||||
@@ -337,7 +342,10 @@ async def shutdown_event():
|
||||
|
||||
@app.get("/")
|
||||
def read_root():
|
||||
return RedirectResponse(url="/mkdocs")
|
||||
if os.path.exists(__location__ + "/site"):
|
||||
return RedirectResponse(url="/mkdocs")
|
||||
# Return a json response
|
||||
return {"message": "Crawl4AI API service is running"}
|
||||
|
||||
|
||||
@app.post("/crawl")
|
||||
|
||||
@@ -8,4 +8,4 @@ playwright>=1.47,<1.48
|
||||
python-dotenv~=1.0
|
||||
requests~=2.26
|
||||
beautifulsoup4~=4.12
|
||||
playwright_stealth~=1.0
|
||||
tf-playwright-stealth~=1.0
|
||||
|
||||
2179
tests/async/sample_wikipedia.html
Normal file
File diff suppressed because one or more lines are too long
219
tests/async/test_async_executor.py
Normal file
@@ -0,0 +1,219 @@
|
||||
import os, sys
|
||||
import unittest
|
||||
import asynctest
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
from typing import Dict, Any, List
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
# Add the parent directory to the Python path
|
||||
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
sys.path.append(parent_dir)
|
||||
|
||||
# Assuming all classes and imports are already available from the code above
|
||||
from crawl4ai.async_webcrawler import AsyncWebCrawler
|
||||
from crawl4ai.config import MAX_METRICS_HISTORY
|
||||
from crawl4ai.async_executor import (
|
||||
SpeedOptimizedExecutor,
|
||||
ResourceOptimizedExecutor,
|
||||
AsyncWebCrawler,
|
||||
ExecutionMode,
|
||||
SystemMetrics,
|
||||
CallbackType
|
||||
)
|
||||
|
||||
class TestAsyncExecutor(asynctest.TestCase):
|
||||
async def setUp(self):
|
||||
# Set up a mock crawler
|
||||
self.mock_crawler = AsyncMock(spec=AsyncWebCrawler)
|
||||
self.mock_crawler.arun = AsyncMock(side_effect=self.mock_crawl)
|
||||
|
||||
# Sample URLs
|
||||
self.urls = [
|
||||
"https://www.example.com",
|
||||
"https://www.python.org",
|
||||
"https://www.asyncio.org",
|
||||
"https://www.nonexistenturl.xyz", # This will simulate a failure
|
||||
]
|
||||
|
||||
# Set up callbacks
|
||||
self.callbacks = {
|
||||
CallbackType.PRE_EXECUTION: AsyncMock(),
|
||||
CallbackType.POST_EXECUTION: AsyncMock(),
|
||||
CallbackType.ON_ERROR: AsyncMock(),
|
||||
CallbackType.ON_RETRY: AsyncMock(),
|
||||
CallbackType.ON_BATCH_START: AsyncMock(),
|
||||
CallbackType.ON_BATCH_END: AsyncMock(),
|
||||
CallbackType.ON_COMPLETE: AsyncMock(),
|
||||
}
|
||||
|
||||
async def mock_crawl(self, url: str, **kwargs):
|
||||
if "nonexistenturl" in url:
|
||||
raise Exception("Failed to fetch URL")
|
||||
return f"Mock content for {url}"
|
||||
|
||||
async def test_speed_executor_basic(self):
|
||||
"""Test basic functionality of SpeedOptimizedExecutor."""
|
||||
executor = SpeedOptimizedExecutor(
|
||||
crawler=self.mock_crawler,
|
||||
callbacks=self.callbacks,
|
||||
max_retries=1,
|
||||
)
|
||||
|
||||
results = await executor.execute(self.urls)
|
||||
|
||||
# Assertions
|
||||
self.assertEqual(len(results), len(self.urls))
|
||||
self.mock_crawler.arun.assert_awaited()
|
||||
self.callbacks[CallbackType.PRE_EXECUTION].assert_awaited()
|
||||
self.callbacks[CallbackType.POST_EXECUTION].assert_awaited()
|
||||
self.callbacks[CallbackType.ON_ERROR].assert_awaited()
|
||||
|
||||
async def test_resource_executor_basic(self):
|
||||
"""Test basic functionality of ResourceOptimizedExecutor."""
|
||||
executor = ResourceOptimizedExecutor(
|
||||
crawler=self.mock_crawler,
|
||||
callbacks=self.callbacks,
|
||||
max_concurrent_tasks=2,
|
||||
max_retries=1,
|
||||
)
|
||||
|
||||
results = await executor.execute(self.urls)
|
||||
|
||||
# Assertions
|
||||
self.assertEqual(len(results), len(self.urls))
|
||||
self.mock_crawler.arun.assert_awaited()
|
||||
self.callbacks[CallbackType.PRE_EXECUTION].assert_awaited()
|
||||
self.callbacks[CallbackType.POST_EXECUTION].assert_awaited()
|
||||
self.callbacks[CallbackType.ON_ERROR].assert_awaited()
|
||||
|
||||
async def test_pause_and_resume(self):
|
||||
"""Test the pause and resume functionality."""
|
||||
executor = SpeedOptimizedExecutor(
|
||||
crawler=self.mock_crawler,
|
||||
callbacks=self.callbacks,
|
||||
max_retries=1,
|
||||
)
|
||||
|
||||
execution_task = asyncio.create_task(executor.execute(self.urls))
|
||||
await asyncio.sleep(0.1)
|
||||
await executor.control.pause()
|
||||
self.assertTrue(await executor.control.is_paused())
|
||||
|
||||
# Ensure that execution is paused
|
||||
await asyncio.sleep(0.5)
|
||||
await executor.control.resume()
|
||||
self.assertFalse(await executor.control.is_paused())
|
||||
|
||||
results = await execution_task
|
||||
|
||||
# Assertions
|
||||
self.assertEqual(len(results), len(self.urls))
|
||||
|
||||
async def test_cancellation(self):
|
||||
"""Test the cancellation functionality."""
|
||||
executor = SpeedOptimizedExecutor(
|
||||
crawler=self.mock_crawler,
|
||||
callbacks=self.callbacks,
|
||||
max_retries=1,
|
||||
)
|
||||
|
||||
execution_task = asyncio.create_task(executor.execute(self.urls))
|
||||
await asyncio.sleep(0.1)
|
||||
await executor.control.cancel()
|
||||
self.assertTrue(await executor.control.is_cancelled())
|
||||
|
||||
with self.assertRaises(asyncio.CancelledError):
|
||||
await execution_task
|
||||
|
||||
async def test_max_retries(self):
|
||||
"""Test that the executor respects the max_retries setting."""
|
||||
executor = SpeedOptimizedExecutor(
|
||||
crawler=self.mock_crawler,
|
||||
callbacks=self.callbacks,
|
||||
max_retries=2,
|
||||
)
|
||||
|
||||
results = await executor.execute(self.urls)
|
||||
|
||||
# The failing URL should have been retried
|
||||
self.assertEqual(self.mock_crawler.arun.call_count, len(self.urls) + 2)
|
||||
self.assertEqual(executor.metrics.total_retries, 2)
|
||||
|
||||
async def test_callbacks_invoked(self):
|
||||
"""Test that all callbacks are invoked appropriately."""
|
||||
executor = SpeedOptimizedExecutor(
|
||||
crawler=self.mock_crawler,
|
||||
callbacks=self.callbacks,
|
||||
max_retries=1,
|
||||
)
|
||||
|
||||
await executor.execute(self.urls)
|
||||
|
||||
# Check that callbacks were called the correct number of times
|
||||
self.assertEqual(
|
||||
self.callbacks[CallbackType.PRE_EXECUTION].call_count,
|
||||
len(self.urls) * (1 + executor.metrics.total_retries),
|
||||
)
|
||||
self.assertEqual(
|
||||
self.callbacks[CallbackType.POST_EXECUTION].call_count,
|
||||
executor.metrics.completed_urls,
|
||||
)
|
||||
self.assertEqual(
|
||||
self.callbacks[CallbackType.ON_ERROR].call_count,
|
||||
executor.metrics.failed_urls * (1 + executor.metrics.total_retries),
|
||||
)
|
||||
self.callbacks[CallbackType.ON_COMPLETE].assert_awaited_once()
|
||||
|
||||
async def test_resource_limits(self):
|
||||
"""Test that the ResourceOptimizedExecutor respects resource limits."""
|
||||
with patch('psutil.cpu_percent', return_value=95), \
|
||||
patch('psutil.virtual_memory', return_value=MagicMock(percent=85, available=1000)):
|
||||
executor = ResourceOptimizedExecutor(
|
||||
crawler=self.mock_crawler,
|
||||
callbacks=self.callbacks,
|
||||
max_concurrent_tasks=2,
|
||||
max_retries=1,
|
||||
)
|
||||
|
||||
results = await executor.execute(self.urls)
|
||||
|
||||
# Assertions
|
||||
self.assertEqual(len(results), len(self.urls))
|
||||
# Since resources are over threshold, batch size should be minimized
|
||||
batch_sizes = [executor.resource_monitor.get_optimal_batch_size(len(self.urls))]
|
||||
self.assertTrue(all(size == 1 for size in batch_sizes))
|
||||
|
||||
async def test_system_metrics_limit(self):
|
||||
"""Test that the system_metrics list does not grow indefinitely."""
|
||||
executor = SpeedOptimizedExecutor(
|
||||
crawler=self.mock_crawler,
|
||||
callbacks=self.callbacks,
|
||||
max_retries=1,
|
||||
)
|
||||
|
||||
# Simulate many batches to exceed MAX_METRICS_HISTORY
|
||||
original_max_history = MAX_METRICS_HISTORY
|
||||
try:
|
||||
# Temporarily reduce MAX_METRICS_HISTORY for the test
|
||||
globals()['MAX_METRICS_HISTORY'] = 5
|
||||
|
||||
# Mock capture_system_metrics to increase system_metrics length
|
||||
with patch.object(executor.metrics, 'capture_system_metrics') as mock_capture:
|
||||
def side_effect():
|
||||
executor.metrics.system_metrics.append(SystemMetrics(0, 0, 0, time.time()))
|
||||
if len(executor.metrics.system_metrics) > MAX_METRICS_HISTORY:
|
||||
executor.metrics.system_metrics.pop(0)
|
||||
mock_capture.side_effect = side_effect
|
||||
|
||||
await executor.execute(self.urls * 3) # Multiply URLs to create more batches
|
||||
|
||||
# Assertions
|
||||
self.assertLessEqual(len(executor.metrics.system_metrics), MAX_METRICS_HISTORY)
|
||||
finally:
|
||||
# Restore original MAX_METRICS_HISTORY
|
||||
globals()['MAX_METRICS_HISTORY'] = original_max_history
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
162
tests/async/test_content_scraper_strategy.py
Normal file
@@ -0,0 +1,162 @@
import asyncio
from bs4 import BeautifulSoup
from typing import Dict, Any
import os
import sys
import time
import csv
from tabulate import tabulate
from dataclasses import dataclass
from typing import List, Dict

parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(parent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))

from crawl4ai.content_scrapping_strategy import WebScrapingStrategy
from crawl4ai.content_scrapping_strategy import WebScrapingStrategy as WebScrapingStrategyCurrent
# from crawl4ai.content_scrapping_strategy_current import WebScrapingStrategy as WebScrapingStrategyCurrent

@dataclass
class TestResult:
    name: str
    success: bool
    images: int
    internal_links: int
    external_links: int
    markdown_length: int
    execution_time: float

class StrategyTester:
    def __init__(self):
        self.new_scraper = WebScrapingStrategy()
        self.current_scraper = WebScrapingStrategyCurrent()
        with open(__location__ + '/sample_wikipedia.html', 'r', encoding='utf-8') as f:
            self.WIKI_HTML = f.read()
        self.results = {'new': [], 'current': []}

    def run_test(self, name: str, **kwargs) -> tuple[TestResult, TestResult]:
        results = []
        for scraper in [self.new_scraper, self.current_scraper]:
            start_time = time.time()
            result = scraper._get_content_of_website_optimized(
                url="https://en.wikipedia.org/wiki/Test",
                html=self.WIKI_HTML,
                **kwargs
            )
            execution_time = time.time() - start_time

            test_result = TestResult(
                name=name,
                success=result['success'],
                images=len(result['media']['images']),
                internal_links=len(result['links']['internal']),
                external_links=len(result['links']['external']),
                markdown_length=len(result['markdown']),
                execution_time=execution_time
            )
            results.append(test_result)

        return results[0], results[1]  # new, current

    def run_all_tests(self):
        test_cases = [
            ("Basic Extraction", {}),
            ("Exclude Tags", {'excluded_tags': ['table', 'div.infobox', 'div.navbox']}),
            ("Word Threshold", {'word_count_threshold': 50}),
            ("CSS Selector", {'css_selector': 'div.mw-parser-output > p'}),
            ("Link Exclusions", {
                'exclude_external_links': True,
                'exclude_social_media_links': True,
                'exclude_domains': ['facebook.com', 'twitter.com']
            }),
            ("Media Handling", {
                'exclude_external_images': True,
                'image_description_min_word_threshold': 20
            }),
            ("Text Only", {
                'only_text': True,
                'remove_forms': True
            }),
            ("HTML Cleaning", {
                'clean_html': True,
                'keep_data_attributes': True
            }),
            ("HTML2Text Options", {
                'html2text': {
                    'skip_internal_links': True,
                    'single_line_break': True,
                    'mark_code': True,
                    'preserve_tags': ['pre', 'code']
                }
            })
        ]

        all_results = []
        for name, kwargs in test_cases:
            try:
                new_result, current_result = self.run_test(name, **kwargs)
                all_results.append((name, new_result, current_result))
            except Exception as e:
                print(f"Error in {name}: {str(e)}")

        self.save_results_to_csv(all_results)
        self.print_comparison_table(all_results)

    def save_results_to_csv(self, all_results: List[tuple]):
        csv_file = os.path.join(__location__, 'strategy_comparison_results.csv')
        with open(csv_file, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['Test Name', 'Strategy', 'Success', 'Images', 'Internal Links',
                             'External Links', 'Markdown Length', 'Execution Time'])

            for name, new_result, current_result in all_results:
                writer.writerow([name, 'New', new_result.success, new_result.images,
                                 new_result.internal_links, new_result.external_links,
                                 new_result.markdown_length, f"{new_result.execution_time:.3f}"])
                writer.writerow([name, 'Current', current_result.success, current_result.images,
                                 current_result.internal_links, current_result.external_links,
                                 current_result.markdown_length, f"{current_result.execution_time:.3f}"])

    def print_comparison_table(self, all_results: List[tuple]):
        table_data = []
        headers = ['Test Name', 'Strategy', 'Success', 'Images', 'Internal Links',
                   'External Links', 'Markdown Length', 'Time (s)']

        for name, new_result, current_result in all_results:
            # Check for differences
            differences = []
            if new_result.images != current_result.images: differences.append('images')
            if new_result.internal_links != current_result.internal_links: differences.append('internal_links')
            if new_result.external_links != current_result.external_links: differences.append('external_links')
            if new_result.markdown_length != current_result.markdown_length: differences.append('markdown')

            # Add row for new strategy
            new_row = [
                name, 'New', new_result.success, new_result.images,
                new_result.internal_links, new_result.external_links,
                new_result.markdown_length, f"{new_result.execution_time:.3f}"
            ]
            table_data.append(new_row)

            # Add row for current strategy
            current_row = [
                '', 'Current', current_result.success, current_result.images,
                current_result.internal_links, current_result.external_links,
                current_result.markdown_length, f"{current_result.execution_time:.3f}"
            ]
            table_data.append(current_row)

            # Add difference summary if any
            if differences:
                table_data.append(['', '⚠️ Differences', ', '.join(differences), '', '', '', '', ''])

            # Add empty row for better readability
            table_data.append([''] * len(headers))

        print("\nStrategy Comparison Results:")
        print(tabulate(table_data, headers=headers, tablefmt='grid'))

if __name__ == "__main__":
    tester = StrategyTester()
    tester.run_all_tests()