Compare commits

7 Commits

| Author | SHA1 | Date |
|---|---|---|
|  | 5b84ac9186 |  |
|  | 7ea5603576 |  |
|  | 4750810a67 |  |
|  | e0e0db4247 |  |
|  | bccadec887 |  |
|  | 0759503e50 |  |
|  | 7f1c020746 |  |
.gitignore (vendored, 6 changes)

@@ -196,4 +196,8 @@ docs/.DS_Store
 tmp/
 test_env/
 **/.DS_Store
 **/.DS_Store
+**/.DS_Store
+todo.md
+git_changes.py
+git_changes.md
CHANGELOG.md

@@ -1,5 +1,14 @@
 # Changelog
 
+## [v0.3.5] - 2024-09-02
+
+Enhance AsyncWebCrawler with smart waiting and screenshot capabilities
+
+- Implement smart_wait function in AsyncPlaywrightCrawlerStrategy
+- Add screenshot support to AsyncCrawlResponse and AsyncWebCrawler
+- Improve error handling and timeout management in crawling process
+- Fix typo in CrawlResult model (responser_headers -> response_headers)
+
 ## [v0.2.77] - 2024-08-04
 
 Significant improvements in text processing and performance:
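Aside: the screenshot capability listed in this release can be driven end to end through the public API. A minimal sketch, based on the AsyncWebCrawler usage in the tests/async/test_screenshot.py file added further down in this comparison; the URL and output filename are illustrative:

```python
import asyncio
import base64
from crawl4ai import AsyncWebCrawler

async def main():
    async with AsyncWebCrawler(verbose=True) as crawler:
        # screenshot=True makes result.screenshot a base64-encoded PNG
        result = await crawler.arun(
            url="https://example.com",
            bypass_cache=True,
            screenshot=True,
        )
        if result.success and result.screenshot:
            # Decode the base64 payload and write it out as a PNG file
            with open("screenshot.png", "wb") as f:
                f.write(base64.b64decode(result.screenshot))

if __name__ == "__main__":
    asyncio.run(main())
```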
README.md (20 changes)

@@ -8,7 +8,8 @@
 Crawl4AI simplifies asynchronous web crawling and data extraction, making it accessible for large language models (LLMs) and AI applications. 🆓🌐
 
-> Looking for the synchronous version? Check out [README.sync.md](./README.sync.md).
+> Looking for the synchronous version? Check out [README.sync.md](./README.sync.md). You can also access the previous version in the branch [V0.2.76](https://github.com/unclecode/crawl4ai/blob/v0.2.76).
 
 ## Try it Now!
 
@@ -38,7 +39,6 @@ Crawl4AI simplifies asynchronous web crawling and data extraction, making it acc
 - 🔄 Session management for complex multi-page crawling scenarios
 - 🌐 Asynchronous architecture for improved performance and scalability
-
 
 ## Installation 🛠️
 
 Crawl4AI offers flexible installation options to suit various use cases. You can install it as a Python package or use Docker.
 
@@ -55,9 +55,21 @@ For basic web crawling and scraping tasks:
 pip install crawl4ai
 ```
 
-By default this will install the asynchronous version of Crawl4AI, using Playwright for web crawling.
+By default, this will install the asynchronous version of Crawl4AI, using Playwright for web crawling.
 
-👉 Note: The standard version of Crawl4AI uses Playwright for asynchronous crawling. If you encounter an error saying that Playwright is not installed, you can run playwright install. However, this should be done automatically during the setup process.
+👉 Note: When you install Crawl4AI, the setup script should automatically install and set up Playwright. However, if you encounter any Playwright-related errors, you can manually install it using one of these methods:
+
+1. Through the command line:
+```bash
+playwright install
+```
+
+2. If the above doesn't work, try this more specific command:
+```bash
+python -m playwright install chromium
+```
+
+This second method has proven to be more reliable in some cases.
 
 #### Installation with Synchronous Version
crawl4ai/__init__.py

@@ -3,7 +3,7 @@
 from .async_webcrawler import AsyncWebCrawler
 from .models import CrawlResult
 
-__version__ = "0.3.2"
+__version__ = "0.3.5"
 
 __all__ = [
     "AsyncWebCrawler",
crawl4ai/async_crawler_strategy.py

@@ -3,28 +3,21 @@ import base64, time
 from abc import ABC, abstractmethod
 from typing import Callable, Dict, Any, List, Optional
-import os
-import psutil
 from playwright.async_api import async_playwright, Page, Browser, Error
 from io import BytesIO
 from PIL import Image, ImageDraw, ImageFont
-from .utils import sanitize_input_encode
+from .utils import sanitize_input_encode, calculate_semaphore_count
 import json, uuid
 import hashlib
 from pathlib import Path
 from playwright.async_api import ProxySettings
 from pydantic import BaseModel
 
-def calculate_semaphore_count():
-    cpu_count = os.cpu_count()
-    memory_gb = psutil.virtual_memory().total / (1024 ** 3)  # Convert to GB
-    base_count = max(1, cpu_count // 2)
-    memory_based_cap = int(memory_gb / 2)  # Assume 2GB per instance
-    return min(base_count, memory_based_cap)
 
 class AsyncCrawlResponse(BaseModel):
     html: str
     response_headers: Dict[str, str]
     status_code: int
+    screenshot: Optional[str] = None
 
 class AsyncCrawlerStrategy(ABC):
     @abstractmethod
@@ -148,6 +141,45 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
                 asyncio.create_task(self.kill_session(sid))
 
+    async def smart_wait(self, page: Page, wait_for: str, timeout: float = 30000):
+        wait_for = wait_for.strip()
+
+        if wait_for.startswith('js:'):
+            # Explicitly specified JavaScript
+            js_code = wait_for[3:].strip()
+            return await self.csp_compliant_wait(page, js_code, timeout)
+        elif wait_for.startswith('css:'):
+            # Explicitly specified CSS selector
+            css_selector = wait_for[4:].strip()
+            try:
+                await page.wait_for_selector(css_selector, timeout=timeout)
+            except Error as e:
+                if 'Timeout' in str(e):
+                    raise TimeoutError(f"Timeout after {timeout}ms waiting for selector '{css_selector}'")
+                else:
+                    raise ValueError(f"Invalid CSS selector: '{css_selector}'")
+        else:
+            # Auto-detect based on content
+            if wait_for.startswith('()') or wait_for.startswith('function'):
+                # It's likely a JavaScript function
+                return await self.csp_compliant_wait(page, wait_for, timeout)
+            else:
+                # Assume it's a CSS selector first
+                try:
+                    await page.wait_for_selector(wait_for, timeout=timeout)
+                except Error as e:
+                    if 'Timeout' in str(e):
+                        raise TimeoutError(f"Timeout after {timeout}ms waiting for selector '{wait_for}'")
+                    else:
+                        # If it's not a timeout error, it might be an invalid selector
+                        # Let's try to evaluate it as a JavaScript function as a fallback
+                        try:
+                            return await self.csp_compliant_wait(page, f"() => {{{wait_for}}}", timeout)
+                        except Error:
+                            raise ValueError(f"Invalid wait_for parameter: '{wait_for}'. "
+                                             "It should be either a valid CSS selector, a JavaScript function, "
+                                             "or explicitly prefixed with 'js:' or 'css:'.")
+
     async def csp_compliant_wait(self, page: Page, user_wait_function: str, timeout: float = 30000):
         wrapper_js = f"""
         async () => {{
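Aside: smart_wait accepts three shapes of wait_for: an explicit js: prefix (a JavaScript predicate evaluated until it returns true), an explicit css: prefix (a selector waited on via wait_for_selector), or a bare string that is tried as a CSS selector first with a JavaScript fallback. A short usage sketch, mirroring the values the new tests/async/test_screenshot.py passes to arun; URLs and selectors are illustrative:

```python
import asyncio
from crawl4ai import AsyncWebCrawler

async def main():
    async with AsyncWebCrawler(verbose=True) as crawler:
        # Explicit CSS selector: wait until the element appears
        r1 = await crawler.arun(
            url="https://www.youtube.com",
            wait_for="css:#content",
        )
        # Explicit JavaScript predicate: wait until it evaluates to true
        r2 = await crawler.arun(
            url="https://www.amazon.com",
            wait_for="js:() => document.querySelector('#nav-logo-sprites') !== null",
        )
        # Bare string: tried as a CSS selector, then as JavaScript on failure
        r3 = await crawler.arun(url="https://example.com", wait_for="h1")
        print(r1.success, r2.success, r3.success)

if __name__ == "__main__":
    asyncio.run(main())
```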
@@ -259,19 +291,9 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
             wait_for = kwargs.get("wait_for")
             if wait_for:
                 try:
-                    await self.csp_compliant_wait(page, wait_for, timeout=kwargs.get("timeout", 30000))
+                    await self.smart_wait(page, wait_for, timeout=kwargs.get("timeout", 30000))
                 except Exception as e:
-                    raise RuntimeError(f"Custom wait condition failed: {str(e)}")
-                # try:
-                #     await page.wait_for_function(wait_for)
-                #     # if callable(wait_for):
-                #     #     await page.wait_for_function(wait_for)
-                #     # elif isinstance(wait_for, str):
-                #     #     await page.wait_for_selector(wait_for)
-                #     # else:
-                #     #     raise ValueError("wait_for must be either a callable or a CSS selector string")
-                # except Error as e:
-                #     raise Error(f"Custom wait condition failed: {str(e)}")
+                    raise RuntimeError(f"Wait condition failed: {str(e)}")
 
             html = await page.content()
             page = await self.execute_hook('before_return_html', page, html)
crawl4ai/async_webcrawler.py

@@ -80,7 +80,7 @@ class AsyncWebCrawler:
 
         word_count_threshold = max(word_count_threshold, MIN_WORD_THRESHOLD)
 
-        async_response : AsyncCrawlResponse = None
+        async_response: AsyncCrawlResponse = None
         cached = None
         screenshot_data = None
         extracted_content = None

@@ -102,15 +102,14 @@ class AsyncWebCrawler:
             t1 = time.time()
             if user_agent:
                 self.crawler_strategy.update_user_agent(user_agent)
-            async_response : AsyncCrawlResponse = await self.crawler_strategy.crawl(url, **kwargs)
+            async_response: AsyncCrawlResponse = await self.crawler_strategy.crawl(url, screenshot=screenshot, **kwargs)
             html = sanitize_input_encode(async_response.html)
+            screenshot_data = async_response.screenshot
             t2 = time.time()
             if verbose:
                 print(
                     f"[LOG] 🚀 Crawling done for {url}, success: {bool(html)}, time taken: {t2 - t1:.2f} seconds"
                 )
-            if screenshot:
-                screenshot_data = await self.crawler_strategy.take_screenshot(url)
 
             crawl_result = await self.aprocess_html(
                 url,

@@ -127,7 +126,7 @@ class AsyncWebCrawler:
                 **kwargs,
             )
             crawl_result.status_code = async_response.status_code if async_response else 200
-            crawl_result.responser_headers = async_response.response_headers if async_response else {}
+            crawl_result.response_headers = async_response.response_headers if async_response else {}
             crawl_result.success = bool(html)
             crawl_result.session_id = kwargs.get("session_id", None)
             return crawl_result
crawl4ai/models.py

@@ -18,5 +18,5 @@ class CrawlResult(BaseModel):
     metadata: Optional[dict] = None
     error_message: Optional[str] = None
     session_id: Optional[str] = None
-    responser_headers: Optional[dict] = None
+    response_headers: Optional[dict] = None
     status_code: Optional[int] = None
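Aside: with the rename in place, response metadata reads naturally from the result object. A small sketch assuming only the CrawlResult fields shown in this hunk; the URL is illustrative:

```python
import asyncio
from crawl4ai import AsyncWebCrawler

async def main():
    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun(url="https://example.com")
        print(result.status_code)  # e.g. 200
        # response_headers is Optional[dict], so guard against None
        print((result.response_headers or {}).get("content-type"))

if __name__ == "__main__":
    asyncio.run(main())
```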
crawl4ai/utils.py

@@ -6,6 +6,7 @@ import json
 import html
 import re
 import os
+import platform
 from html2text import HTML2Text
 from .prompts import PROMPT_EXTRACT_BLOCKS
 from .config import *

@@ -18,6 +19,46 @@ from requests.exceptions import InvalidSchema
 class InvalidCSSSelectorError(Exception):
     pass
 
+def calculate_semaphore_count():
+    cpu_count = os.cpu_count()
+    memory_gb = get_system_memory() / (1024 ** 3)  # Convert to GB
+    base_count = max(1, cpu_count // 2)
+    memory_based_cap = int(memory_gb / 2)  # Assume 2GB per instance
+    return min(base_count, memory_based_cap)
+
+def get_system_memory():
+    system = platform.system()
+    if system == "Linux":
+        with open('/proc/meminfo', 'r') as mem:
+            for line in mem:
+                if line.startswith('MemTotal:'):
+                    return int(line.split()[1]) * 1024  # Convert KB to bytes
+    elif system == "Darwin":  # macOS
+        import subprocess
+        output = subprocess.check_output(['sysctl', '-n', 'hw.memsize']).decode('utf-8')
+        return int(output.strip())
+    elif system == "Windows":
+        import ctypes
+        kernel32 = ctypes.windll.kernel32
+        c_ulonglong = ctypes.c_ulonglong
+        class MEMORYSTATUSEX(ctypes.Structure):
+            _fields_ = [
+                ('dwLength', ctypes.c_ulong),
+                ('dwMemoryLoad', ctypes.c_ulong),
+                ('ullTotalPhys', c_ulonglong),
+                ('ullAvailPhys', c_ulonglong),
+                ('ullTotalPageFile', c_ulonglong),
+                ('ullAvailPageFile', c_ulonglong),
+                ('ullTotalVirtual', c_ulonglong),
+                ('ullAvailVirtual', c_ulonglong),
+                ('ullAvailExtendedVirtual', c_ulonglong),
+            ]
+        memoryStatus = MEMORYSTATUSEX()
+        memoryStatus.dwLength = ctypes.sizeof(MEMORYSTATUSEX)
+        kernel32.GlobalMemoryStatusEx(ctypes.byref(memoryStatus))
+        return memoryStatus.ullTotalPhys
+    else:
+        raise OSError("Unsupported operating system")
+
 def get_home_folder():
     home_folder = os.path.join(Path.home(), ".crawl4ai")
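Aside: calculate_semaphore_count caps concurrency at the smaller of half the CPU cores and total memory divided by an assumed 2 GB per browser instance; on an 8-core, 16 GB machine that is min(4, 8) = 4. A hedged sketch of how such a count could bound concurrent crawls with asyncio.Semaphore; this is illustrative glue, not the library's internal scheduler:

```python
import asyncio
from crawl4ai import AsyncWebCrawler
from crawl4ai.utils import calculate_semaphore_count

async def crawl_all(urls):
    # Bound in-flight crawls by the machine-derived count
    semaphore = asyncio.Semaphore(calculate_semaphore_count())
    async with AsyncWebCrawler() as crawler:

        async def crawl_one(url):
            async with semaphore:  # at most N crawls at once
                return await crawler.arun(url=url)

        return await asyncio.gather(*(crawl_one(u) for u in urls))
```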
docs/examples/async_webcrawler_multiple_urls_example.py (new file, 48 lines)

@@ -0,0 +1,48 @@
# File: async_webcrawler_multiple_urls_example.py
import os, sys
# append 2 parent directories to sys.path to import crawl4ai
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(parent_dir)

import asyncio
from crawl4ai import AsyncWebCrawler

async def main():
    # Initialize the AsyncWebCrawler
    async with AsyncWebCrawler(verbose=True) as crawler:
        # List of URLs to crawl
        urls = [
            "https://example.com",
            "https://python.org",
            "https://github.com",
            "https://stackoverflow.com",
            "https://news.ycombinator.com"
        ]

        # Set up crawling parameters
        word_count_threshold = 100

        # Run the crawling process for multiple URLs
        results = await crawler.arun_many(
            urls=urls,
            word_count_threshold=word_count_threshold,
            bypass_cache=True,
            verbose=True
        )

        # Process the results
        for result in results:
            if result.success:
                print(f"Successfully crawled: {result.url}")
                print(f"Title: {result.metadata.get('title', 'N/A')}")
                print(f"Word count: {len(result.markdown.split())}")
                print(f"Number of links: {len(result.links.get('internal', [])) + len(result.links.get('external', []))}")
                print(f"Number of images: {len(result.media.get('images', []))}")
                print("---")
            else:
                print(f"Failed to crawl: {result.url}")
                print(f"Error: {result.error_message}")
                print("---")

if __name__ == "__main__":
    asyncio.run(main())
docs/examples/language_support_example.py (new file, 45 lines)

@@ -0,0 +1,45 @@
import asyncio
from crawl4ai import AsyncWebCrawler, AsyncPlaywrightCrawlerStrategy

async def main():
    # Example 1: Setting language when creating the crawler
    crawler1 = AsyncWebCrawler(
        crawler_strategy=AsyncPlaywrightCrawlerStrategy(
            headers={"Accept-Language": "fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7"}
        )
    )
    result1 = await crawler1.arun("https://www.example.com")
    print("Example 1 result:", result1.extracted_content[:100])  # Print first 100 characters

    # Example 2: Setting language before crawling
    crawler2 = AsyncWebCrawler()
    crawler2.crawler_strategy.headers["Accept-Language"] = "es-ES,es;q=0.9,en-US;q=0.8,en;q=0.7"
    result2 = await crawler2.arun("https://www.example.com")
    print("Example 2 result:", result2.extracted_content[:100])

    # Example 3: Setting language when calling arun method
    crawler3 = AsyncWebCrawler()
    result3 = await crawler3.arun(
        "https://www.example.com",
        headers={"Accept-Language": "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7"}
    )
    print("Example 3 result:", result3.extracted_content[:100])

    # Example 4: Crawling multiple pages with different languages
    urls = [
        ("https://www.example.com", "fr-FR,fr;q=0.9"),
        ("https://www.example.org", "es-ES,es;q=0.9"),
        ("https://www.example.net", "de-DE,de;q=0.9"),
    ]

    crawler4 = AsyncWebCrawler()
    results = await asyncio.gather(*[
        crawler4.arun(url, headers={"Accept-Language": lang})
        for url, lang in urls
    ])

    for url, result in zip([u for u, _ in urls], results):
        print(f"Result for {url}:", result.extracted_content[:100])

if __name__ == "__main__":
    asyncio.run(main())
requirements.txt

@@ -2,11 +2,9 @@ aiosqlite==0.20.0
 html2text==2024.2.26
 lxml==5.3.0
 litellm==1.48.0
-numpy==2.1.1
+numpy>=1.26.0,<2.1.1
 pillow==10.4.0
 playwright==1.47.0
 python-dotenv==1.0.1
-requests==2.32.3
-PyYAML==6.0.2
+requests>=2.26.0,<2.32.3
 beautifulsoup4==4.12.3
-psutil==6.0.0
setup.py (23 changes)

@@ -4,6 +4,7 @@ import os
 from pathlib import Path
 import shutil
 import subprocess
+import sys
 
 # Create the .crawl4ai folder in the user's home directory if it doesn't exist
 # If the folder already exists, remove the cache folder

@@ -35,21 +36,23 @@ transformer_requirements = ["transformers", "tokenizers", "onnxruntime"]
 cosine_similarity_requirements = ["torch", "transformers", "nltk", "spacy"]
 sync_requirements = ["selenium"]
 
-def post_install():
-    print("Running post-installation setup...")
+def install_playwright():
+    print("Installing Playwright browsers...")
     try:
-        subprocess.check_call(["playwright", "install"])
+        subprocess.check_call([sys.executable, "-m", "playwright", "install"])
         print("Playwright installation completed successfully.")
-    except subprocess.CalledProcessError:
-        print("Error during Playwright installation. Please run 'playwright install' manually.")
-    except FileNotFoundError:
-        print("Playwright not found. Please ensure it's installed and run 'playwright install' manually.")
+    except subprocess.CalledProcessError as e:
+        print(f"Error during Playwright installation: {e}")
+        print("Please run 'python -m playwright install' manually after the installation.")
+    except Exception as e:
+        print(f"Unexpected error during Playwright installation: {e}")
+        print("Please run 'python -m playwright install' manually after the installation.")
 
 class PostInstallCommand(install):
     def run(self):
         install.run(self)
-        post_install()
+        install_playwright()
 
 setup(
     name="Crawl4AI",
     version=version,

@@ -61,7 +64,7 @@ setup(
     author_email="unclecode@kidocode.com",
     license="MIT",
     packages=find_packages(),
-    install_requires=default_requirements,
+    install_requires=default_requirements + ["playwright"],  # Add playwright to default requirements
     extras_require={
         "torch": torch_requirements,
         "transformer": transformer_requirements,
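Worth noting: invoking the CLI as [sys.executable, "-m", "playwright", "install"] runs Playwright under the same interpreter that is executing setup.py, so the browsers land in the active virtual environment even when the bare playwright entry point is not on PATH. This matches the python -m playwright fallback the updated README now recommends.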
[tests/async/ test module]

@@ -5,7 +5,7 @@ import asyncio
 import time
 
 # Add the parent directory to the Python path
-parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 sys.path.append(parent_dir)
 
 from crawl4ai.async_webcrawler import AsyncWebCrawler
tests/async/test_screenshot.py (new file, 124 lines)

@@ -0,0 +1,124 @@
import os
import sys
import pytest
import asyncio
import base64
from PIL import Image
import io

# Add the parent directory to the Python path
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)

from crawl4ai.async_webcrawler import AsyncWebCrawler

@pytest.mark.asyncio
async def test_basic_screenshot():
    async with AsyncWebCrawler(verbose=True) as crawler:
        url = "https://example.com"  # A static website
        result = await crawler.arun(url=url, bypass_cache=True, screenshot=True)

        assert result.success
        assert result.screenshot is not None

        # Verify the screenshot is a valid image
        image_data = base64.b64decode(result.screenshot)
        image = Image.open(io.BytesIO(image_data))
        assert image.format == "PNG"

@pytest.mark.asyncio
async def test_screenshot_with_wait_for():
    async with AsyncWebCrawler(verbose=True) as crawler:
        # Using a website with dynamic content
        url = "https://www.youtube.com"
        wait_for = "css:#content"  # Wait for the main content to load

        result = await crawler.arun(
            url=url,
            bypass_cache=True,
            screenshot=True,
            wait_for=wait_for
        )

        assert result.success
        assert result.screenshot is not None

        # Verify the screenshot is a valid image
        image_data = base64.b64decode(result.screenshot)
        image = Image.open(io.BytesIO(image_data))
        assert image.format == "PNG"

        # You might want to add more specific checks here, like image dimensions
        # or even use image recognition to verify certain elements are present

@pytest.mark.asyncio
async def test_screenshot_with_js_wait_for():
    async with AsyncWebCrawler(verbose=True) as crawler:
        url = "https://www.amazon.com"
        wait_for = "js:() => document.querySelector('#nav-logo-sprites') !== null"

        result = await crawler.arun(
            url=url,
            bypass_cache=True,
            screenshot=True,
            wait_for=wait_for
        )

        assert result.success
        assert result.screenshot is not None

        image_data = base64.b64decode(result.screenshot)
        image = Image.open(io.BytesIO(image_data))
        assert image.format == "PNG"

@pytest.mark.asyncio
async def test_screenshot_without_wait_for():
    async with AsyncWebCrawler(verbose=True) as crawler:
        url = "https://www.nytimes.com"  # A website with lots of dynamic content

        result = await crawler.arun(url=url, bypass_cache=True, screenshot=True)

        assert result.success
        assert result.screenshot is not None

        image_data = base64.b64decode(result.screenshot)
        image = Image.open(io.BytesIO(image_data))
        assert image.format == "PNG"

@pytest.mark.asyncio
async def test_screenshot_comparison():
    async with AsyncWebCrawler(verbose=True) as crawler:
        url = "https://www.reddit.com"
        wait_for = "css:#SHORTCUT_FOCUSABLE_DIV"

        # Take screenshot without wait_for
        result_without_wait = await crawler.arun(
            url=url,
            bypass_cache=True,
            screenshot=True
        )

        # Take screenshot with wait_for
        result_with_wait = await crawler.arun(
            url=url,
            bypass_cache=True,
            screenshot=True,
            wait_for=wait_for
        )

        assert result_without_wait.success and result_with_wait.success
        assert result_without_wait.screenshot is not None
        assert result_with_wait.screenshot is not None

        # Compare the two screenshots
        image_without_wait = Image.open(io.BytesIO(base64.b64decode(result_without_wait.screenshot)))
        image_with_wait = Image.open(io.BytesIO(base64.b64decode(result_with_wait.screenshot)))

        # This is a simple size comparison. In a real-world scenario, you might want to use
        # more sophisticated image comparison techniques.
        assert image_with_wait.size[0] >= image_without_wait.size[0]
        assert image_with_wait.size[1] >= image_without_wait.size[1]

# Entry point for debugging
if __name__ == "__main__":
    pytest.main([__file__, "-v"])