Apply Ruff Corrections

This commit is contained in:
UncleCode
2025-01-13 19:19:58 +08:00
parent c3370ec5da
commit 8ec12d7d68
84 changed files with 6861 additions and 5076 deletions

View File

@@ -1,17 +1,18 @@
import os, sys
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
__location__ = os.path.realpath( os.path.join(os.getcwd(), os.path.dirname(__file__)))
import os, sys
import os
import sys
import asyncio
from crawl4ai import AsyncWebCrawler, CacheMode
from crawl4ai.content_filter_strategy import PruningContentFilter
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
# Assuming that the changes made allow different configurations
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
# Assuming that the changes made allow different configurations
# for managed browser, persistent context, and so forth.
async def test_default_headless():
async with AsyncWebCrawler(
headless=True,
@@ -24,13 +25,14 @@ async def test_default_headless():
# Testing normal ephemeral context
) as crawler:
result = await crawler.arun(
url='https://www.kidocode.com/degrees/technology',
url="https://www.kidocode.com/degrees/technology",
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True}),
)
print("[test_default_headless] success:", result.success)
print("HTML length:", len(result.html if result.html else ""))
async def test_managed_browser_persistent():
# Treating use_persistent_context=True as managed_browser scenario.
async with AsyncWebCrawler(
@@ -44,13 +46,14 @@ async def test_managed_browser_persistent():
# This should store and reuse profile data across runs
) as crawler:
result = await crawler.arun(
url='https://www.google.com',
url="https://www.google.com",
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True}),
)
print("[test_managed_browser_persistent] success:", result.success)
print("HTML length:", len(result.html if result.html else ""))
async def test_session_reuse():
# Test creating a session, using it for multiple calls
session_id = "my_session"
@@ -62,25 +65,25 @@ async def test_session_reuse():
use_managed_browser=False,
use_persistent_context=False,
) as crawler:
# First call: create session
result1 = await crawler.arun(
url='https://www.example.com',
url="https://www.example.com",
cache_mode=CacheMode.BYPASS,
session_id=session_id,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True}),
)
print("[test_session_reuse first call] success:", result1.success)
# Second call: same session, possibly cookie retained
result2 = await crawler.arun(
url='https://www.example.com/about',
url="https://www.example.com/about",
cache_mode=CacheMode.BYPASS,
session_id=session_id,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True}),
)
print("[test_session_reuse second call] success:", result2.success)
async def test_magic_mode():
# Test magic mode with override_navigator and simulate_user
async with AsyncWebCrawler(
@@ -95,13 +98,14 @@ async def test_magic_mode():
simulate_user=True,
) as crawler:
result = await crawler.arun(
url='https://www.kidocode.com/degrees/business',
url="https://www.kidocode.com/degrees/business",
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True}),
)
print("[test_magic_mode] success:", result.success)
print("HTML length:", len(result.html if result.html else ""))
async def test_proxy_settings():
# Test with a proxy (if available) to ensure code runs with proxy
async with AsyncWebCrawler(
@@ -113,14 +117,15 @@ async def test_proxy_settings():
use_persistent_context=False,
) as crawler:
result = await crawler.arun(
url='https://httpbin.org/ip',
url="https://httpbin.org/ip",
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True}),
)
print("[test_proxy_settings] success:", result.success)
if result.success:
print("HTML preview:", result.html[:200] if result.html else "")
async def test_ignore_https_errors():
# Test ignore HTTPS errors with a self-signed or invalid cert domain
# This is just conceptual, the domain should be one that triggers SSL error.
@@ -134,12 +139,13 @@ async def test_ignore_https_errors():
use_persistent_context=False,
) as crawler:
result = await crawler.arun(
url='https://self-signed.badssl.com/',
url="https://self-signed.badssl.com/",
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True}),
)
print("[test_ignore_https_errors] success:", result.success)
async def main():
print("Running tests...")
# await test_default_headless()
@@ -149,5 +155,6 @@ async def main():
# await test_proxy_settings()
await test_ignore_https_errors()
# Script entry point: run the async main() coroutine to completion.
if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,15 +1,16 @@
import os, sys
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
import asyncio
from crawl4ai import AsyncWebCrawler, CacheMode
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.content_filter_strategy import PruningContentFilter
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
from crawl4ai.chunking_strategy import RegexChunking
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
# Category 1: Browser Configuration Tests
async def test_browser_config_object():
@@ -21,29 +22,31 @@ async def test_browser_config_object():
viewport_height=1080,
use_managed_browser=True,
user_agent_mode="random",
user_agent_generator_config={"device_type": "desktop", "os_type": "windows"}
user_agent_generator_config={"device_type": "desktop", "os_type": "windows"},
)
async with AsyncWebCrawler(config=browser_config, verbose=True) as crawler:
result = await crawler.arun('https://example.com', cache_mode=CacheMode.BYPASS)
result = await crawler.arun("https://example.com", cache_mode=CacheMode.BYPASS)
assert result.success, "Browser config crawl failed"
assert len(result.html) > 0, "No HTML content retrieved"
async def test_browser_performance_config():
    """Test browser configurations focused on performance.

    Builds a BrowserConfig with text/light mode on, GPU-related flags
    disabled and JavaScript off, crawls https://example.com with it, and
    asserts that the crawl succeeded with status code 200.
    """
    browser_config = BrowserConfig(
        text_mode=True,
        light_mode=True,
        extra_args=["--disable-gpu", "--disable-software-rasterizer"],
        ignore_https_errors=True,
        java_script_enabled=False,
    )
    async with AsyncWebCrawler(config=browser_config) as crawler:
        result = await crawler.arun("https://example.com")
        assert result.success, "Performance optimized crawl failed"
        assert result.status_code == 200, "Unexpected status code"
# Category 2: Content Processing Tests
async def test_content_extraction_config():
"""Test content extraction with various strategies"""
@@ -53,24 +56,20 @@ async def test_content_extraction_config():
schema={
"name": "article",
"baseSelector": "div",
"fields": [{
"name": "title",
"selector": "h1",
"type": "text"
}]
"fields": [{"name": "title", "selector": "h1", "type": "text"}],
}
),
chunking_strategy=RegexChunking(),
content_filter=PruningContentFilter()
content_filter=PruningContentFilter(),
)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
'https://example.com/article',
config=crawler_config
"https://example.com/article", config=crawler_config
)
assert result.extracted_content is not None, "Content extraction failed"
assert 'title' in result.extracted_content, "Missing expected content field"
assert "title" in result.extracted_content, "Missing expected content field"
# Category 3: Cache and Session Management Tests
async def test_cache_and_session_management():
@@ -79,25 +78,20 @@ async def test_cache_and_session_management():
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.WRITE_ONLY,
process_iframes=True,
remove_overlay_elements=True
remove_overlay_elements=True,
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# First request - should write to cache
result1 = await crawler.arun(
'https://example.com',
config=crawler_config
)
result1 = await crawler.arun("https://example.com", config=crawler_config)
# Second request - should use fresh fetch due to WRITE_ONLY mode
result2 = await crawler.arun(
'https://example.com',
config=crawler_config
)
result2 = await crawler.arun("https://example.com", config=crawler_config)
assert result1.success and result2.success, "Cache mode crawl failed"
assert result1.html == result2.html, "Inconsistent results between requests"
# Category 4: Media Handling Tests
async def test_media_handling_config():
"""Test configurations related to media handling"""
@@ -107,24 +101,22 @@ async def test_media_handling_config():
viewport_width=1920,
viewport_height=1080,
accept_downloads=True,
downloads_path= os.path.expanduser("~/.crawl4ai/downloads")
downloads_path=os.path.expanduser("~/.crawl4ai/downloads"),
)
crawler_config = CrawlerRunConfig(
screenshot=True,
pdf=True,
adjust_viewport_to_content=True,
wait_for_images=True,
screenshot_height_threshold=20000
screenshot_height_threshold=20000,
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
'https://example.com',
config=crawler_config
)
result = await crawler.arun("https://example.com", config=crawler_config)
assert result.screenshot is not None, "Screenshot capture failed"
assert result.pdf is not None, "PDF generation failed"
# Category 5: Anti-Bot and Site Interaction Tests
async def test_antibot_config():
"""Test configurations for handling anti-bot measures"""
@@ -135,76 +127,64 @@ async def test_antibot_config():
wait_for="js:()=>document.querySelector('body')",
delay_before_return_html=1.0,
log_console=True,
cache_mode=CacheMode.BYPASS
cache_mode=CacheMode.BYPASS,
)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
'https://example.com',
config=crawler_config
)
result = await crawler.arun("https://example.com", config=crawler_config)
assert result.success, "Anti-bot measure handling failed"
# Category 6: Parallel Processing Tests
async def test_parallel_processing():
    """Test parallel processing capabilities.

    Passes three URLs to ``arun_many`` with a shared CrawlerRunConfig and
    asserts that one result comes back per URL and that every result
    reports success.
    """
    crawler_config = CrawlerRunConfig(mean_delay=0.5, max_range=1.0, semaphore_count=5)
    urls = ["https://example.com/1", "https://example.com/2", "https://example.com/3"]
    async with AsyncWebCrawler() as crawler:
        results = await crawler.arun_many(urls, config=crawler_config)
        assert len(results) == len(urls), "Not all URLs were processed"
        assert all(r.success for r in results), "Some parallel requests failed"
# Category 7: Backwards Compatibility Tests
async def test_legacy_parameter_support():
    """Test that legacy parameters still work.

    Configures the crawler and the crawl purely through keyword
    arguments (no config objects) and asserts that the crawl succeeds.
    """
    async with AsyncWebCrawler(
        headless=True, browser_type="chromium", viewport_width=1024, viewport_height=768
    ) as crawler:
        result = await crawler.arun(
            "https://example.com",
            screenshot=True,
            word_count_threshold=200,
            bypass_cache=True,
            css_selector=".main-content",
        )
        assert result.success, "Legacy parameter support failed"
# Category 8: Mixed Configuration Tests
async def test_mixed_config_usage():
    """Test mixing new config objects with legacy parameters.

    Supplies a BrowserConfig / CrawlerRunConfig pair together with extra
    keyword arguments on both the crawler and the ``arun`` call, and
    asserts that the crawl succeeds.
    """
    browser_config = BrowserConfig(headless=True)
    crawler_config = CrawlerRunConfig(screenshot=True)
    async with AsyncWebCrawler(
        config=browser_config,
        verbose=True,  # legacy parameter
    ) as crawler:
        result = await crawler.arun(
            "https://example.com",
            config=crawler_config,
            cache_mode=CacheMode.BYPASS,  # legacy parameter
            css_selector="body",  # legacy parameter
        )
        assert result.success, "Mixed configuration usage failed"
if __name__ == "__main__":
async def run_tests():
test_functions = [
test_browser_config_object,
@@ -217,7 +197,7 @@ if __name__ == "__main__":
# test_legacy_parameter_support,
# test_mixed_config_usage
]
for test in test_functions:
print(f"\nRunning {test.__name__}...")
try:
@@ -227,5 +207,5 @@ if __name__ == "__main__":
print(f"{test.__name__} failed: {str(e)}")
except Exception as e:
print(f"{test.__name__} error: {str(e)}")
asyncio.run(run_tests())
asyncio.run(run_tests())

View File

