fix(models): make model fields optional with default values

Make fields in MediaItem and Link models optional with default values to prevent validation errors when data is incomplete. Also expose BaseDispatcher in __init__ and fix markdown field handling in database manager.

BREAKING CHANGE: MediaItem and Link model fields are now optional with default values which may affect existing code expecting required fields.
This commit is contained in:
UncleCode
2025-01-15 22:58:14 +08:00
parent 20c027b79c
commit 9d694da939
6 changed files with 681 additions and 12 deletions

View File

@@ -23,6 +23,7 @@ from .async_dispatcher import (
RateLimiter,
CrawlerMonitor,
DisplayMode,
BaseDispatcher
)
__all__ = [
@@ -43,6 +44,7 @@ __all__ = [
"DefaultMarkdownGenerator",
"PruningContentFilter",
"BM25ContentFilter",
"BaseDispatcher",
"MemoryAdaptiveDispatcher",
"SemaphoreDispatcher",
"RateLimiter",

View File

@@ -14,8 +14,8 @@ from .async_logger import AsyncLogger
from .utils import get_error_context, create_box_message
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# logging.basicConfig(level=logging.INFO)
# logger = logging.getLogger(__name__)
base_directory = DB_PATH = os.path.join(
os.getenv("CRAWL4_AI_BASE_DIRECTORY", Path.home()), ".crawl4ai"
@@ -333,7 +333,11 @@ class AsyncDatabaseManager:
json.loads(row_dict[field]) if row_dict[field] else {}
)
except json.JSONDecodeError:
row_dict[field] = {}
# Very UGLY, never mention it to me please
if field == "markdown" and isinstance(row_dict[field], str):
row_dict[field] = row_dict[field]
else:
row_dict[field] = {}
if isinstance(row_dict["markdown"], Dict):
row_dict["markdown_v2"] = row_dict["markdown"]

View File

@@ -140,21 +140,21 @@ class AsyncCrawlResponse(BaseModel):
# Scraping Models
###############################
class MediaItem(BaseModel):
src: str
alt: Optional[str] = None
desc: Optional[str] = None
score: int
src: Optional[str] = ""
alt: Optional[str] = ""
desc: Optional[str] = ""
score: Optional[int] = 0
type: str = "image"
group_id: int
group_id: Optional[int] = 0
format: Optional[str] = None
width: Optional[int] = None
class Link(BaseModel):
href: str
text: str
title: Optional[str] = None
base_domain: str
href: Optional[str] = ""
text: Optional[str] = ""
title: Optional[str] = ""
base_domain: Optional[str] = ""
class Media(BaseModel):

View File

@@ -0,0 +1,343 @@
import pytest
import pytest_asyncio
import asyncio
from typing import Dict, Any
from pathlib import Path
from unittest.mock import MagicMock, patch
import os
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
from crawl4ai.models import AsyncCrawlResponse
from crawl4ai.async_logger import AsyncLogger, LogLevel
CRAWL4AI_HOME_DIR = Path(os.path.expanduser("~")).joinpath(".crawl4ai")
if not CRAWL4AI_HOME_DIR.joinpath("profiles", "test_profile").exists():
CRAWL4AI_HOME_DIR.joinpath("profiles", "test_profile").mkdir(parents=True)
# Test Config Files
@pytest.fixture
def basic_browser_config():
return BrowserConfig(
browser_type="chromium",
headless=True,
verbose=True
)
@pytest.fixture
def advanced_browser_config():
return BrowserConfig(
browser_type="chromium",
headless=True,
use_managed_browser=True,
user_data_dir=CRAWL4AI_HOME_DIR.joinpath("profiles", "test_profile"),
# proxy="http://localhost:8080",
viewport_width=1920,
viewport_height=1080,
user_agent_mode="random"
)
@pytest.fixture
def basic_crawler_config():
return CrawlerRunConfig(
word_count_threshold=100,
wait_until="domcontentloaded",
page_timeout=30000
)
@pytest.fixture
def logger():
return AsyncLogger(verbose=True, log_level=LogLevel.DEBUG)
@pytest_asyncio.fixture
async def crawler_strategy(basic_browser_config, logger):
strategy = AsyncPlaywrightCrawlerStrategy(browser_config=basic_browser_config, logger=logger)
await strategy.start()
yield strategy
await strategy.close()
# Browser Configuration Tests
@pytest.mark.asyncio
async def test_browser_config_initialization():
config = BrowserConfig(
browser_type="chromium",
user_agent_mode="random"
)
assert config.browser_type == "chromium"
assert config.user_agent is not None
assert config.headless is True
@pytest.mark.asyncio
async def test_persistent_browser_config():
config = BrowserConfig(
use_persistent_context=True,
user_data_dir="/tmp/test_dir"
)
assert config.use_managed_browser is True
assert config.user_data_dir == "/tmp/test_dir"
# Crawler Strategy Tests
@pytest.mark.asyncio
async def test_basic_page_load(crawler_strategy):
response = await crawler_strategy.crawl(
"https://example.com",
CrawlerRunConfig()
)
assert response.status_code == 200
assert len(response.html) > 0
assert "Example Domain" in response.html
@pytest.mark.asyncio
async def test_screenshot_capture(crawler_strategy):
config = CrawlerRunConfig(screenshot=True)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.screenshot is not None
assert len(response.screenshot) > 0
@pytest.mark.asyncio
async def test_pdf_generation(crawler_strategy):
config = CrawlerRunConfig(pdf=True)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.pdf_data is not None
assert len(response.pdf_data) > 0
@pytest.mark.asyncio
async def test_handle_js_execution(crawler_strategy):
config = CrawlerRunConfig(
js_code="document.body.style.backgroundColor = 'red';"
)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.status_code == 200
assert 'background-color: red' in response.html.lower()
@pytest.mark.asyncio
async def test_multiple_js_commands(crawler_strategy):
js_commands = [
"document.body.style.backgroundColor = 'blue';",
"document.title = 'Modified Title';",
"const div = document.createElement('div'); div.id = 'test'; div.textContent = 'Test Content'; document.body.appendChild(div);"
]
config = CrawlerRunConfig(js_code=js_commands)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.status_code == 200
assert 'background-color: blue' in response.html.lower()
assert 'id="test"' in response.html
assert '>Test Content<' in response.html
assert '<title>Modified Title</title>' in response.html
@pytest.mark.asyncio
async def test_complex_dom_manipulation(crawler_strategy):
js_code = """
// Create a complex structure
const container = document.createElement('div');
container.className = 'test-container';
const list = document.createElement('ul');
list.className = 'test-list';
for (let i = 1; i <= 3; i++) {
const item = document.createElement('li');
item.textContent = `Item ${i}`;
item.className = `item-${i}`;
list.appendChild(item);
}
container.appendChild(list);
document.body.appendChild(container);
"""
config = CrawlerRunConfig(js_code=js_code)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.status_code == 200
assert 'class="test-container"' in response.html
assert 'class="test-list"' in response.html
assert 'class="item-1"' in response.html
assert '>Item 1<' in response.html
assert '>Item 2<' in response.html
assert '>Item 3<' in response.html
@pytest.mark.asyncio
async def test_style_modifications(crawler_strategy):
js_code = """
const testDiv = document.createElement('div');
testDiv.id = 'style-test';
testDiv.style.cssText = 'color: green; font-size: 20px; margin: 10px;';
testDiv.textContent = 'Styled Content';
document.body.appendChild(testDiv);
"""
config = CrawlerRunConfig(js_code=js_code)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.status_code == 200
assert 'id="style-test"' in response.html
assert 'color: green' in response.html.lower()
assert 'font-size: 20px' in response.html.lower()
assert 'margin: 10px' in response.html.lower()
assert '>Styled Content<' in response.html
@pytest.mark.asyncio
async def test_dynamic_content_loading(crawler_strategy):
js_code = """
// Simulate dynamic content loading
setTimeout(() => {
const dynamic = document.createElement('div');
dynamic.id = 'dynamic-content';
dynamic.textContent = 'Dynamically Loaded';
document.body.appendChild(dynamic);
}, 1000);
// Add a loading indicator immediately
const loading = document.createElement('div');
loading.id = 'loading';
loading.textContent = 'Loading...';
document.body.appendChild(loading);
"""
config = CrawlerRunConfig(js_code=js_code, delay_before_return_html=2.0)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.status_code == 200
assert 'id="loading"' in response.html
assert '>Loading...</' in response.html
assert 'dynamic-content' in response.html
assert '>Dynamically Loaded<' in response.html
# @pytest.mark.asyncio
# async def test_js_return_values(crawler_strategy):
# js_code = """
# return {
# title: document.title,
# metaCount: document.getElementsByTagName('meta').length,
# bodyClass: document.body.className
# };
# """
# config = CrawlerRunConfig(js_code=js_code)
# response = await crawler_strategy.crawl(
# "https://example.com",
# config
# )
# assert response.status_code == 200
# assert 'Example Domain' in response.html
# assert 'meta name="viewport"' in response.html
# assert 'class="main"' in response.html
@pytest.mark.asyncio
async def test_async_js_execution(crawler_strategy):
js_code = """
await new Promise(resolve => setTimeout(resolve, 1000));
document.body.style.color = 'green';
const computedStyle = window.getComputedStyle(document.body);
return computedStyle.color;
"""
config = CrawlerRunConfig(js_code=js_code)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.status_code == 200
assert 'color: green' in response.html.lower()
# @pytest.mark.asyncio
# async def test_js_error_handling(crawler_strategy):
# js_code = """
# // Intentionally cause different types of errors
# const results = [];
# try {
# nonExistentFunction();
# } catch (e) {
# results.push(e.name);
# }
# try {
# JSON.parse('{invalid}');
# } catch (e) {
# results.push(e.name);
# }
# return results;
# """
# config = CrawlerRunConfig(js_code=js_code)
# response = await crawler_strategy.crawl(
# "https://example.com",
# config
# )
# assert response.status_code == 200
# assert 'ReferenceError' in response.html
# assert 'SyntaxError' in response.html
@pytest.mark.asyncio
async def test_handle_navigation_timeout():
config = CrawlerRunConfig(page_timeout=1) # 1ms timeout
with pytest.raises(Exception):
async with AsyncPlaywrightCrawlerStrategy() as strategy:
await strategy.crawl("https://example.com", config)
@pytest.mark.asyncio
async def test_session_management(crawler_strategy):
config = CrawlerRunConfig(session_id="test_session")
response1 = await crawler_strategy.crawl(
"https://example.com",
config
)
response2 = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response1.status_code == 200
assert response2.status_code == 200
@pytest.mark.asyncio
async def test_process_iframes(crawler_strategy):
config = CrawlerRunConfig(
process_iframes=True,
wait_for_images=True
)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_stealth_mode(crawler_strategy):
config = CrawlerRunConfig(
simulate_user=True,
override_navigator=True
)
response = await crawler_strategy.crawl(
"https://bot.sannysoft.com",
config
)
assert response.status_code == 200
# Error Handling Tests
@pytest.mark.asyncio
async def test_invalid_url():
with pytest.raises(ValueError):
async with AsyncPlaywrightCrawlerStrategy() as strategy:
await strategy.crawl("not_a_url", CrawlerRunConfig())
@pytest.mark.asyncio
async def test_network_error_handling():
config = CrawlerRunConfig()
with pytest.raises(Exception):
async with AsyncPlaywrightCrawlerStrategy() as strategy:
await strategy.crawl("https://invalid.example.com", config)
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,171 @@
import asyncio
from typing import Dict
from crawl4ai.content_filter_strategy import BM25ContentFilter, PruningContentFilter
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
import time
# Test HTML samples
TEST_HTML_SAMPLES = {
"basic": """
<body>
<h1>Test Title</h1>
<p>This is a test paragraph with <a href="http://example.com">a link</a>.</p>
<div class="content">
<h2>Section 1</h2>
<p>More content here with <b>bold text</b>.</p>
</div>
</body>
""",
"complex": """
<body>
<nav>Navigation menu that should be removed</nav>
<header>Header content to remove</header>
<main>
<article>
<h1>Main Article</h1>
<p>Important content paragraph with <a href="http://test.com">useful link</a>.</p>
<section>
<h2>Key Section</h2>
<p>Detailed explanation with multiple sentences. This should be kept
in the final output. Very important information here.</p>
</section>
</article>
<aside>Sidebar content to remove</aside>
</main>
<footer>Footer content to remove</footer>
</body>
""",
"edge_cases": """
<body>
<div>
<p></p>
<p> </p>
<script>alert('remove me');</script>
<div class="advertisement">Ad content to remove</div>
<p class="social-share">Share buttons to remove</p>
<h1>!!Special>> Characters## Title!!</h1>
<pre><code>def test(): pass</code></pre>
</div>
</body>
""",
"links_citations": """
<body>
<h1>Document with Links</h1>
<p>First link to <a href="http://example.com/1">Example 1</a></p>
<p>Second link to <a href="http://example.com/2" title="Example 2">Test 2</a></p>
<p>Image link: <img src="test.jpg" alt="test image"></p>
<p>Repeated link to <a href="http://example.com/1">Example 1 again</a></p>
</body>
""",
}
def test_content_filters() -> Dict[str, Dict[str, int]]:
"""Test various content filtering strategies and return length comparisons."""
results = {}
# Initialize filters
pruning_filter = PruningContentFilter(
threshold=0.48,
threshold_type="fixed",
min_word_threshold=2
)
bm25_filter = BM25ContentFilter(
bm25_threshold=1.0,
user_query="test article content important"
)
# Test each HTML sample
for test_name, html in TEST_HTML_SAMPLES.items():
# Store results for this test case
results[test_name] = {}
# Test PruningContentFilter
start_time = time.time()
pruned_content = pruning_filter.filter_content(html)
pruning_time = time.time() - start_time
# Test BM25ContentFilter
start_time = time.time()
bm25_content = bm25_filter.filter_content(html)
bm25_time = time.time() - start_time
# Store results
results[test_name] = {
"original_length": len(html),
"pruned_length": sum(len(c) for c in pruned_content),
"bm25_length": sum(len(c) for c in bm25_content),
"pruning_time": pruning_time,
"bm25_time": bm25_time
}
return results
def test_markdown_generation():
"""Test markdown generation with different configurations."""
results = []
# Initialize generators with different configurations
generators = {
"no_filter": DefaultMarkdownGenerator(),
"pruning": DefaultMarkdownGenerator(
content_filter=PruningContentFilter(threshold=0.48)
),
"bm25": DefaultMarkdownGenerator(
content_filter=BM25ContentFilter(
user_query="test article content important"
)
)
}
# Test each generator with each HTML sample
for test_name, html in TEST_HTML_SAMPLES.items():
for gen_name, generator in generators.items():
start_time = time.time()
result = generator.generate_markdown(
html,
base_url="http://example.com",
citations=True
)
results.append({
"test_case": test_name,
"generator": gen_name,
"time": time.time() - start_time,
"raw_length": len(result.raw_markdown),
"fit_length": len(result.fit_markdown) if result.fit_markdown else 0,
"citations": len(result.references_markdown)
})
return results
def main():
"""Run all tests and print results."""
print("Starting content filter tests...")
filter_results = test_content_filters()
print("\nContent Filter Results:")
print("-" * 50)
for test_name, metrics in filter_results.items():
print(f"\nTest case: {test_name}")
print(f"Original length: {metrics['original_length']}")
print(f"Pruned length: {metrics['pruned_length']} ({metrics['pruning_time']:.3f}s)")
print(f"BM25 length: {metrics['bm25_length']} ({metrics['bm25_time']:.3f}s)")
print("\nStarting markdown generation tests...")
markdown_results = test_markdown_generation()
print("\nMarkdown Generation Results:")
print("-" * 50)
for result in markdown_results:
print(f"\nTest: {result['test_case']} - Generator: {result['generator']}")
print(f"Time: {result['time']:.3f}s")
print(f"Raw length: {result['raw_length']}")
print(f"Fit length: {result['fit_length']}")
print(f"Citations: {result['citations']}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,149 @@
import asyncio
import pytest
from typing import List
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
MemoryAdaptiveDispatcher,
RateLimiter,
CacheMode
)
@pytest.mark.asyncio
@pytest.mark.parametrize("viewport", [
(800, 600),
(1024, 768),
(1920, 1080)
])
async def test_viewport_config(viewport):
"""Test different viewport configurations"""
width, height = viewport
browser_config = BrowserConfig(
browser_type="chromium",
headless=True,
viewport_width=width,
viewport_height=height
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="https://example.com",
config=CrawlerRunConfig(
# cache_mode=CacheMode.BYPASS,
page_timeout=30000 # 30 seconds
)
)
assert result.success
@pytest.mark.asyncio
async def test_memory_management():
"""Test memory-adaptive dispatching"""
browser_config = BrowserConfig(
browser_type="chromium",
headless=True,
viewport_width=1024,
viewport_height=768
)
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=70.0,
check_interval=1.0,
max_session_permit=5
)
urls = ["https://example.com"] * 3 # Test with multiple identical URLs
async with AsyncWebCrawler(config=browser_config) as crawler:
results = await crawler.arun_many(
urls=urls,
config=CrawlerRunConfig(page_timeout=30000),
dispatcher=dispatcher
)
assert len(results) == len(urls)
@pytest.mark.asyncio
async def test_rate_limiting():
"""Test rate limiting functionality"""
browser_config = BrowserConfig(
browser_type="chromium",
headless=True
)
dispatcher = MemoryAdaptiveDispatcher(
rate_limiter=RateLimiter(
base_delay=(1.0, 2.0),
max_delay=5.0,
max_retries=2
),
memory_threshold_percent=70.0
)
urls = [
"https://example.com",
"https://example.org",
"https://example.net"
]
async with AsyncWebCrawler(config=browser_config) as crawler:
results = await crawler.arun_many(
urls=urls,
config=CrawlerRunConfig(page_timeout=30000),
dispatcher=dispatcher
)
assert len(results) == len(urls)
@pytest.mark.asyncio
async def test_javascript_execution():
"""Test JavaScript execution capabilities"""
browser_config = BrowserConfig(
browser_type="chromium",
headless=True,
java_script_enabled=True
)
js_code = """
document.body.style.backgroundColor = 'red';
return document.body.style.backgroundColor;
"""
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="https://example.com",
config=CrawlerRunConfig(
js_code=js_code,
page_timeout=30000
)
)
assert result.success
@pytest.mark.asyncio
@pytest.mark.parametrize("error_url", [
"https://invalid.domain.test",
"https://httpbin.org/status/404",
"https://httpbin.org/status/503",
"https://httpbin.org/status/403"
])
async def test_error_handling(error_url):
"""Test error handling for various failure scenarios"""
browser_config = BrowserConfig(
browser_type="chromium",
headless=True
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url=error_url,
config=CrawlerRunConfig(
page_timeout=10000, # Short timeout for error cases
cache_mode=CacheMode.BYPASS
)
)
assert not result.success
assert result.error_message is not None
if __name__ == "__main__":
asyncio.run(test_viewport_config((1024, 768)))
asyncio.run(test_memory_management())
asyncio.run(test_rate_limiting())
asyncio.run(test_javascript_execution())