@@ -4,7 +4,6 @@ import asyncio
import shutil
from typing import List
import tempfile
import time
# Add the parent directory to the Python path
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@@ -12,28 +11,27 @@ sys.path.append(parent_dir)
from crawl4ai.async_webcrawler import AsyncWebCrawler
class TestDownloads:
def __init__(self):
self.temp_dir = tempfile.mkdtemp(prefix="crawl4ai_test_")
self.download_dir = os.path.join(self.temp_dir, "downloads")
os.makedirs(self.download_dir, exist_ok=True)
self.results: List[str] = []
def cleanup(self):
shutil.rmtree(self.temp_dir)
def log_result(self, test_name: str, success: bool, message: str = ""):
result = f"{'' if success else ''} {test_name}: {message}"
self.results.append(result)
print(result)
async def test_basic_download(self):
"""Test basic file download functionality"""
try:
async with AsyncWebCrawler(
accept_downloads=True,
downloads_path=self.download_dir,
verbose=True
accept_downloads=True, downloads_path=self.download_dir, verbose=True
) as crawler:
# Python.org downloads page typically has stable download links
result = await crawler.arun(
@@ -42,14 +40,19 @@ class TestDownloads:
// Click first download link
const downloadLink = document.querySelector('a[href$=".exe"]');
if (downloadLink) downloadLink.click();
"""
""",
)
success = (
result.downloaded_files is not None
and len(result.downloaded_files) > 0
)
success = result.downloaded_files is not None and len(result.downloaded_files) > 0
self.log_result(
"Basic Download",
success,
f"Downloaded {len(result.downloaded_files or [])} files" if success else "No files downloaded"
f"Downloaded {len(result.downloaded_files or [])} files"
if success
else "No files downloaded",
)
except Exception as e:
self.log_result("Basic Download", False, str(e))
@@ -59,27 +62,32 @@ class TestDownloads:
try:
user_data_dir = os.path.join(self.temp_dir, "user_data")
os.makedirs(user_data_dir, exist_ok=True)
async with AsyncWebCrawler(
accept_downloads=True,
downloads_path=self.download_dir,
use_persistent_context=True,
user_data_dir=user_data_dir,
verbose=True
verbose=True,
) as crawler:
result = await crawler.arun(
url="https://www.python.org/downloads/",
js_code="""
const downloadLink = document.querySelector('a[href$=".exe"]');
if (downloadLink) downloadLink.click();
"""
""",
)
success = (
result.downloaded_files is not None
and len(result.downloaded_files) > 0
)
success = result.downloaded_files is not None and len(result.downloaded_files) > 0
self.log_result(
"Persistent Context Download",
success,
f"Downloaded {len(result.downloaded_files or [])} files" if success else "No files downloaded"
f"Downloaded {len(result.downloaded_files or [])} files"
if success
else "No files downloaded",
)
except Exception as e:
self.log_result("Persistent Context Download", False, str(e))
@@ -88,9 +96,7 @@ class TestDownloads:
"""Test multiple simultaneous downloads"""
try:
async with AsyncWebCrawler(
accept_downloads=True,
downloads_path=self.download_dir,
verbose=True
accept_downloads=True, downloads_path=self.download_dir, verbose=True
) as crawler:
result = await crawler.arun(
url="https://www.python.org/downloads/",
@@ -98,14 +104,19 @@ class TestDownloads:
// Click multiple download links
const downloadLinks = document.querySelectorAll('a[href$=".exe"]');
downloadLinks.forEach(link => link.click());
"""
""",
)
success = (
result.downloaded_files is not None
and len(result.downloaded_files) > 1
)
success = result.downloaded_files is not None and len(result.downloaded_files) > 1
self.log_result(
"Multiple Downloads",
success,
f"Downloaded {len(result.downloaded_files or [])} files" if success else "Not enough files downloaded"
f"Downloaded {len(result.downloaded_files or [])} files"
if success
else "Not enough files downloaded",
)
except Exception as e:
self.log_result("Multiple Downloads", False, str(e))
@@ -113,49 +124,51 @@ class TestDownloads:
async def test_different_browsers(self):
    """Test downloads across different browser types"""
    browsers = ["chromium", "firefox", "webkit"]

    for browser_type in browsers:
        label = f"{browser_type.title()} Download"
        try:
            async with AsyncWebCrawler(
                accept_downloads=True,
                downloads_path=self.download_dir,
                browser_type=browser_type,
                verbose=True,
            ) as crawler:
                result = await crawler.arun(
                    url="https://www.python.org/downloads/",
                    js_code="""
                        const downloadLink = document.querySelector('a[href$=".exe"]');
                        if (downloadLink) downloadLink.click();
                    """,
                )

                # Success means at least one file was reported as downloaded.
                success = (
                    result.downloaded_files is not None
                    and len(result.downloaded_files) > 0
                )
                self.log_result(
                    label,
                    success,
                    f"Downloaded {len(result.downloaded_files or [])} files"
                    if success
                    else "No files downloaded",
                )
        except Exception as e:
            # Record the failure for this browser and continue with the next one.
            self.log_result(label, False, str(e))
async def test_edge_cases(self):
"""Test various edge cases"""
# Test 1: Downloads without specifying download path
try:
async with AsyncWebCrawler(
accept_downloads=True,
verbose=True
) as crawler:
async with AsyncWebCrawler(accept_downloads=True, verbose=True) as crawler:
result = await crawler.arun(
url="https://www.python.org/downloads/",
js_code="document.querySelector('a[href$=\".exe\"]').click()"
js_code="document.querySelector('a[href$=\".exe\"]').click()",
)
self.log_result(
"Default Download Path",
True,
f"Downloaded to default path: {result.downloaded_files[0] if result.downloaded_files else 'None'}"
f"Downloaded to default path: {result.downloaded_files[0] if result.downloaded_files else 'None'}",
)
except Exception as e:
self.log_result("Default Download Path", False, str(e))
@@ -165,31 +178,34 @@ class TestDownloads:
async with AsyncWebCrawler(
accept_downloads=True,
downloads_path="/invalid/path/that/doesnt/exist",
verbose=True
verbose=True,
) as crawler:
result = await crawler.arun(
url="https://www.python.org/downloads/",
js_code="document.querySelector('a[href$=\".exe\"]').click()"
js_code="document.querySelector('a[href$=\".exe\"]').click()",
)
self.log_result("Invalid Download Path", False, "Should have raised an error")
except Exception as e:
self.log_result("Invalid Download Path", True, "Correctly handled invalid path")
self.log_result(
"Invalid Download Path", False, "Should have raised an error"
)
except Exception:
self.log_result(
"Invalid Download Path", True, "Correctly handled invalid path"
)
# Test 3: Download with accept_downloads=False
try:
async with AsyncWebCrawler(
accept_downloads=False,
verbose=True
) as crawler:
async with AsyncWebCrawler(accept_downloads=False, verbose=True) as crawler:
result = await crawler.arun(
url="https://www.python.org/downloads/",
js_code="document.querySelector('a[href$=\".exe\"]').click()"
js_code="document.querySelector('a[href$=\".exe\"]').click()",
)
success = result.downloaded_files is None
self.log_result(
"Disabled Downloads",
success,
"Correctly ignored downloads" if success else "Unexpectedly downloaded files"
"Correctly ignored downloads"
if success
else "Unexpectedly downloaded files",
)
except Exception as e:
self.log_result("Disabled Downloads", False, str(e))
@@ -197,33 +213,35 @@ class TestDownloads:
async def run_all_tests(self):
    """Run all test cases"""
    print("\n🧪 Running Download Tests...\n")

    test_methods = [
        self.test_basic_download,
        self.test_persistent_context_download,
        self.test_multiple_downloads,
        self.test_different_browsers,
        self.test_edge_cases,
    ]

    try:
        for test in test_methods:
            # Each test's docstring doubles as its display label.
            print(f"\n📝 Running {test.__doc__}...")
            await test()
            await asyncio.sleep(2)  # Brief pause between tests

        print("\n📊 Test Results Summary:")
        for result in self.results:
            print(result)

        # NOTE(review): the marker literal below is empty, so this matches
        # every entry and `successes` always equals `total`. It presumably
        # held the pass glyph written by log_result — confirm and restore.
        successes = len([r for r in self.results if "" in r])
        total = len(self.results)
        print(f"\nTotal: {successes}/{total} tests passed")
    finally:
        # Remove the temporary directory even if a test coroutine raised.
        self.cleanup()
async def main():
    """Instantiate the download test harness and run every test in it."""
    await TestDownloads().run_all_tests()
# Script entry point. main() is started exactly once; the call had been
# listed twice, which would have run the whole suite a second time.
if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,15 +1,17 @@
import os
import sys
import pytest
import asyncio
import time
# Add the parent directory to the Python path
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
parent_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
sys.path.append(parent_dir)
from crawl4ai.async_webcrawler import AsyncWebCrawler
@pytest.mark.asyncio
async def test_successful_crawl():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -21,6 +23,7 @@ async def test_successful_crawl():
assert result.markdown
assert result.cleaned_html
@pytest.mark.asyncio
async def test_invalid_url():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -29,19 +32,21 @@ async def test_invalid_url():
assert not result.success
assert result.error_message
@pytest.mark.asyncio
async def test_multiple_urls():
    """Crawl three URLs with ``arun_many`` and check each result.

    Asserts that one result is returned per URL and that every result
    reports success and carries non-empty HTML.
    """
    async with AsyncWebCrawler(verbose=True) as crawler:
        urls = [
            "https://www.nbcnews.com/business",
            "https://www.example.com",
            "https://www.python.org",
        ]
        results = await crawler.arun_many(urls=urls, bypass_cache=True)
        assert len(results) == len(urls)
        assert all(result.success for result in results)
        assert all(result.html for result in results)
@pytest.mark.asyncio
async def test_javascript_execution():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -51,6 +56,7 @@ async def test_javascript_execution():
assert result.success
assert "<h1>Modified by JS</h1>" in result.html
@pytest.mark.asyncio
async def test_concurrent_crawling_performance():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -59,23 +65,26 @@ async def test_concurrent_crawling_performance():
"https://www.example.com",
"https://www.python.org",
"https://www.github.com",
"https://www.stackoverflow.com"
"https://www.stackoverflow.com",
]
start_time = time.time()
results = await crawler.arun_many(urls=urls, bypass_cache=True)
end_time = time.time()
total_time = end_time - start_time
print(f"Total time for concurrent crawling: {total_time:.2f} seconds")
assert all(result.success for result in results)
assert len(results) == len(urls)
# Assert that concurrent crawling is faster than sequential
# This multiplier may need adjustment based on the number of URLs and their complexity
assert total_time < len(urls) * 5, f"Concurrent crawling not significantly faster: {total_time:.2f} seconds"
assert (
total_time < len(urls) * 5
), f"Concurrent crawling not significantly faster: {total_time:.2f} seconds"
# Entry point for debugging: run this file's tests verbosely, once.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])

View File

@@ -9,74 +9,79 @@ sys.path.append(parent_dir)
from crawl4ai.async_webcrawler import AsyncWebCrawler
@pytest.mark.asyncio
async def test_caching():
    """Time a cache-bypassing crawl against a cache-enabled crawl of the same URL.

    Asserts that both crawls succeed and that the second (cache-enabled)
    crawl takes less time than the first.
    """
    # get_running_loop() is the preferred way to reach the loop from inside
    # a coroutine; its monotonic clock is used for both measurements.
    loop = asyncio.get_running_loop()

    async with AsyncWebCrawler(verbose=True) as crawler:
        url = "https://www.nbcnews.com/business"

        # First crawl (should not use cache)
        start_time = loop.time()
        result1 = await crawler.arun(url=url, bypass_cache=True)
        time_taken1 = loop.time() - start_time

        assert result1.success

        # Second crawl (should use cache)
        start_time = loop.time()
        result2 = await crawler.arun(url=url, bypass_cache=False)
        time_taken2 = loop.time() - start_time

        assert result2.success
        assert time_taken2 < time_taken1  # Cached result should be faster
@pytest.mark.asyncio
async def test_bypass_cache():
    """Crawl one URL with the cache enabled, then again with it bypassed.

    Both crawls must succeed, and the two results must not match on both
    HTML and markdown.
    """
    target = "https://www.nbcnews.com/business"
    async with AsyncWebCrawler(verbose=True) as crawler:
        # Initial crawl, cache allowed
        first = await crawler.arun(url=target, bypass_cache=False)
        assert first.success

        # Repeat crawl, cache bypassed
        second = await crawler.arun(url=target, bypass_cache=True)
        assert second.success

        # The two fetches are expected to differ in HTML or in markdown
        # (the page content is not guaranteed to stay the same).
        assert not (first.html == second.html and first.markdown == second.markdown)
@pytest.mark.asyncio
async def test_clear_cache():
    """Crawl a page with caching allowed, call ``aclear_cache``, and assert the reported cache size is 0."""
    async with AsyncWebCrawler(verbose=True) as crawler:
        # Populate the cache with one crawl
        await crawler.arun(url="https://www.nbcnews.com/business", bypass_cache=False)

        # Wipe it
        await crawler.aclear_cache()

        # Nothing should remain
        remaining = await crawler.aget_cache_size()
        assert remaining == 0
@pytest.mark.asyncio
async def test_flush_cache():
    """Crawl a page with caching allowed, call ``aflush_cache``, and assert the reported cache size is 0."""
    async with AsyncWebCrawler(verbose=True) as crawler:
        # Populate the cache with one crawl
        await crawler.arun(url="https://www.nbcnews.com/business", bypass_cache=False)

        # Flush it
        await crawler.aflush_cache()

        # Nothing should remain
        remaining = await crawler.aget_cache_size()
        assert remaining == 0
# Entry point for debugging: run this file's tests verbosely, once.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])

View File

@@ -1,7 +1,6 @@
import os
import sys
import pytest
import asyncio
import json
# Add the parent directory to the Python path
@@ -9,8 +8,9 @@ parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
from crawl4ai.async_webcrawler import AsyncWebCrawler
from crawl4ai.chunking_strategy import RegexChunking, NlpSentenceChunking
from crawl4ai.extraction_strategy import CosineStrategy, LLMExtractionStrategy
from crawl4ai.chunking_strategy import RegexChunking
from crawl4ai.extraction_strategy import LLMExtractionStrategy
@pytest.mark.asyncio
async def test_regex_chunking():
@@ -18,15 +18,14 @@ async def test_regex_chunking():
url = "https://www.nbcnews.com/business"
chunking_strategy = RegexChunking(patterns=["\n\n"])
result = await crawler.arun(
url=url,
chunking_strategy=chunking_strategy,
bypass_cache=True
url=url, chunking_strategy=chunking_strategy, bypass_cache=True
)
assert result.success
assert result.extracted_content
chunks = json.loads(result.extracted_content)
assert len(chunks) > 1 # Ensure multiple chunks were created
# @pytest.mark.asyncio
# async def test_cosine_strategy():
# async with AsyncWebCrawler(verbose=True) as crawler:
@@ -43,25 +42,25 @@ async def test_regex_chunking():
# assert len(extracted_data) > 0
# assert all('tags' in item for item in extracted_data)
@pytest.mark.asyncio
async def test_llm_extraction_strategy():
    """Crawl a page with an LLMExtractionStrategy and validate the extracted JSON.

    The API token is read from the ``OPENAI_API_KEY`` environment variable.
    Asserts that the crawl succeeded, that the extracted content parses as
    JSON, is non-empty, and that every item contains a ``content`` key.
    """
    async with AsyncWebCrawler(verbose=True) as crawler:
        url = "https://www.nbcnews.com/business"
        extraction_strategy = LLMExtractionStrategy(
            provider="openai/gpt-4o-mini",
            api_token=os.getenv("OPENAI_API_KEY"),
            instruction="Extract only content related to technology",
        )
        result = await crawler.arun(
            url=url, extraction_strategy=extraction_strategy, bypass_cache=True
        )
        assert result.success
        assert result.extracted_content
        extracted_data = json.loads(result.extracted_content)
        assert len(extracted_data) > 0
        assert all("content" in item for item in extracted_data)
# @pytest.mark.asyncio
# async def test_combined_chunking_and_extraction():
@@ -84,4 +83,4 @@ async def test_llm_extraction_strategy():
# Entry point for debugging: run this file's tests verbosely, once.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])

View File

@@ -1,8 +1,6 @@
import os
import sys
import pytest
import asyncio
import json
# Add the parent directory to the Python path
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@@ -10,6 +8,7 @@ sys.path.append(parent_dir)
from crawl4ai.async_webcrawler import AsyncWebCrawler
@pytest.mark.asyncio
async def test_extract_markdown():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -20,6 +19,7 @@ async def test_extract_markdown():
assert isinstance(result.markdown, str)
assert len(result.markdown) > 0
@pytest.mark.asyncio
async def test_extract_cleaned_html():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -30,6 +30,7 @@ async def test_extract_cleaned_html():
assert isinstance(result.cleaned_html, str)
assert len(result.cleaned_html) > 0
@pytest.mark.asyncio
async def test_extract_media():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -46,6 +47,7 @@ async def test_extract_media():
assert "alt" in image
assert "type" in image
@pytest.mark.asyncio
async def test_extract_links():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -63,6 +65,7 @@ async def test_extract_links():
assert "href" in link
assert "text" in link
@pytest.mark.asyncio
async def test_extract_metadata():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -75,16 +78,20 @@ async def test_extract_metadata():
assert "title" in metadata
assert isinstance(metadata["title"], str)
@pytest.mark.asyncio
async def test_css_selector_extraction():
    """Crawl a page restricted to heading selectors and check the markdown.

    Asserts that the crawl succeeded, produced markdown, and that the
    markdown contains each of the strings "#", "##" and "###".
    """
    async with AsyncWebCrawler(verbose=True) as crawler:
        url = "https://www.nbcnews.com/business"
        css_selector = "h1, h2, h3"
        result = await crawler.arun(
            url=url, bypass_cache=True, css_selector=css_selector
        )
        assert result.success
        assert result.markdown
        assert all(heading in result.markdown for heading in ["#", "##", "###"])
# Entry point for debugging: run this file's tests verbosely, once.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])

View File

@@ -1,7 +1,6 @@
import os, sys
import pytest
from bs4 import BeautifulSoup
from typing import List
# Add the parent directory to the Python path
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@@ -9,6 +8,7 @@ sys.path.append(parent_dir)
from crawl4ai.content_filter_strategy import BM25ContentFilter
@pytest.fixture
def basic_html():
return """
@@ -28,6 +28,7 @@ def basic_html():
</html>
"""
@pytest.fixture
def wiki_html():
return """
@@ -46,6 +47,7 @@ def wiki_html():
</html>
"""
@pytest.fixture
def no_meta_html():
return """
@@ -57,26 +59,27 @@ def no_meta_html():
</html>
"""
class TestBM25ContentFilter:
def test_basic_extraction(self, basic_html):
    """Test basic content extraction functionality"""
    # Named content_filter rather than `filter` to avoid shadowing the builtin.
    content_filter = BM25ContentFilter()
    contents = content_filter.filter_content(basic_html)

    assert contents, "Should extract content"
    assert len(contents) >= 1, "Should extract at least one content block"
    assert "long paragraph" in " ".join(contents).lower()
    assert "navigation" not in " ".join(contents).lower()
def test_user_query_override(self, basic_html):
    """Check that an explicit ``user_query`` is what the filter reports.

    ``extract_page_query`` is called directly on the parsed ``<head>`` of the
    fixture. It must return the supplied query and must not contain the text
    "Test description".
    """
    expected = "specific test query"
    content_filter = BM25ContentFilter(user_query=expected)

    # Call the query-extraction step directly instead of going through filter_content
    head = BeautifulSoup(basic_html, "lxml").find("head")
    query = content_filter.extract_page_query(head)

    assert query == expected
    assert "Test description" not in query
@@ -84,8 +87,8 @@ class TestBM25ContentFilter:
"""Test that headers are properly extracted despite length"""
filter = BM25ContentFilter()
contents = filter.filter_content(wiki_html)
combined_content = ' '.join(contents).lower()
combined_content = " ".join(contents).lower()
assert "section 1" in combined_content, "Should include section header"
assert "article title" in combined_content, "Should include main title"
@@ -93,9 +96,11 @@ class TestBM25ContentFilter:
"""Test fallback behavior when no metadata is present"""
filter = BM25ContentFilter()
contents = filter.filter_content(no_meta_html)
assert contents, "Should extract content even without metadata"
assert "First paragraph" in ' '.join(contents), "Should use first paragraph content"
assert "First paragraph" in " ".join(
contents
), "Should use first paragraph content"
def test_empty_input(self):
"""Test handling of empty input"""
@@ -108,29 +113,30 @@ class TestBM25ContentFilter:
malformed_html = "<p>Unclosed paragraph<div>Nested content</p></div>"
filter = BM25ContentFilter()
contents = filter.filter_content(malformed_html)
assert isinstance(contents, list), "Should return list even with malformed HTML"
def test_threshold_behavior(self, basic_html):
    """Compare the number of blocks kept at two ``bm25_threshold`` values.

    A filter with threshold 2.0 must not return more blocks than one with
    threshold 0.5 for the same input.
    """
    strict = BM25ContentFilter(bm25_threshold=2.0)
    lenient = BM25ContentFilter(bm25_threshold=0.5)

    strict_count = len(strict.filter_content(basic_html))
    lenient_count = len(lenient.filter_content(basic_html))

    assert (
        strict_count <= lenient_count
    ), "Strict threshold should extract fewer elements"
def test_html_cleaning(self, basic_html):
    """Check that filtered output carries no class/style attributes or scripts."""
    content_filter = BM25ContentFilter()
    joined = " ".join(content_filter.filter_content(basic_html))

    # Each check is a plain substring search over the joined output.
    assert "class=" not in joined, "Should remove class attributes"
    assert "style=" not in joined, "Should remove style attributes"
    assert "<script" not in joined, "Should remove script tags"
def test_large_content(self):
"""Test handling of large content blocks"""
@@ -143,9 +149,9 @@ class TestBM25ContentFilter:
contents = filter.filter_content(large_html)
assert contents, "Should handle large content blocks"
@pytest.mark.parametrize("unwanted_tag", [
'script', 'style', 'nav', 'footer', 'header'
])
@pytest.mark.parametrize(
"unwanted_tag", ["script", "style", "nav", "footer", "header"]
)
def test_excluded_tags(self, unwanted_tag):
"""Test that specific tags are properly excluded"""
html = f"""
@@ -156,20 +162,22 @@ class TestBM25ContentFilter:
"""
filter = BM25ContentFilter()
contents = filter.filter_content(html)
combined_content = ' '.join(contents).lower()
combined_content = " ".join(contents).lower()
assert "should not appear" not in combined_content
def test_performance(self, basic_html):
    """Time one ``filter_content`` call and require it to finish within 1 second."""
    from time import perf_counter

    content_filter = BM25ContentFilter()

    began = perf_counter()
    content_filter.filter_content(basic_html)
    duration = perf_counter() - began

    assert duration < 1.0, f"Processing took too long: {duration:.2f} seconds"
if __name__ == "__main__":
pytest.main([__file__])
pytest.main([__file__])

View File

@@ -1,12 +1,12 @@
import os, sys
import pytest
from bs4 import BeautifulSoup
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
from crawl4ai.content_filter_strategy import PruningContentFilter
@pytest.fixture
def basic_html():
return """
@@ -22,6 +22,7 @@ def basic_html():
</html>
"""
@pytest.fixture
def link_heavy_html():
return """
@@ -40,6 +41,7 @@ def link_heavy_html():
</html>
"""
@pytest.fixture
def mixed_content_html():
return """
@@ -60,13 +62,14 @@ def mixed_content_html():
</html>
"""
class TestPruningContentFilter:
def test_basic_pruning(self, basic_html):
    """Prune ``basic_html`` with a 5-word minimum and check what is kept.

    The joined, lower-cased output must contain "high-quality paragraph" and
    must contain neither "sidebar content" nor "share buttons".
    """
    pruner = PruningContentFilter(min_word_threshold=5)
    kept_text = " ".join(pruner.filter_content(basic_html)).lower()

    assert "high-quality paragraph" in kept_text
    assert "sidebar content" not in kept_text
    assert "share buttons" not in kept_text
@@ -75,40 +78,42 @@ class TestPruningContentFilter:
"""Test minimum word threshold filtering"""
filter = PruningContentFilter(min_word_threshold=10)
contents = filter.filter_content(mixed_content_html)
combined_content = ' '.join(contents).lower()
combined_content = " ".join(contents).lower()
assert "short summary" not in combined_content
assert "long high-quality paragraph" in combined_content
assert "short comment" not in combined_content
def test_threshold_types(self, basic_html):
    """Compare a fixed threshold (0.48) with a dynamic one (0.45).

    The two filters must return a different number of blocks for the same
    input.
    """
    fixed = PruningContentFilter(threshold_type="fixed", threshold=0.48)
    dynamic = PruningContentFilter(threshold_type="dynamic", threshold=0.45)

    fixed_count = len(fixed.filter_content(basic_html))
    dynamic_count = len(dynamic.filter_content(basic_html))

    assert (
        fixed_count != dynamic_count
    ), "Fixed and dynamic thresholds should yield different results"
def test_link_density_impact(self, link_heavy_html):
    """Check that link-heavy markup is mostly dropped by a dynamic filter.

    "good content paragraph" must survive, and fewer than two returned blocks
    may contain the substring "href".
    """
    pruner = PruningContentFilter(threshold_type="dynamic")
    kept = pruner.filter_content(link_heavy_html)

    assert "good content paragraph" in " ".join(kept).lower()

    blocks_with_links = sum(1 for block in kept if "href" in block)
    assert blocks_with_links < 2, "Should prune link-heavy sections"
def test_tag_importance(self, mixed_content_html):
    """Check that at least one returned block mentions ``article`` or ``h1``.

    The check is a case-insensitive substring search over each block.
    """
    pruner = PruningContentFilter(threshold_type="dynamic")
    lowered = [block.lower() for block in pruner.filter_content(mixed_content_html)]

    keeps_important_tag = any(
        "article" in block or "h1" in block for block in lowered
    )
    assert keeps_important_tag, "Should retain important tags"
def test_empty_input(self):
@@ -127,26 +132,31 @@ class TestPruningContentFilter:
def test_performance(self, basic_html):
    """Time one ``filter_content`` call and require it to finish within 0.1 second."""
    from time import perf_counter

    pruner = PruningContentFilter()

    began = perf_counter()
    pruner.filter_content(basic_html)
    duration = perf_counter() - began

    # Deliberately tight budget: this filter is expected to run in milliseconds.
    assert duration < 0.1, f"Processing took too long: {duration:.3f} seconds"
@pytest.mark.parametrize(
    "threshold,expected_count",
    [
        (0.3, 4),  # Very lenient
        (0.48, 2),  # Default
        (0.7, 1),  # Very strict
    ],
)
def test_threshold_levels(self, mixed_content_html, threshold, expected_count):
    """Check the upper bound on kept blocks for several fixed thresholds.

    ``expected_count`` is a ceiling, not an exact count: the filter may
    return fewer blocks.
    """
    pruner = PruningContentFilter(threshold_type="fixed", threshold=threshold)
    kept_count = len(pruner.filter_content(mixed_content_html))

    assert (
        kept_count <= expected_count
    ), f"Expected {expected_count} or fewer elements with threshold {threshold}"
def test_consistent_output(self, basic_html):
"""Test output consistency across multiple runs"""
@@ -155,5 +165,6 @@ class TestPruningContentFilter:
second_run = filter.filter_content(basic_html)
assert first_run == second_run, "Output should be consistent"
if __name__ == "__main__":
pytest.main([__file__])
pytest.main([__file__])

View File

@@ -1,22 +1,24 @@
import asyncio
from bs4 import BeautifulSoup
from typing import Dict, Any
import os
import sys
import time
import csv
from tabulate import tabulate
from dataclasses import dataclass
from typing import List, Dict
from typing import List
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
parent_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
sys.path.append(parent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
from crawl4ai.content_scraping_strategy import WebScrapingStrategy
from crawl4ai.content_scraping_strategy import WebScrapingStrategy as WebScrapingStrategyCurrent
from crawl4ai.content_scraping_strategy import (
WebScrapingStrategy as WebScrapingStrategyCurrent,
)
# from crawl4ai.content_scrapping_strategy_current import WebScrapingStrategy as WebScrapingStrategyCurrent
@dataclass
class TestResult:
name: str
@@ -27,69 +29,71 @@ class TestResult:
markdown_length: int
execution_time: float
class StrategyTester:
def __init__(self):
    """Create both scrapers and load the sample page used by every test.

    Reads ``sample_wikipedia.html`` from the ``__location__`` directory as
    UTF-8 into ``self.WIKI_HTML``, and initialises ``self.results`` with
    empty "new" and "current" lists.

    Raises:
        OSError: If the sample HTML file cannot be opened.
    """
    self.new_scraper = WebScrapingStrategy()
    self.current_scraper = WebScrapingStrategyCurrent()

    # Build the path with os.path.join, as save_results_to_csv does.
    sample_path = os.path.join(__location__, "sample_wikipedia.html")
    with open(sample_path, "r", encoding="utf-8") as f:
        self.WIKI_HTML = f.read()

    self.results = {"new": [], "current": []}
def run_test(self, name: str, **kwargs) -> tuple[TestResult, TestResult]:
    """Run one scenario against both scrapers and summarise each outcome.

    Args:
        name: Label stored on both returned results.
        **kwargs: Forwarded unchanged to each scraper's
            ``_get_content_of_website_optimized``.

    Returns:
        ``(new, current)``: the summary for ``self.new_scraper`` followed by
        the summary for ``self.current_scraper``.
    """
    results = []
    for scraper in (self.new_scraper, self.current_scraper):
        # perf_counter() is monotonic; time.time() can jump when the wall
        # clock is adjusted, which would distort short timings.
        started = time.perf_counter()
        result = scraper._get_content_of_website_optimized(
            url="https://en.wikipedia.org/wiki/Test", html=self.WIKI_HTML, **kwargs
        )
        execution_time = time.perf_counter() - started

        results.append(
            TestResult(
                name=name,
                success=result["success"],
                images=len(result["media"]["images"]),
                internal_links=len(result["links"]["internal"]),
                external_links=len(result["links"]["external"]),
                markdown_length=len(result["markdown"]),
                execution_time=execution_time,
            )
        )

    new_result, current_result = results
    return new_result, current_result
def run_all_tests(self):
test_cases = [
("Basic Extraction", {}),
("Exclude Tags", {'excluded_tags': ['table', 'div.infobox', 'div.navbox']}),
("Word Threshold", {'word_count_threshold': 50}),
("CSS Selector", {'css_selector': 'div.mw-parser-output > p'}),
("Link Exclusions", {
'exclude_external_links': True,
'exclude_social_media_links': True,
'exclude_domains': ['facebook.com', 'twitter.com']
}),
("Media Handling", {
'exclude_external_images': True,
'image_description_min_word_threshold': 20
}),
("Text Only", {
'only_text': True,
'remove_forms': True
}),
("HTML Cleaning", {
'clean_html': True,
'keep_data_attributes': True
}),
("HTML2Text Options", {
'html2text': {
'skip_internal_links': True,
'single_line_break': True,
'mark_code': True,
'preserve_tags': ['pre', 'code']
}
})
("Exclude Tags", {"excluded_tags": ["table", "div.infobox", "div.navbox"]}),
("Word Threshold", {"word_count_threshold": 50}),
("CSS Selector", {"css_selector": "div.mw-parser-output > p"}),
(
"Link Exclusions",
{
"exclude_external_links": True,
"exclude_social_media_links": True,
"exclude_domains": ["facebook.com", "twitter.com"],
},
),
(
"Media Handling",
{
"exclude_external_images": True,
"image_description_min_word_threshold": 20,
},
),
("Text Only", {"only_text": True, "remove_forms": True}),
("HTML Cleaning", {"clean_html": True, "keep_data_attributes": True}),
(
"HTML2Text Options",
{
"html2text": {
"skip_internal_links": True,
"single_line_break": True,
"mark_code": True,
"preserve_tags": ["pre", "code"],
}
},
),
]
all_results = []
@@ -99,64 +103,117 @@ class StrategyTester:
all_results.append((name, new_result, current_result))
except Exception as e:
print(f"Error in {name}: {str(e)}")
self.save_results_to_csv(all_results)
self.print_comparison_table(all_results)
def save_results_to_csv(self, all_results: List[tuple]):
    """Write the comparison results to ``strategy_comparison_results.csv``.

    The file is created in the ``__location__`` directory and overwritten on
    every call. It holds one header row followed by two rows per test: the
    "New" strategy first, then the "Current" one. Execution time is written
    with three decimals.

    Args:
        all_results: ``(name, new_result, current_result)`` tuples. Each
            result must expose ``success``, ``images``, ``internal_links``,
            ``external_links``, ``markdown_length`` and ``execution_time``.
    """
    csv_file = os.path.join(__location__, "strategy_comparison_results.csv")
    # Pin the encoding so the output does not depend on the platform default.
    with open(csv_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(
            [
                "Test Name",
                "Strategy",
                "Success",
                "Images",
                "Internal Links",
                "External Links",
                "Markdown Length",
                "Execution Time",
            ]
        )

        for name, new_result, current_result in all_results:
            labelled = (("New", new_result), ("Current", current_result))
            for strategy, result in labelled:
                writer.writerow(
                    [
                        name,
                        strategy,
                        result.success,
                        result.images,
                        result.internal_links,
                        result.external_links,
                        result.markdown_length,
                        f"{result.execution_time:.3f}",
                    ]
                )
def print_comparison_table(self, all_results: List[tuple]):
    """Print a grid table comparing the two strategies for every test.

    Each test contributes a "New" row, a "Current" row, an optional
    "Differences" row naming the metrics on which the two results disagree,
    and a blank spacer row.

    Args:
        all_results: ``(name, new_result, current_result)`` tuples.
    """
    headers = [
        "Test Name",
        "Strategy",
        "Success",
        "Images",
        "Internal Links",
        "External Links",
        "Markdown Length",
        "Time (s)",
    ]
    # (attribute compared, label reported when the two results disagree)
    compared = (
        ("images", "images"),
        ("internal_links", "internal_links"),
        ("external_links", "external_links"),
        ("markdown_length", "markdown"),
    )

    def as_row(label, strategy, res):
        # One table line for a single strategy's result.
        return [
            label,
            strategy,
            res.success,
            res.images,
            res.internal_links,
            res.external_links,
            res.markdown_length,
            f"{res.execution_time:.3f}",
        ]

    table_data = []
    for name, new_result, current_result in all_results:
        table_data.append(as_row(name, "New", new_result))
        # The test name is shown on the first row of each pair only.
        table_data.append(as_row("", "Current", current_result))

        differences = [
            label
            for attr, label in compared
            if getattr(new_result, attr) != getattr(current_result, attr)
        ]
        if differences:
            table_data.append(
                ["", "⚠️ Differences", ", ".join(differences), "", "", "", "", ""]
            )

        # Blank spacer row between tests
        table_data.append([""] * len(headers))

    print("\nStrategy Comparison Results:")
    print(tabulate(table_data, headers=headers, tablefmt="grid"))
if __name__ == "__main__":
tester = StrategyTester()
tester.run_all_tests()
tester.run_all_tests()

View File

@@ -1,14 +1,13 @@
import os
import sys
import pytest
import asyncio
# Add the parent directory to the Python path
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
from crawl4ai.async_webcrawler import AsyncWebCrawler
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
@pytest.mark.asyncio
async def test_custom_user_agent():
@@ -20,6 +19,7 @@ async def test_custom_user_agent():
assert result.success
assert custom_user_agent in result.html
@pytest.mark.asyncio
async def test_custom_headers():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -31,6 +31,7 @@ async def test_custom_headers():
assert "X-Test-Header" in result.html
assert "TestValue" in result.html
@pytest.mark.asyncio
async def test_javascript_execution():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -40,19 +41,22 @@ async def test_javascript_execution():
assert result.success
assert "<h1>Modified by JS</h1>" in result.html
@pytest.mark.asyncio
async def test_hook_execution():
    """Register an ``after_goto`` hook and check its effect on the page HTML.

    The hook sets the body background colour to red through ``page.evaluate``.
    The test passes when the crawl succeeds and the returned HTML contains
    "background-color: red".
    """

    async def paint_body_red(page):
        await page.evaluate("document.body.style.backgroundColor = 'red';")
        return page

    async with AsyncWebCrawler(verbose=True) as crawler:
        crawler.crawler_strategy.set_hook("after_goto", paint_body_red)

        outcome = await crawler.arun(url="https://www.example.com", bypass_cache=True)

        assert outcome.success
        assert "background-color: red" in outcome.html
@pytest.mark.asyncio
async def test_screenshot():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -63,6 +67,7 @@ async def test_screenshot():
assert isinstance(result.screenshot, str)
assert len(result.screenshot) > 0
# Entry point for debugging
if __name__ == "__main__":
pytest.main([__file__, "-v"])
pytest.main([__file__, "-v"])

View File

@@ -1,8 +1,6 @@
import os
import sys
import pytest
import asyncio
import json
# Add the parent directory to the Python path
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@@ -10,6 +8,7 @@ sys.path.append(parent_dir)
from crawl4ai.async_webcrawler import AsyncWebCrawler
@pytest.mark.asyncio
async def test_cache_url():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -23,6 +22,7 @@ async def test_cache_url():
assert result2.success
assert result2.html == result1.html
@pytest.mark.asyncio
async def test_bypass_cache():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -34,25 +34,29 @@ async def test_bypass_cache():
# Second run bypassing cache
result2 = await crawler.arun(url=url, bypass_cache=True)
assert result2.success
assert result2.html != result1.html # Content might be different due to dynamic nature of websites
assert (
result2.html != result1.html
) # Content might be different due to dynamic nature of websites
@pytest.mark.asyncio
async def test_cache_size():
    """Check that one crawl raises the reported cache size by exactly one."""
    async with AsyncWebCrawler(verbose=True) as crawler:
        size_before = await crawler.aget_cache_size()

        await crawler.arun(url="https://www.nbcnews.com/business", bypass_cache=True)

        size_after = await crawler.aget_cache_size()
        assert size_after == size_before + 1
@pytest.mark.asyncio
async def test_clear_cache():
async with AsyncWebCrawler(verbose=True) as crawler:
url = "https://www.example.org"
await crawler.arun(url=url, bypass_cache=True)
initial_size = await crawler.aget_cache_size()
assert initial_size > 0
@@ -60,12 +64,13 @@ async def test_clear_cache():
new_size = await crawler.aget_cache_size()
assert new_size == 0
@pytest.mark.asyncio
async def test_flush_cache():
async with AsyncWebCrawler(verbose=True) as crawler:
url = "https://www.example.net"
await crawler.arun(url=url, bypass_cache=True)
initial_size = await crawler.aget_cache_size()
assert initial_size > 0
@@ -75,8 +80,11 @@ async def test_flush_cache():
# Try to retrieve the previously cached URL
result = await crawler.arun(url=url, bypass_cache=False)
assert result.success # The crawler should still succeed, but it will fetch the content anew
assert (
result.success
) # The crawler should still succeed, but it will fetch the content anew
# Entry point for debugging
if __name__ == "__main__":
pytest.main([__file__, "-v"])
pytest.main([__file__, "-v"])

View File

@@ -1,114 +1,133 @@
import pytest
import asyncio, time
import time
from crawl4ai import (
AsyncWebCrawler, BrowserConfig, CrawlerRunConfig,
MemoryAdaptiveDispatcher, SemaphoreDispatcher,
RateLimiter, CrawlerMonitor, DisplayMode, CacheMode
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
MemoryAdaptiveDispatcher,
SemaphoreDispatcher,
RateLimiter,
CrawlerMonitor,
DisplayMode,
CacheMode,
)
@pytest.fixture
def browser_config():
    """Provide a headless, non-verbose ``BrowserConfig``."""
    config = BrowserConfig(headless=True, verbose=False)
    return config
@pytest.fixture
def run_config():
    """Provide a non-verbose ``CrawlerRunConfig`` with ``CacheMode.BYPASS``."""
    config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, verbose=False)
    return config
@pytest.fixture
def test_urls():
    """Provide the three example.com URLs crawled by the dispatcher tests."""
    base = "http://example.com"
    return [base, f"{base}/page1", f"{base}/page2"]
@pytest.mark.asyncio
class TestDispatchStrategies:
async def test_memory_adaptive_basic(self, browser_config, run_config, test_urls):
    """Crawl ``test_urls`` through a ``MemoryAdaptiveDispatcher``.

    Expects one result per URL and every result to report success.
    """
    async with AsyncWebCrawler(config=browser_config) as crawler:
        limits = {
            "memory_threshold_percent": 70.0,
            "max_session_permit": 2,
            "check_interval": 0.1,
        }
        dispatcher = MemoryAdaptiveDispatcher(**limits)

        outcomes = await crawler.arun_many(
            test_urls, config=run_config, dispatcher=dispatcher
        )

        assert len(outcomes) == len(test_urls)
        assert all(outcome.success for outcome in outcomes)
async def test_memory_adaptive_with_rate_limit(
    self, browser_config, run_config, test_urls
):
    """Crawl ``test_urls`` through a rate-limited ``MemoryAdaptiveDispatcher``.

    Expects one result per URL and every result to report success.
    """
    async with AsyncWebCrawler(config=browser_config) as crawler:
        limiter = RateLimiter(base_delay=(0.1, 0.2), max_delay=1.0, max_retries=2)
        dispatcher = MemoryAdaptiveDispatcher(
            memory_threshold_percent=70.0,
            max_session_permit=2,
            check_interval=0.1,
            rate_limiter=limiter,
        )

        outcomes = await crawler.arun_many(
            test_urls, config=run_config, dispatcher=dispatcher
        )

        assert len(outcomes) == len(test_urls)
        assert all(outcome.success for outcome in outcomes)
async def test_semaphore_basic(self, browser_config, run_config, test_urls):
    """Crawl ``test_urls`` through a ``SemaphoreDispatcher`` with count 2.

    Expects one result per URL and every result to report success.
    """
    async with AsyncWebCrawler(config=browser_config) as crawler:
        outcomes = await crawler.arun_many(
            test_urls,
            config=run_config,
            dispatcher=SemaphoreDispatcher(semaphore_count=2),
        )

        assert len(outcomes) == len(test_urls)
        assert all(outcome.success for outcome in outcomes)
async def test_semaphore_with_rate_limit(
    self, browser_config, run_config, test_urls
):
    """Crawl ``test_urls`` through a rate-limited ``SemaphoreDispatcher``.

    Expects one result per URL and every result to report success.
    """
    async with AsyncWebCrawler(config=browser_config) as crawler:
        limiter = RateLimiter(base_delay=(0.1, 0.2), max_delay=1.0, max_retries=2)
        dispatcher = SemaphoreDispatcher(semaphore_count=2, rate_limiter=limiter)

        outcomes = await crawler.arun_many(
            test_urls, config=run_config, dispatcher=dispatcher
        )

        assert len(outcomes) == len(test_urls)
        assert all(outcome.success for outcome in outcomes)
async def test_memory_adaptive_memory_error(
    self, browser_config, run_config, test_urls
):
    """Expect ``arun_many`` to raise ``MemoryError`` under a 1% memory threshold."""
    async with AsyncWebCrawler(config=browser_config) as crawler:
        settings = {
            "memory_threshold_percent": 1.0,  # Set unrealistically low threshold
            "max_session_permit": 2,
            "check_interval": 0.1,
            "memory_wait_timeout": 1.0,  # Short timeout for testing
        }
        dispatcher = MemoryAdaptiveDispatcher(**settings)

        with pytest.raises(MemoryError):
            await crawler.arun_many(
                test_urls, config=run_config, dispatcher=dispatcher
            )
async def test_empty_urls(self, browser_config, run_config):
    """Check that crawling an empty URL list returns zero results."""
    no_urls = []
    async with AsyncWebCrawler(config=browser_config) as crawler:
        dispatcher = MemoryAdaptiveDispatcher(max_session_permit=2)
        outcomes = await crawler.arun_many(
            no_urls, config=run_config, dispatcher=dispatcher
        )
        assert len(outcomes) == 0
async def test_single_url(self, browser_config, run_config):
    """Check that a one-URL batch returns exactly one successful result."""
    only_url = ["http://example.com"]
    async with AsyncWebCrawler(config=browser_config) as crawler:
        dispatcher = MemoryAdaptiveDispatcher(max_session_permit=2)
        outcomes = await crawler.arun_many(
            only_url, config=run_config, dispatcher=dispatcher
        )
        assert len(outcomes) == 1
        first = outcomes[0]
        assert first.success
async def test_invalid_urls(self, browser_config, run_config):
    """Check that an unresolvable URL yields one result marked as failed."""
    unreachable = ["http://invalid.url.that.doesnt.exist"]
    async with AsyncWebCrawler(config=browser_config) as crawler:
        dispatcher = MemoryAdaptiveDispatcher(max_session_permit=2)
        outcomes = await crawler.arun_many(
            unreachable,
            config=run_config,
            dispatcher=dispatcher,
        )
        assert len(outcomes) == 1
        first = outcomes[0]
        assert not first.success
@@ -121,27 +140,31 @@ class TestDispatchStrategies:
base_delay=(0.1, 0.2),
max_delay=1.0,
max_retries=2,
rate_limit_codes=[200] # Force rate limiting for testing
)
rate_limit_codes=[200], # Force rate limiting for testing
),
)
start_time = time.time()
results = await crawler.arun_many(urls, config=run_config, dispatcher=dispatcher)
results = await crawler.arun_many(
urls, config=run_config, dispatcher=dispatcher
)
duration = time.time() - start_time
assert len(results) == len(urls)
assert duration > 1.0 # Ensure rate limiting caused delays
async def test_monitor_integration(self, browser_config, run_config, test_urls):
    """Crawl with a ``CrawlerMonitor`` attached and inspect its statistics.

    Expects one result and one monitor entry per URL, and every monitor
    entry to have an ``end_time`` set.
    """
    async with AsyncWebCrawler(config=browser_config) as crawler:
        monitor = CrawlerMonitor(
            max_visible_rows=5, display_mode=DisplayMode.DETAILED
        )
        dispatcher = MemoryAdaptiveDispatcher(max_session_permit=2, monitor=monitor)

        outcomes = await crawler.arun_many(
            test_urls, config=run_config, dispatcher=dispatcher
        )
        expected_count = len(test_urls)
        assert len(outcomes) == expected_count

        # Check monitor stats
        assert len(monitor.stats) == expected_count
        assert all(entry.end_time is not None for entry in monitor.stats.values())
if __name__ == "__main__":
pytest.main([__file__, "-v", "--asyncio-mode=auto"])
pytest.main([__file__, "-v", "--asyncio-mode=auto"])

View File

@@ -2,9 +2,9 @@ import os
import re
import sys
import pytest
import json
from bs4 import BeautifulSoup
import asyncio
# Add the parent directory to the Python path
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
@@ -59,19 +59,21 @@ from crawl4ai.async_webcrawler import AsyncWebCrawler
# assert result.success
# assert "github" in result.html.lower()
# Add this test to your existing test file
@pytest.mark.asyncio
async def test_typescript_commits_multi_page():
first_commit = ""
async def on_execution_started(page):
nonlocal first_commit
nonlocal first_commit
try:
# Check whether the page's first-commit h4 text differs from the previously recorded first commit (use document.querySelector('li.Box-sc-g0xbh4-0 h4'))
while True:
await page.wait_for_selector('li.Box-sc-g0xbh4-0 h4')
commit = await page.query_selector('li.Box-sc-g0xbh4-0 h4')
commit = await commit.evaluate('(element) => element.textContent')
commit = re.sub(r'\s+', '', commit)
await page.wait_for_selector("li.Box-sc-g0xbh4-0 h4")
commit = await page.query_selector("li.Box-sc-g0xbh4-0 h4")
commit = await commit.evaluate("(element) => element.textContent")
commit = re.sub(r"\s+", "", commit)
if commit and commit != first_commit:
first_commit = commit
break
@@ -79,9 +81,8 @@ async def test_typescript_commits_multi_page():
except Exception as e:
print(f"Warning: New content didn't appear after JavaScript execution: {e}")
async with AsyncWebCrawler(verbose=True) as crawler:
crawler.crawler_strategy.set_hook('on_execution_started', on_execution_started)
crawler.crawler_strategy.set_hook("on_execution_started", on_execution_started)
url = "https://github.com/microsoft/TypeScript/commits/main"
session_id = "typescript_commits_session"
@@ -97,19 +98,21 @@ async def test_typescript_commits_multi_page():
url=url, # Only use URL for the first page
session_id=session_id,
css_selector="li.Box-sc-g0xbh4-0",
js=js_next_page if page > 0 else None, # Don't click 'next' on the first page
js=js_next_page
if page > 0
else None, # Don't click 'next' on the first page
bypass_cache=True,
js_only=page > 0 # Use js_only for subsequent pages
js_only=page > 0, # Use js_only for subsequent pages
)
assert result.success, f"Failed to crawl page {page + 1}"
# Parse the HTML and extract commits
soup = BeautifulSoup(result.cleaned_html, 'html.parser')
soup = BeautifulSoup(result.cleaned_html, "html.parser")
commits = soup.select("li")
# Take first commit find h4 extract text
first_commit = commits[0].find("h4").text
first_commit = re.sub(r'\s+', '', first_commit)
first_commit = re.sub(r"\s+", "", first_commit)
all_commits.extend(commits)
print(f"Page {page + 1}: Found {len(commits)} commits")
@@ -118,10 +121,13 @@ async def test_typescript_commits_multi_page():
await crawler.crawler_strategy.kill_session(session_id)
# Assertions
assert len(all_commits) >= 90, f"Expected at least 90 commits, but got {len(all_commits)}"
print(f"Successfully crawled {len(all_commits)} commits across 3 pages")
assert (
len(all_commits) >= 90
), f"Expected at least 90 commits, but got {len(all_commits)}"
print(f"Successfully crawled {len(all_commits)} commits across 3 pages")
# Entry point for debugging
if __name__ == "__main__":
pytest.main([__file__, "-v"])
pytest.main([__file__, "-v"])

View File

@@ -75,4 +75,4 @@
# # Entry point for debugging
# if __name__ == "__main__":
# pytest.main([__file__, "-v"])
# pytest.main([__file__, "-v"])

View File

@@ -1,11 +1,15 @@
import json
import time
from bs4 import BeautifulSoup
from crawl4ai.content_scraping_strategy import WebScrapingStrategy, LXMLWebScrapingStrategy
from typing import Dict, Any, List, Tuple
from crawl4ai.content_scraping_strategy import (
WebScrapingStrategy,
LXMLWebScrapingStrategy,
)
from typing import Dict, List, Tuple
import difflib
from lxml import html as lhtml, etree
def normalize_dom(element):
"""
Recursively normalizes an lxml HTML element:
@@ -15,7 +19,7 @@ def normalize_dom(element):
Returns the same element (mutated).
"""
# Remove comment nodes
comments = element.xpath('//comment()')
comments = element.xpath("//comment()")
for c in comments:
p = c.getparent()
if p is not None:
@@ -45,7 +49,7 @@ def strip_html_body(root):
"""
If 'root' is <html>, find its <body> child and move all of <body>'s children
into a new <div>. Return that <div>.
If 'root' is <body>, similarly move all of its children into a new <div> and return it.
Otherwise, return 'root' as-is.
@@ -53,8 +57,8 @@ def strip_html_body(root):
tag_name = (root.tag or "").lower()
# Case 1: The root is <html>
if tag_name == 'html':
bodies = root.xpath('./body')
if tag_name == "html":
bodies = root.xpath("./body")
if bodies:
body = bodies[0]
new_div = lhtml.Element("div")
@@ -66,7 +70,7 @@ def strip_html_body(root):
return root
# Case 2: The root is <body>
elif tag_name == 'body':
elif tag_name == "body":
new_div = lhtml.Element("div")
for child in root:
new_div.append(child)
@@ -92,7 +96,9 @@ def compare_nodes(node1, node2, differences, path="/"):
attrs1 = list(node1.attrib.items())
attrs2 = list(node2.attrib.items())
if attrs1 != attrs2:
differences.append(f"Attribute mismatch at {path}/{node1.tag}: {attrs1} vs. {attrs2}")
differences.append(
f"Attribute mismatch at {path}/{node1.tag}: {attrs1} vs. {attrs2}"
)
# 3) Compare text (trim or unify whitespace as needed)
text1 = (node1.text or "").strip()
@@ -102,7 +108,9 @@ def compare_nodes(node1, node2, differences, path="/"):
text2 = " ".join(text2.split())
if text1 != text2:
# If you prefer ignoring newlines or multiple whitespace, do a more robust cleanup
differences.append(f"Text mismatch at {path}/{node1.tag}: '{text1}' vs. '{text2}'")
differences.append(
f"Text mismatch at {path}/{node1.tag}: '{text1}' vs. '{text2}'"
)
# 4) Compare number of children
children1 = list(node1)
@@ -123,7 +131,9 @@ def compare_nodes(node1, node2, differences, path="/"):
tail1 = (node1.tail or "").strip()
tail2 = (node2.tail or "").strip()
if tail1 != tail2:
differences.append(f"Tail mismatch after {path}/{node1.tag}: '{tail1}' vs. '{tail2}'")
differences.append(
f"Tail mismatch after {path}/{node1.tag}: '{tail1}' vs. '{tail2}'"
)
def compare_html_structurally(html1, html2):
@@ -156,11 +166,11 @@ def compare_html_structurally(html1, html2):
return differences
def generate_large_html(n_elements=1000):
html = ['<!DOCTYPE html><html><head></head><body>']
html = ["<!DOCTYPE html><html><head></head><body>"]
for i in range(n_elements):
html.append(f'''
html.append(
f"""
<div class="article">
<h2>Heading {i}</h2>
<p>This is paragraph {i} with some content and a <a href="http://example.com/{i}">link</a></p>
@@ -170,13 +180,15 @@ def generate_large_html(n_elements=1000):
<li>List item {i}.2</li>
</ul>
</div>
''')
html.append('</body></html>')
return ''.join(html)
"""
)
html.append("</body></html>")
return "".join(html)
def generate_complicated_html():
"""
HTML with multiple domains, forms, data attributes,
HTML with multiple domains, forms, data attributes,
various images, comments, style, and noscript to test all parameter toggles.
"""
return """
@@ -258,7 +270,7 @@ def generate_complicated_html():
def get_test_scenarios():
"""
Returns a dictionary of parameter sets (test scenarios) for the scraper.
Each scenario name maps to a dictionary of keyword arguments
Each scenario name maps to a dictionary of keyword arguments
that will be passed into scrap() for testing various features.
"""
TEST_SCENARIOS = {
@@ -341,7 +353,7 @@ def get_test_scenarios():
# "exclude_external_links": True
# },
# "comprehensive_removal": {
# # Exclude multiple tags, remove forms & comments,
# # Exclude multiple tags, remove forms & comments,
# # and also remove targeted selectors
# "excluded_tags": ["aside", "noscript", "script"],
# "excluded_selector": "#promo-section, .social-widget",
@@ -352,19 +364,18 @@ def get_test_scenarios():
return TEST_SCENARIOS
class ScraperEquivalenceTester:
def __init__(self):
    """Build the built-in HTML fixtures once and store them by case name."""
    basic_html = self.generate_basic_html()
    complex_html = self.generate_complex_html()
    malformed_html = self.generate_malformed_html()
    # The "real_world" case (load_real_samples) stays disabled, as before.
    self.test_cases = dict(
        basic=basic_html,
        complex=complex_html,
        malformed=malformed_html,
    )
def generate_basic_html(self):
    """Return the "basic" fixture produced by ``generate_large_html(1000)``."""
    element_count = 1000
    return generate_large_html(element_count)
def generate_complex_html(self):
return """
<html><body>
@@ -384,7 +395,7 @@ class ScraperEquivalenceTester:
</div>
</body></html>
"""
def generate_malformed_html(self):
return """
<div>Unclosed div
@@ -395,139 +406,139 @@ class ScraperEquivalenceTester:
<!-- Malformed comment -- > -->
<![CDATA[Test CDATA]]>
"""
def load_real_samples(self):
    """Load the collected real-world HTML samples from ``tests/samples``.

    Returns:
        A dict mapping each sample name ("article", "product", "blog") to
        the text of ``tests/samples/<name>.html``, read relative to the
        current working directory.

    Raises:
        OSError: If a sample file is missing or cannot be read.
    """
    samples = {}
    for name in ("article", "product", "blog"):
        # Use a context manager so every handle is closed deterministically;
        # a bare open(...).read() leaves closing to the garbage collector.
        with open(f"tests/samples/{name}.html") as sample_file:
            samples[name] = sample_file.read()
    return samples
def deep_compare_links(self, old_links: Dict, new_links: Dict) -> List[str]:
    """Report differences between the link collections of two scrapers.

    Args:
        old_links: Mapping with "internal" and "external" keys, each a list
            of link dicts. Every link dict must carry an "href"; "text" and
            "title" are optional.
        new_links: The same structure produced by the other scraper.

    Returns:
        Human-readable difference messages, per category: hrefs missing from
        ``new_links``, extra hrefs in ``new_links``, then "text"/"title"
        mismatches for hrefs present on both sides. Empty when nothing
        differs.

    Raises:
        KeyError: If a category is absent or a link dict has no "href".
    """

    def first_by_href(links):
        # Index once instead of rescanning the list for every shared URL.
        # setdefault keeps the FIRST link seen for a duplicated href, which
        # is what the previous next(...) scan selected.
        index = {}
        for link in links:
            index.setdefault(link["href"], link)
        return index

    differences = []

    for category in ["internal", "external"]:
        old_by_href = first_by_href(old_links[category])
        new_by_href = first_by_href(new_links[category])
        old_urls = set(old_by_href)
        new_urls = set(new_by_href)

        missing = old_urls - new_urls
        extra = new_urls - old_urls
        if missing:
            differences.append(f"Missing {category} links: {missing}")
        if extra:
            differences.append(f"Extra {category} links: {extra}")

        # Compare link attributes for URLs both scrapers found.
        for url in old_urls & new_urls:
            old_link = old_by_href[url]
            new_link = new_by_href[url]
            for attr in ["text", "title"]:
                # .get() so a link lacking an optional attribute is reported
                # as a mismatch against None instead of raising KeyError
                # (consistent with deep_compare_media).
                old_value = old_link.get(attr)
                new_value = new_link.get(attr)
                if old_value != new_value:
                    differences.append(
                        f"Link attribute mismatch for {url} - {attr}:"
                        f" old='{old_value}' vs new='{new_value}'"
                    )

    return differences
def deep_compare_media(self, old_media: Dict, new_media: Dict) -> List[str]:
    """Report differences between the media collections of two scrapers.

    Args:
        old_media: Mapping with "images", "videos" and "audios" keys, each a
            list of media dicts. Every media dict must carry a "src"; "alt"
            and "description" are optional.
        new_media: The same structure produced by the other scraper.

    Returns:
        Human-readable difference messages, per media type: sources missing
        from ``new_media``, extra sources in ``new_media``, then
        "alt"/"description" mismatches for sources present on both sides.
        Empty when nothing differs.

    Raises:
        KeyError: If a media type is absent or a media dict has no "src".
    """

    def first_by_src(items):
        # One pass per list instead of a next(...) rescan per shared source.
        # setdefault keeps the first item for a duplicated src, matching what
        # the previous linear scan returned.
        index = {}
        for item in items:
            index.setdefault(item["src"], item)
        return index

    differences = []

    for media_type in ["images", "videos", "audios"]:
        old_by_src = first_by_src(old_media[media_type])
        new_by_src = first_by_src(new_media[media_type])
        old_srcs = set(old_by_src)
        new_srcs = set(new_by_src)

        missing = old_srcs - new_srcs
        extra = new_srcs - old_srcs
        if missing:
            differences.append(f"Missing {media_type}: {missing}")
        if extra:
            differences.append(f"Extra {media_type}: {extra}")

        # Compare media attributes for sources both scrapers found.
        for src in old_srcs & new_srcs:
            old_item = old_by_src[src]
            new_item = new_by_src[src]
            for attr in ["alt", "description"]:
                old_value = old_item.get(attr)
                new_value = new_item.get(attr)
                if old_value != new_value:
                    differences.append(
                        f"{media_type} attribute mismatch for {src} - {attr}:"
                        f" old='{old_value}' vs new='{new_value}'"
                    )

    return differences
def compare_html_content(self, old_html: str, new_html: str) -> List[str]:
    """Coarsely compare two HTML documents by tag sequence and visible text.

    Each document is parsed with BeautifulSoup's "lxml" parser and reduced to
    two strings: the space-joined tag names and the whitespace-normalised
    text. A difference is reported only when the LENGTHS of the corresponding
    strings differ by more than 100 characters; the message then carries a
    unified diff of the space-split tokens.
    """
    # Alternative kept from the original: compare_html_structurally(old_html, new_html)

    def summarize(markup: str) -> Tuple[str, str]:
        # (tag-name sequence, whitespace-collapsed text) for one document.
        tree = BeautifulSoup(markup, "lxml")
        tag_sequence = " ".join(node.name for node in tree.find_all())
        visible_text = " ".join(tree.get_text().split())
        return tag_sequence, visible_text

    old_tags, old_words = summarize(old_html)
    new_tags, new_words = summarize(new_html)

    comparisons = (
        ("HTML structure differences:\n", old_tags, new_tags),
        ("Text content differences:\n", old_words, new_words),
    )

    report = []
    for heading, before, after in comparisons:
        # Length tolerance, not strict equality: small drifts are ignored.
        if abs(len(before) - len(after)) <= 100:
            continue
        delta = difflib.unified_diff(before.split(), after.split(), lineterm="")
        report.append(heading + "\n".join(delta))
    return report
def compare_results(
    self, old_result: Dict, new_result: Dict
) -> Dict[str, List[str]]:
    """Compare two scraper outputs and group the differences by category.

    Runs the link, media and HTML comparisons in that order and returns a
    dict holding only the categories ("links", "media", "html") for which
    the comparison produced at least one message.
    """
    # (category label, comparison method, key read from both results)
    checks = (
        ("links", self.deep_compare_links, "links"),
        ("media", self.deep_compare_media, "media"),
        ("html", self.compare_html_content, "cleaned_html"),
    )

    report = {}
    for label, compare, field in checks:
        found = compare(old_result[field], new_result[field])
        if found:
            report[label] = found
    return report
def run_tests(self) -> Dict:
@@ -535,52 +546,49 @@ class ScraperEquivalenceTester:
# We'll still keep some "test_cases" logic from above (basic, complex, malformed).
# But we add a new section for the complicated HTML scenarios.
results = {
'tests': [],
'summary': {'passed': 0, 'failed': 0}
}
results = {"tests": [], "summary": {"passed": 0, "failed": 0}}
# 1) First, run the existing 3 built-in test cases (basic, complex, malformed).
# for case_name, html in self.test_cases.items():
# print(f"\nTesting built-in case: {case_name}...")
# original = WebScrapingStrategy()
# lxml = LXMLWebScrapingStrategy()
# start = time.time()
# orig_result = original.scrap("http://test.com", html)
# orig_time = time.time() - start
# print("\nOriginal Mode:")
# print(f"Cleaned HTML size: {len(orig_result['cleaned_html'])/1024:.2f} KB")
# print(f"Images: {len(orig_result['media']['images'])}")
# print(f"External links: {len(orig_result['links']['external'])}")
# print(f"Times - Original: {orig_time:.3f}s")
# start = time.time()
# lxml_result = lxml.scrap("http://test.com", html)
# lxml_time = time.time() - start
# print("\nLXML Mode:")
# print(f"Cleaned HTML size: {len(lxml_result['cleaned_html'])/1024:.2f} KB")
# print(f"Images: {len(lxml_result['media']['images'])}")
# print(f"External links: {len(lxml_result['links']['external'])}")
# print(f"Times - LXML: {lxml_time:.3f}s")
# # Compare
# diffs = {}
# link_diff = self.deep_compare_links(orig_result['links'], lxml_result['links'])
# if link_diff:
# diffs['links'] = link_diff
# media_diff = self.deep_compare_media(orig_result['media'], lxml_result['media'])
# if media_diff:
# diffs['media'] = media_diff
# html_diff = self.compare_html_content(orig_result['cleaned_html'], lxml_result['cleaned_html'])
# if html_diff:
# diffs['html'] = html_diff
# test_result = {
# 'case': case_name,
# 'lxml_mode': {
@@ -590,7 +598,7 @@ class ScraperEquivalenceTester:
# 'original_time': orig_time
# }
# results['tests'].append(test_result)
# if not diffs:
# results['summary']['passed'] += 1
# else:
@@ -599,50 +607,55 @@ class ScraperEquivalenceTester:
# 2) Now, run the complicated HTML with multiple parameter scenarios.
complicated_html = generate_complicated_html()
print("\n=== Testing complicated HTML with multiple parameter scenarios ===")
# Create the scrapers once (or you can re-create if needed)
original = WebScrapingStrategy()
lxml = LXMLWebScrapingStrategy()
for scenario_name, params in get_test_scenarios().items():
print(f"\nScenario: {scenario_name}")
start = time.time()
orig_result = original.scrap("http://test.com", complicated_html, **params)
orig_time = time.time() - start
start = time.time()
lxml_result = lxml.scrap("http://test.com", complicated_html, **params)
lxml_time = time.time() - start
diffs = {}
link_diff = self.deep_compare_links(orig_result['links'], lxml_result['links'])
link_diff = self.deep_compare_links(
orig_result["links"], lxml_result["links"]
)
if link_diff:
diffs['links'] = link_diff
diffs["links"] = link_diff
media_diff = self.deep_compare_media(orig_result['media'], lxml_result['media'])
media_diff = self.deep_compare_media(
orig_result["media"], lxml_result["media"]
)
if media_diff:
diffs['media'] = media_diff
diffs["media"] = media_diff
html_diff = self.compare_html_content(orig_result['cleaned_html'], lxml_result['cleaned_html'])
html_diff = self.compare_html_content(
orig_result["cleaned_html"], lxml_result["cleaned_html"]
)
if html_diff:
diffs['html'] = html_diff
diffs["html"] = html_diff
test_result = {
'case': f"complicated_{scenario_name}",
'lxml_mode': {
'differences': diffs,
'execution_time': lxml_time
},
'original_time': orig_time
"case": f"complicated_{scenario_name}",
"lxml_mode": {"differences": diffs, "execution_time": lxml_time},
"original_time": orig_time,
}
results['tests'].append(test_result)
results["tests"].append(test_result)
if not diffs:
results['summary']['passed'] += 1
print(f"✅ [OK] No differences found. Time(Orig: {orig_time:.3f}s, LXML: {lxml_time:.3f}s)")
results["summary"]["passed"] += 1
print(
f"✅ [OK] No differences found. Time(Orig: {orig_time:.3f}s, LXML: {lxml_time:.3f}s)"
)
else:
results['summary']['failed'] += 1
results["summary"]["failed"] += 1
print("❌ Differences found:")
for category, dlist in diffs.items():
print(f" {category}:")
@@ -657,20 +670,22 @@ class ScraperEquivalenceTester:
print(f"Total Cases: {len(results['tests'])}")
print(f"Passed: {results['summary']['passed']}")
print(f"Failed: {results['summary']['failed']}")
for test in results['tests']:
for test in results["tests"]:
print(f"\nTest Case: {test['case']}")
if not test['lxml_mode']['differences']:
if not test["lxml_mode"]["differences"]:
print("✅ All implementations produced identical results")
print(f"Times - Original: {test['original_time']:.3f}s, "
f"LXML: {test['lxml_mode']['execution_time']:.3f}s")
print(
f"Times - Original: {test['original_time']:.3f}s, "
f"LXML: {test['lxml_mode']['execution_time']:.3f}s"
)
else:
print("❌ Differences found:")
if test['lxml_mode']['differences']:
if test["lxml_mode"]["differences"]:
print("\nLXML Mode Differences:")
for category, diffs in test['lxml_mode']['differences'].items():
for category, diffs in test["lxml_mode"]["differences"].items():
print(f"\n{category}:")
for diff in diffs:
print(f" - {diff}")
@@ -680,11 +695,11 @@ def main():
tester = ScraperEquivalenceTester()
results = tester.run_tests()
tester.print_report(results)
# Save detailed results for debugging
with open('scraper_equivalence_results.json', 'w') as f:
with open("scraper_equivalence_results.json", "w") as f:
json.dump(results, f, indent=2)
if __name__ == "__main__":
main()
main()

View File

@@ -4,10 +4,10 @@
# - **State:** open
import os, sys, time
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
__location__ = os.path.realpath( os.path.join(os.getcwd(), os.path.dirname(__file__)))
import asyncio
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
import os
import time
from typing import Dict, Any
@@ -16,18 +16,18 @@ from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
# Get current directory
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
def print_test_result(name: str, result: Dict[str, Any], execution_time: float):
"""Helper function to print test results."""
print(f"\n{'='*20} {name} {'='*20}")
print(f"Execution time: {execution_time:.4f} seconds")
# Save markdown to files
for key, content in result.items():
if isinstance(content, str):
with open(__location__ + f"/output/{name.lower()}_{key}.md", "w") as f:
f.write(content)
# # Print first few lines of each markdown version
# for key, content in result.items():
# if isinstance(content, str):
@@ -36,32 +36,39 @@ def print_test_result(name: str, result: Dict[str, Any], execution_time: float):
# print(preview)
# print(f"Total length: {len(content)} characters")
def test_basic_markdown_conversion():
    """Test basic markdown conversion with links.

    Converts the bundled Wikipedia HTML sample, saves the three markdown
    variants via print_test_result, and checks that each variant is
    non-empty, that citations use the ⟨n⟩ marker, and that a references
    section is emitted.
    """
    with open(__location__ + "/data/wikipedia.html", "r") as f:
        cleaned_html = f.read()

    generator = DefaultMarkdownGenerator()

    start_time = time.perf_counter()
    result = generator.generate_markdown(
        cleaned_html=cleaned_html, base_url="https://en.wikipedia.org"
    )
    execution_time = time.perf_counter() - start_time

    print_test_result(
        "Basic Markdown Conversion",
        {
            "raw": result.raw_markdown,
            "with_citations": result.markdown_with_citations,
            "references": result.references_markdown,
        },
        execution_time,
    )

    # Basic assertions
    assert result.raw_markdown, "Raw markdown should not be empty"
    assert result.markdown_with_citations, "Markdown with citations should not be empty"
    assert result.references_markdown, "References should not be empty"
    # The previous check was `"" in ...`, which is true for every string and
    # therefore verified nothing; look for the opening citation bracket.
    assert "⟨" in result.markdown_with_citations, "Citations should use ⟨⟩ brackets"
    assert (
        "## References" in result.references_markdown
    ), "Should contain references section"
def test_relative_links():
"""Test handling of relative links with base URL."""
@@ -69,97 +76,106 @@ def test_relative_links():
Here's a [relative link](/wiki/Apple) and an [absolute link](https://example.com).
Also an [image](/images/test.png) and another [page](/wiki/Banana).
"""
generator = DefaultMarkdownGenerator()
result = generator.generate_markdown(
cleaned_html=markdown,
base_url="https://en.wikipedia.org"
cleaned_html=markdown, base_url="https://en.wikipedia.org"
)
assert "https://en.wikipedia.org/wiki/Apple" in result.references_markdown
assert "https://example.com" in result.references_markdown
assert "https://en.wikipedia.org/images/test.png" in result.references_markdown
def test_duplicate_links():
    """Check that two links to the same target share citation number 1."""
    source_text = """
Here's a [link](/test) and another [link](/test) and a [different link](/other).
"""

    result = DefaultMarkdownGenerator().generate_markdown(
        cleaned_html=source_text,
        base_url="https://example.com",
    )

    # Both occurrences of /test must be rendered with the ⟨1⟩ marker.
    first_marker_uses = result.markdown_with_citations.count("⟨1⟩")
    assert first_marker_uses == 2, "Same link should use same citation number"
def test_link_descriptions():
    """Check that link titles and link texts are carried into the references."""
    source_text = """
Here's a [link with title](/test "Test Title") and a [link with description](/other) to test.
"""

    result = DefaultMarkdownGenerator().generate_markdown(
        cleaned_html=source_text,
        base_url="https://example.com",
    )

    expectations = (
        ("Test Title", "Link title should be in references"),
        ("link with description", "Link text should be in references"),
    )
    for needle, message in expectations:
        assert needle in result.references_markdown, message
def test_performance_large_document():
    """Test performance with large document.

    Runs the generator several times over the bundled Wikipedia markdown
    sample and prints the average, minimum and maximum wall time. The test
    only reports timings; it asserts nothing about them.
    """
    with open(__location__ + "/data/wikipedia.md", "r") as f:
        markdown = f.read()

    # Test with multiple iterations
    iterations = 5
    times = []

    generator = DefaultMarkdownGenerator()

    # The loop index and the conversion result were never used, so neither
    # is bound to a name any more.
    for _ in range(iterations):
        start_time = time.perf_counter()
        generator.generate_markdown(
            cleaned_html=markdown, base_url="https://en.wikipedia.org"
        )
        end_time = time.perf_counter()
        times.append(end_time - start_time)

    avg_time = sum(times) / len(times)
    print(f"\n{'='*20} Performance Test {'='*20}")
    print(
        f"Average execution time over {iterations} iterations: {avg_time:.4f} seconds"
    )
    print(f"Min time: {min(times):.4f} seconds")
    print(f"Max time: {max(times):.4f} seconds")
def test_image_links():
    """Check that image syntax survives and image titles reach the references."""
    source_text = """
Here's an ![image](/image.png "Image Title") and another ![image](/other.jpg).
And a regular [link](/page).
"""

    result = DefaultMarkdownGenerator().generate_markdown(
        cleaned_html=source_text,
        base_url="https://example.com",
    )

    cited_markdown = result.markdown_with_citations
    assert "![" in cited_markdown, "Image markdown syntax should be preserved"

    references = result.references_markdown
    assert "Image Title" in references, "Image title should be in references"
if __name__ == "__main__":
    # Direct-execution entry point: run every test in declaration order
    # without going through pytest.
    print("Running markdown generation strategy tests...")
    for run_test in (
        test_basic_markdown_conversion,
        test_relative_links,
        test_duplicate_links,
        test_link_descriptions,
        test_performance_large_document,
        test_image_links,
    ):
        run_test()

View File

@@ -1,8 +1,6 @@
import os
import sys
import pytest
import asyncio
import json
# Add the parent directory to the Python path
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@@ -10,24 +8,37 @@ sys.path.append(parent_dir)
from crawl4ai.async_webcrawler import AsyncWebCrawler
@pytest.mark.asyncio
async def test_word_count_threshold():
    """A word_count_threshold of 50 should yield less markdown than a threshold of 0."""
    async with AsyncWebCrawler(verbose=True) as crawler:
        target = "https://www.nbcnews.com/business"

        # Same page, crawled twice; only the threshold differs.
        markdown_lengths = {}
        for threshold in (0, 50):
            crawl = await crawler.arun(
                url=target, word_count_threshold=threshold, bypass_cache=True
            )
            markdown_lengths[threshold] = len(crawl.markdown)

        assert markdown_lengths[0] > markdown_lengths[50]
@pytest.mark.asyncio
async def test_css_selector():
    """Restricting the crawl to headings should leave a heading tag in cleaned_html."""
    async with AsyncWebCrawler(verbose=True) as crawler:
        result = await crawler.arun(
            url="https://www.nbcnews.com/business",
            css_selector="h1, h2, h3",
            bypass_cache=True,
        )

        assert result.success
        # At least one of the selected heading levels must be present.
        heading_openers = ("<h1", "<h2", "<h3")
        assert any(opener in result.cleaned_html for opener in heading_openers)
@pytest.mark.asyncio
async def test_javascript_execution():
@@ -36,59 +47,70 @@ async def test_javascript_execution():
# Crawl without JS
result_without_more = await crawler.arun(url=url, bypass_cache=True)
js_code = ["const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More')); loadMoreButton && loadMoreButton.click();"]
js_code = [
"const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More')); loadMoreButton && loadMoreButton.click();"
]
result_with_more = await crawler.arun(url=url, js=js_code, bypass_cache=True)
assert result_with_more.success
assert len(result_with_more.markdown) > len(result_without_more.markdown)
@pytest.mark.asyncio
async def test_screenshot():
    """Requesting a screenshot should return a non-empty string payload."""
    async with AsyncWebCrawler(verbose=True) as crawler:
        result = await crawler.arun(
            url="https://www.nbcnews.com/business",
            screenshot=True,
            bypass_cache=True,
        )

        assert result.success
        capture = result.screenshot
        assert capture
        # Should be a base64 encoded string
        assert isinstance(capture, str)
@pytest.mark.asyncio
async def test_custom_user_agent():
    """A crawl issued with a custom user agent string should still succeed."""
    agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Crawl4AI/1.0"
    )
    async with AsyncWebCrawler(verbose=True) as crawler:
        result = await crawler.arun(
            url="https://www.nbcnews.com/business",
            user_agent=agent,
            bypass_cache=True,
        )

        # The result does not expose the user agent that was sent, so success
        # of the crawl is the only thing that can be checked here.
        assert result.success
@pytest.mark.asyncio
async def test_extract_media_and_links():
    """The crawl result should expose media and links as populated dicts."""
    async with AsyncWebCrawler(verbose=True) as crawler:
        result = await crawler.arun(
            url="https://www.nbcnews.com/business", bypass_cache=True
        )
        assert result.success

        # Media: non-empty dict with an "images" entry.
        media = result.media
        assert media
        assert isinstance(media, dict)
        assert "images" in media

        # Links: non-empty dict split into internal and external.
        links = result.links
        assert links
        assert isinstance(links, dict)
        assert all(kind in links for kind in ("internal", "external"))
@pytest.mark.asyncio
async def test_metadata_extraction():
    """The crawl result should carry a metadata dict with a common field."""
    async with AsyncWebCrawler(verbose=True) as crawler:
        result = await crawler.arun(
            url="https://www.nbcnews.com/business", bypass_cache=True
        )
        assert result.success

        metadata = result.metadata
        assert metadata
        assert isinstance(metadata, dict)

        # Check for common metadata fields: one of them is enough.
        common_fields = ["title", "description", "keywords"]
        present = [field for field in common_fields if field in metadata]
        assert present
# Entry point for debugging
if __name__ == "__main__":
pytest.main([__file__, "-v"])
pytest.main([__file__, "-v"])

View File

@@ -1,7 +1,6 @@
import os
import sys
import pytest
import asyncio
import time
# Add the parent directory to the Python path
@@ -10,6 +9,7 @@ sys.path.append(parent_dir)
from crawl4ai.async_webcrawler import AsyncWebCrawler
@pytest.mark.asyncio
async def test_crawl_speed():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -17,13 +17,14 @@ async def test_crawl_speed():
start_time = time.time()
result = await crawler.arun(url=url, bypass_cache=True)
end_time = time.time()
assert result.success
crawl_time = end_time - start_time
print(f"Crawl time: {crawl_time:.2f} seconds")
assert crawl_time < 10, f"Crawl took too long: {crawl_time:.2f} seconds"
@pytest.mark.asyncio
async def test_concurrent_crawling_performance():
async with AsyncWebCrawler(verbose=True) as crawler:
@@ -32,41 +33,47 @@ async def test_concurrent_crawling_performance():
"https://www.example.com",
"https://www.python.org",
"https://www.github.com",
"https://www.stackoverflow.com"
"https://www.stackoverflow.com",
]
start_time = time.time()
results = await crawler.arun_many(urls=urls, bypass_cache=True)
end_time = time.time()
total_time = end_time - start_time
print(f"Total time for concurrent crawling: {total_time:.2f} seconds")
assert all(result.success for result in results)
assert len(results) == len(urls)
assert total_time < len(urls) * 5, f"Concurrent crawling not significantly faster: {total_time:.2f} seconds"
assert (
total_time < len(urls) * 5
), f"Concurrent crawling not significantly faster: {total_time:.2f} seconds"
@pytest.mark.asyncio
async def test_crawl_speed_with_caching():
    """A second crawl that may use the cache should take under half the time.

    The first crawl bypasses the cache; the second is issued with
    bypass_cache=False. Both must succeed, and the second must finish in
    less than half the duration of the first.
    """
    async with AsyncWebCrawler(verbose=True) as crawler:
        url = "https://www.nbcnews.com/business"

        # perf_counter() is monotonic, so the measured intervals cannot be
        # skewed by system clock adjustments the way time.time() deltas can.
        start_time = time.perf_counter()
        result1 = await crawler.arun(url=url, bypass_cache=True)
        first_crawl_time = time.perf_counter() - start_time

        start_time = time.perf_counter()
        result2 = await crawler.arun(url=url, bypass_cache=False)
        second_crawl_time = time.perf_counter() - start_time

        assert result1.success and result2.success
        print(f"First crawl time: {first_crawl_time:.2f} seconds")
        print(f"Second crawl time (cached): {second_crawl_time:.2f} seconds")

        assert (
            second_crawl_time < first_crawl_time / 2
        ), "Cached crawl not significantly faster"
if __name__ == "__main__":
pytest.main([__file__, "-v"])
pytest.main([__file__, "-v"])

View File

@@ -1,7 +1,6 @@
import os
import sys
import pytest
import asyncio
import base64
from PIL import Image
import io
@@ -12,113 +11,112 @@ sys.path.append(parent_dir)
from crawl4ai.async_webcrawler import AsyncWebCrawler
@pytest.mark.asyncio
async def test_basic_screenshot():
    """Screenshot a static page and check the payload decodes to a PNG."""
    async with AsyncWebCrawler(verbose=True) as crawler:
        # A static website
        result = await crawler.arun(
            url="https://example.com", bypass_cache=True, screenshot=True
        )

        assert result.success
        assert result.screenshot is not None

        # Verify the screenshot is a valid image
        png_stream = io.BytesIO(base64.b64decode(result.screenshot))
        decoded = Image.open(png_stream)
        assert decoded.format == "PNG"
@pytest.mark.asyncio
async def test_screenshot_with_wait_for():
    """Screenshot a dynamic page after a CSS wait condition; expect a PNG."""
    async with AsyncWebCrawler(verbose=True) as crawler:
        # Using a website with dynamic content; wait for the main content
        # to load before capturing.
        crawl_options = dict(
            url="https://www.youtube.com",
            bypass_cache=True,
            screenshot=True,
            wait_for="css:#content",
        )
        result = await crawler.arun(**crawl_options)

        assert result.success
        assert result.screenshot is not None

        # Verify the screenshot is a valid image
        png_stream = io.BytesIO(base64.b64decode(result.screenshot))
        decoded = Image.open(png_stream)
        assert decoded.format == "PNG"

        # More specific checks (image dimensions, recognising expected
        # elements in the picture) could be added here.
@pytest.mark.asyncio
async def test_screenshot_with_js_wait_for():
    """Screenshot a page after a JavaScript wait predicate; expect a PNG."""
    async with AsyncWebCrawler(verbose=True) as crawler:
        crawl_options = dict(
            url="https://www.amazon.com",
            bypass_cache=True,
            screenshot=True,
            wait_for="js:() => document.querySelector('#nav-logo-sprites') !== null",
        )
        result = await crawler.arun(**crawl_options)

        assert result.success
        assert result.screenshot is not None

        png_stream = io.BytesIO(base64.b64decode(result.screenshot))
        decoded = Image.open(png_stream)
        assert decoded.format == "PNG"
@pytest.mark.asyncio
async def test_screenshot_without_wait_for():
    """Screenshot a content-heavy page with no wait condition; expect a PNG."""
    async with AsyncWebCrawler(verbose=True) as crawler:
        # A website with lots of dynamic content
        result = await crawler.arun(
            url="https://www.nytimes.com", bypass_cache=True, screenshot=True
        )

        assert result.success
        assert result.screenshot is not None

        png_stream = io.BytesIO(base64.b64decode(result.screenshot))
        decoded = Image.open(png_stream)
        assert decoded.format == "PNG"
@pytest.mark.asyncio
async def test_screenshot_comparison():
    """A screenshot taken after waiting should be at least as large as one taken without."""
    async with AsyncWebCrawler(verbose=True) as crawler:
        base_options = dict(
            url="https://www.reddit.com", bypass_cache=True, screenshot=True
        )

        # Take screenshot without wait_for
        result_without_wait = await crawler.arun(**base_options)

        # Take screenshot with wait_for
        result_with_wait = await crawler.arun(
            **base_options, wait_for="css:#SHORTCUT_FOCUSABLE_DIV"
        )

        assert result_without_wait.success and result_with_wait.success
        assert result_without_wait.screenshot is not None
        assert result_with_wait.screenshot is not None

        # Compare the two screenshots
        plain_bytes = base64.b64decode(result_without_wait.screenshot)
        image_without_wait = Image.open(io.BytesIO(plain_bytes))
        waited_bytes = base64.b64decode(result_with_wait.screenshot)
        image_with_wait = Image.open(io.BytesIO(waited_bytes))

        # Simple size comparison only; a real-world check might use a more
        # sophisticated image comparison technique.
        for axis in (0, 1):
            assert image_with_wait.size[axis] >= image_without_wait.size[axis]
# Entry point for debugging
if __name__ == "__main__":
pytest.main([__file__, "-v"])
pytest.main([__file__, "-v"])