feat(crawler): add network request and console message capturing

Implement comprehensive network request and console message capturing functionality:
- Add capture_network_requests and capture_console_messages config parameters
- Add network_requests and console_messages fields to models
- Implement Playwright event listeners to capture requests, responses, and console output
- Create detailed documentation and examples
- Add comprehensive tests

This feature enables deep visibility into web page activity for debugging,
security analysis, performance profiling, and API discovery in web applications.
Author: unclecode
Date: 2025-04-10 16:03:48 +08:00
Parent: a2061bf31e
Commit: 66ac07b4f3
31 changed files with 1686 additions and 10 deletions
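
A minimal usage sketch of the new capture options, assembled from the parameter and field names listed above (capture_network_requests, capture_console_messages, result.network_requests, result.console_messages); it is illustrative only and not part of this commit's diff:

import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig

async def main():
    # Opt in to both capture features added by this commit
    config = CrawlerRunConfig(
        capture_network_requests=True,
        capture_console_messages=True,
    )
    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun(url="https://example.com", config=config)
        # Both fields default to None when capture is disabled; when enabled,
        # each entry is a dict describing a request/response or console event.
        print(f"Network events: {len(result.network_requests or [])}")
        print(f"Console messages: {len(result.console_messages or [])}")

if __name__ == "__main__":
    asyncio.run(main())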


@@ -0,0 +1,56 @@
import asyncio
from crawl4ai import (
AsyncWebCrawler,
CrawlerRunConfig,
HTTPCrawlerConfig,
CacheMode,
DefaultMarkdownGenerator,
PruningContentFilter
)
from crawl4ai.async_crawler_strategy import AsyncHTTPCrawlerStrategy
from crawl4ai.async_logger import AsyncLogger
async def main():
# Initialize HTTP crawler strategy
http_strategy = AsyncHTTPCrawlerStrategy(
browser_config=HTTPCrawlerConfig(
method="GET",
verify_ssl=True,
follow_redirects=True
),
logger=AsyncLogger(verbose=True)
)
# Initialize web crawler with HTTP strategy
async with AsyncWebCrawler(crawler_strategy=http_strategy) as crawler:
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter(
threshold=0.48,
threshold_type="fixed",
min_word_threshold=0
)
)
)
# Test different URLs
urls = [
"https://example.com",
"https://httpbin.org/get",
"raw://<html><body>Test content</body></html>"
]
for url in urls:
print(f"\n=== Testing {url} ===")
try:
result = await crawler.arun(url=url, config=crawler_config)
print(f"Status: {result.status_code}")
print(f"Raw HTML length: {len(result.html)}")
if hasattr(result, 'markdown'):
print(f"Markdown length: {len(result.markdown.raw_markdown)}")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
asyncio.run(main())


@@ -0,0 +1,46 @@
import asyncio
import time
from crawl4ai import CrawlerRunConfig, AsyncWebCrawler, CacheMode
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
from crawl4ai.deep_crawling import BFSDeepCrawlStrategy, BestFirstCrawlingStrategy
from crawl4ai.deep_crawling.filters import FilterChain, URLPatternFilter, DomainFilter, ContentTypeFilter, ContentRelevanceFilter
from crawl4ai.deep_crawling.scorers import KeywordRelevanceScorer
# from crawl4ai.deep_crawling import BFSDeepCrawlStrategy, BestFirstCrawlingStrategy
async def main():
"""Example deep crawl of documentation site."""
filter_chain = FilterChain([
URLPatternFilter(patterns=["*2025*"]),
DomainFilter(allowed_domains=["techcrunch.com"]),
ContentRelevanceFilter(query="Use of artificial intelligence in Defence applications", threshold=1),
ContentTypeFilter(allowed_types=["text/html","application/javascript"])
])
config = CrawlerRunConfig(
deep_crawl_strategy = BestFirstCrawlingStrategy(
max_depth=2,
include_external=False,
filter_chain=filter_chain,
url_scorer=KeywordRelevanceScorer(keywords=["anduril", "defence", "AI"]),
),
stream=False,
verbose=True,
cache_mode=CacheMode.BYPASS,
scraping_strategy=LXMLWebScrapingStrategy()
)
async with AsyncWebCrawler() as crawler:
print("Starting deep crawl in streaming mode:")
config.stream = True
start_time = time.perf_counter()
async for result in await crawler.arun(
url="https://techcrunch.com",
config=config
):
print(f"{result.url} (Depth: {result.metadata.get('depth', 0)})")
print(f"Duration: {time.perf_counter() - start_time:.2f} seconds")
if __name__ == "__main__":
asyncio.run(main())


@@ -0,0 +1,343 @@
import pytest
import pytest_asyncio
import asyncio
from typing import Dict, Any
from pathlib import Path
from unittest.mock import MagicMock, patch
import os
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_crawler_strategy import AsyncPlaywrightCrawlerStrategy
from crawl4ai.models import AsyncCrawlResponse
from crawl4ai.async_logger import AsyncLogger, LogLevel
CRAWL4AI_HOME_DIR = Path(os.path.expanduser("~")).joinpath(".crawl4ai")
if not CRAWL4AI_HOME_DIR.joinpath("profiles", "test_profile").exists():
CRAWL4AI_HOME_DIR.joinpath("profiles", "test_profile").mkdir(parents=True)
# Test Config Files
@pytest.fixture
def basic_browser_config():
return BrowserConfig(
browser_type="chromium",
headless=True,
verbose=True
)
@pytest.fixture
def advanced_browser_config():
return BrowserConfig(
browser_type="chromium",
headless=True,
use_managed_browser=True,
user_data_dir=CRAWL4AI_HOME_DIR.joinpath("profiles", "test_profile"),
# proxy="http://localhost:8080",
viewport_width=1920,
viewport_height=1080,
user_agent_mode="random"
)
@pytest.fixture
def basic_crawler_config():
return CrawlerRunConfig(
word_count_threshold=100,
wait_until="domcontentloaded",
page_timeout=30000
)
@pytest.fixture
def logger():
return AsyncLogger(verbose=True, log_level=LogLevel.DEBUG)
@pytest_asyncio.fixture
async def crawler_strategy(basic_browser_config, logger):
strategy = AsyncPlaywrightCrawlerStrategy(browser_config=basic_browser_config, logger=logger)
await strategy.start()
yield strategy
await strategy.close()
# Browser Configuration Tests
@pytest.mark.asyncio
async def test_browser_config_initialization():
config = BrowserConfig(
browser_type="chromium",
user_agent_mode="random"
)
assert config.browser_type == "chromium"
assert config.user_agent is not None
assert config.headless is True
@pytest.mark.asyncio
async def test_persistent_browser_config():
config = BrowserConfig(
use_persistent_context=True,
user_data_dir="/tmp/test_dir"
)
assert config.use_managed_browser is True
assert config.user_data_dir == "/tmp/test_dir"
# Crawler Strategy Tests
@pytest.mark.asyncio
async def test_basic_page_load(crawler_strategy):
response = await crawler_strategy.crawl(
"https://example.com",
CrawlerRunConfig()
)
assert response.status_code == 200
assert len(response.html) > 0
assert "Example Domain" in response.html
@pytest.mark.asyncio
async def test_screenshot_capture(crawler_strategy):
config = CrawlerRunConfig(screenshot=True)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.screenshot is not None
assert len(response.screenshot) > 0
@pytest.mark.asyncio
async def test_pdf_generation(crawler_strategy):
config = CrawlerRunConfig(pdf=True)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.pdf_data is not None
assert len(response.pdf_data) > 0
@pytest.mark.asyncio
async def test_handle_js_execution(crawler_strategy):
config = CrawlerRunConfig(
js_code="document.body.style.backgroundColor = 'red';"
)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.status_code == 200
assert 'background-color: red' in response.html.lower()
@pytest.mark.asyncio
async def test_multiple_js_commands(crawler_strategy):
js_commands = [
"document.body.style.backgroundColor = 'blue';",
"document.title = 'Modified Title';",
"const div = document.createElement('div'); div.id = 'test'; div.textContent = 'Test Content'; document.body.appendChild(div);"
]
config = CrawlerRunConfig(js_code=js_commands)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.status_code == 200
assert 'background-color: blue' in response.html.lower()
assert 'id="test"' in response.html
assert '>Test Content<' in response.html
assert '<title>Modified Title</title>' in response.html
@pytest.mark.asyncio
async def test_complex_dom_manipulation(crawler_strategy):
js_code = """
// Create a complex structure
const container = document.createElement('div');
container.className = 'test-container';
const list = document.createElement('ul');
list.className = 'test-list';
for (let i = 1; i <= 3; i++) {
const item = document.createElement('li');
item.textContent = `Item ${i}`;
item.className = `item-${i}`;
list.appendChild(item);
}
container.appendChild(list);
document.body.appendChild(container);
"""
config = CrawlerRunConfig(js_code=js_code)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.status_code == 200
assert 'class="test-container"' in response.html
assert 'class="test-list"' in response.html
assert 'class="item-1"' in response.html
assert '>Item 1<' in response.html
assert '>Item 2<' in response.html
assert '>Item 3<' in response.html
@pytest.mark.asyncio
async def test_style_modifications(crawler_strategy):
js_code = """
const testDiv = document.createElement('div');
testDiv.id = 'style-test';
testDiv.style.cssText = 'color: green; font-size: 20px; margin: 10px;';
testDiv.textContent = 'Styled Content';
document.body.appendChild(testDiv);
"""
config = CrawlerRunConfig(js_code=js_code)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.status_code == 200
assert 'id="style-test"' in response.html
assert 'color: green' in response.html.lower()
assert 'font-size: 20px' in response.html.lower()
assert 'margin: 10px' in response.html.lower()
assert '>Styled Content<' in response.html
@pytest.mark.asyncio
async def test_dynamic_content_loading(crawler_strategy):
js_code = """
// Simulate dynamic content loading
setTimeout(() => {
const dynamic = document.createElement('div');
dynamic.id = 'dynamic-content';
dynamic.textContent = 'Dynamically Loaded';
document.body.appendChild(dynamic);
}, 1000);
// Add a loading indicator immediately
const loading = document.createElement('div');
loading.id = 'loading';
loading.textContent = 'Loading...';
document.body.appendChild(loading);
"""
config = CrawlerRunConfig(js_code=js_code, delay_before_return_html=2.0)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.status_code == 200
assert 'id="loading"' in response.html
assert '>Loading...</' in response.html
assert 'dynamic-content' in response.html
assert '>Dynamically Loaded<' in response.html
# @pytest.mark.asyncio
# async def test_js_return_values(crawler_strategy):
# js_code = """
# return {
# title: document.title,
# metaCount: document.getElementsByTagName('meta').length,
# bodyClass: document.body.className
# };
# """
# config = CrawlerRunConfig(js_code=js_code)
# response = await crawler_strategy.crawl(
# "https://example.com",
# config
# )
# assert response.status_code == 200
# assert 'Example Domain' in response.html
# assert 'meta name="viewport"' in response.html
# assert 'class="main"' in response.html
@pytest.mark.asyncio
async def test_async_js_execution(crawler_strategy):
js_code = """
await new Promise(resolve => setTimeout(resolve, 1000));
document.body.style.color = 'green';
const computedStyle = window.getComputedStyle(document.body);
return computedStyle.color;
"""
config = CrawlerRunConfig(js_code=js_code)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.status_code == 200
assert 'color: green' in response.html.lower()
# @pytest.mark.asyncio
# async def test_js_error_handling(crawler_strategy):
# js_code = """
# // Intentionally cause different types of errors
# const results = [];
# try {
# nonExistentFunction();
# } catch (e) {
# results.push(e.name);
# }
# try {
# JSON.parse('{invalid}');
# } catch (e) {
# results.push(e.name);
# }
# return results;
# """
# config = CrawlerRunConfig(js_code=js_code)
# response = await crawler_strategy.crawl(
# "https://example.com",
# config
# )
# assert response.status_code == 200
# assert 'ReferenceError' in response.html
# assert 'SyntaxError' in response.html
@pytest.mark.asyncio
async def test_handle_navigation_timeout():
config = CrawlerRunConfig(page_timeout=1) # 1ms timeout
with pytest.raises(Exception):
async with AsyncPlaywrightCrawlerStrategy() as strategy:
await strategy.crawl("https://example.com", config)
@pytest.mark.asyncio
async def test_session_management(crawler_strategy):
config = CrawlerRunConfig(session_id="test_session")
response1 = await crawler_strategy.crawl(
"https://example.com",
config
)
response2 = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response1.status_code == 200
assert response2.status_code == 200
@pytest.mark.asyncio
async def test_process_iframes(crawler_strategy):
config = CrawlerRunConfig(
process_iframes=True,
wait_for_images=True
)
response = await crawler_strategy.crawl(
"https://example.com",
config
)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_stealth_mode(crawler_strategy):
config = CrawlerRunConfig(
simulate_user=True,
override_navigator=True
)
response = await crawler_strategy.crawl(
"https://bot.sannysoft.com",
config
)
assert response.status_code == 200
# Error Handling Tests
@pytest.mark.asyncio
async def test_invalid_url():
with pytest.raises(ValueError):
async with AsyncPlaywrightCrawlerStrategy() as strategy:
await strategy.crawl("not_a_url", CrawlerRunConfig())
@pytest.mark.asyncio
async def test_network_error_handling():
config = CrawlerRunConfig()
with pytest.raises(Exception):
async with AsyncPlaywrightCrawlerStrategy() as strategy:
await strategy.crawl("https://invalid.example.com", config)
if __name__ == "__main__":
pytest.main([__file__, "-v"])


@@ -0,0 +1,171 @@
import asyncio
from typing import Dict
from crawl4ai.content_filter_strategy import BM25ContentFilter, PruningContentFilter
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
import time
# Test HTML samples
TEST_HTML_SAMPLES = {
"basic": """
<body>
<h1>Test Title</h1>
<p>This is a test paragraph with <a href="http://example.com">a link</a>.</p>
<div class="content">
<h2>Section 1</h2>
<p>More content here with <b>bold text</b>.</p>
</div>
</body>
""",
"complex": """
<body>
<nav>Navigation menu that should be removed</nav>
<header>Header content to remove</header>
<main>
<article>
<h1>Main Article</h1>
<p>Important content paragraph with <a href="http://test.com">useful link</a>.</p>
<section>
<h2>Key Section</h2>
<p>Detailed explanation with multiple sentences. This should be kept
in the final output. Very important information here.</p>
</section>
</article>
<aside>Sidebar content to remove</aside>
</main>
<footer>Footer content to remove</footer>
</body>
""",
"edge_cases": """
<body>
<div>
<p></p>
<p> </p>
<script>alert('remove me');</script>
<div class="advertisement">Ad content to remove</div>
<p class="social-share">Share buttons to remove</p>
<h1>!!Special>> Characters## Title!!</h1>
<pre><code>def test(): pass</code></pre>
</div>
</body>
""",
"links_citations": """
<body>
<h1>Document with Links</h1>
<p>First link to <a href="http://example.com/1">Example 1</a></p>
<p>Second link to <a href="http://example.com/2" title="Example 2">Test 2</a></p>
<p>Image link: <img src="test.jpg" alt="test image"></p>
<p>Repeated link to <a href="http://example.com/1">Example 1 again</a></p>
</body>
""",
}
def test_content_filters() -> Dict[str, Dict[str, int]]:
"""Test various content filtering strategies and return length comparisons."""
results = {}
# Initialize filters
pruning_filter = PruningContentFilter(
threshold=0.48,
threshold_type="fixed",
min_word_threshold=2
)
bm25_filter = BM25ContentFilter(
bm25_threshold=1.0,
user_query="test article content important"
)
# Test each HTML sample
for test_name, html in TEST_HTML_SAMPLES.items():
# Store results for this test case
results[test_name] = {}
# Test PruningContentFilter
start_time = time.time()
pruned_content = pruning_filter.filter_content(html)
pruning_time = time.time() - start_time
# Test BM25ContentFilter
start_time = time.time()
bm25_content = bm25_filter.filter_content(html)
bm25_time = time.time() - start_time
# Store results
results[test_name] = {
"original_length": len(html),
"pruned_length": sum(len(c) for c in pruned_content),
"bm25_length": sum(len(c) for c in bm25_content),
"pruning_time": pruning_time,
"bm25_time": bm25_time
}
return results
def test_markdown_generation():
"""Test markdown generation with different configurations."""
results = []
# Initialize generators with different configurations
generators = {
"no_filter": DefaultMarkdownGenerator(),
"pruning": DefaultMarkdownGenerator(
content_filter=PruningContentFilter(threshold=0.48)
),
"bm25": DefaultMarkdownGenerator(
content_filter=BM25ContentFilter(
user_query="test article content important"
)
)
}
# Test each generator with each HTML sample
for test_name, html in TEST_HTML_SAMPLES.items():
for gen_name, generator in generators.items():
start_time = time.time()
result = generator.generate_markdown(
html,
base_url="http://example.com",
citations=True
)
results.append({
"test_case": test_name,
"generator": gen_name,
"time": time.time() - start_time,
"raw_length": len(result.raw_markdown),
"fit_length": len(result.fit_markdown) if result.fit_markdown else 0,
"citations": len(result.references_markdown)
})
return results
def main():
"""Run all tests and print results."""
print("Starting content filter tests...")
filter_results = test_content_filters()
print("\nContent Filter Results:")
print("-" * 50)
for test_name, metrics in filter_results.items():
print(f"\nTest case: {test_name}")
print(f"Original length: {metrics['original_length']}")
print(f"Pruned length: {metrics['pruned_length']} ({metrics['pruning_time']:.3f}s)")
print(f"BM25 length: {metrics['bm25_length']} ({metrics['bm25_time']:.3f}s)")
print("\nStarting markdown generation tests...")
markdown_results = test_markdown_generation()
print("\nMarkdown Generation Results:")
print("-" * 50)
for result in markdown_results:
print(f"\nTest: {result['test_case']} - Generator: {result['generator']}")
print(f"Time: {result['time']:.3f}s")
print(f"Raw length: {result['raw_length']}")
print(f"Fit length: {result['fit_length']}")
print(f"Citations: {result['citations']}")
if __name__ == "__main__":
main()


@@ -0,0 +1,149 @@
import asyncio
import pytest
from typing import List
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CrawlerRunConfig,
MemoryAdaptiveDispatcher,
RateLimiter,
CacheMode
)
@pytest.mark.asyncio
@pytest.mark.parametrize("viewport", [
(800, 600),
(1024, 768),
(1920, 1080)
])
async def test_viewport_config(viewport):
"""Test different viewport configurations"""
width, height = viewport
browser_config = BrowserConfig(
browser_type="chromium",
headless=True,
viewport_width=width,
viewport_height=height
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="https://example.com",
config=CrawlerRunConfig(
# cache_mode=CacheMode.BYPASS,
page_timeout=30000 # 30 seconds
)
)
assert result.success
@pytest.mark.asyncio
async def test_memory_management():
"""Test memory-adaptive dispatching"""
browser_config = BrowserConfig(
browser_type="chromium",
headless=True,
viewport_width=1024,
viewport_height=768
)
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=70.0,
check_interval=1.0,
max_session_permit=5
)
urls = ["https://example.com"] * 3 # Test with multiple identical URLs
async with AsyncWebCrawler(config=browser_config) as crawler:
results = await crawler.arun_many(
urls=urls,
config=CrawlerRunConfig(page_timeout=30000),
dispatcher=dispatcher
)
assert len(results) == len(urls)
@pytest.mark.asyncio
async def test_rate_limiting():
"""Test rate limiting functionality"""
browser_config = BrowserConfig(
browser_type="chromium",
headless=True
)
dispatcher = MemoryAdaptiveDispatcher(
rate_limiter=RateLimiter(
base_delay=(1.0, 2.0),
max_delay=5.0,
max_retries=2
),
memory_threshold_percent=70.0
)
urls = [
"https://example.com",
"https://example.org",
"https://example.net"
]
async with AsyncWebCrawler(config=browser_config) as crawler:
results = await crawler.arun_many(
urls=urls,
config=CrawlerRunConfig(page_timeout=30000),
dispatcher=dispatcher
)
assert len(results) == len(urls)
@pytest.mark.asyncio
async def test_javascript_execution():
"""Test JavaScript execution capabilities"""
browser_config = BrowserConfig(
browser_type="chromium",
headless=True,
java_script_enabled=True
)
js_code = """
document.body.style.backgroundColor = 'red';
return document.body.style.backgroundColor;
"""
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="https://example.com",
config=CrawlerRunConfig(
js_code=js_code,
page_timeout=30000
)
)
assert result.success
@pytest.mark.asyncio
@pytest.mark.parametrize("error_url", [
"https://invalid.domain.test",
"https://httpbin.org/status/404",
"https://httpbin.org/status/503",
"https://httpbin.org/status/403"
])
async def test_error_handling(error_url):
"""Test error handling for various failure scenarios"""
browser_config = BrowserConfig(
browser_type="chromium",
headless=True
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url=error_url,
config=CrawlerRunConfig(
page_timeout=10000, # Short timeout for error cases
cache_mode=CacheMode.BYPASS
)
)
assert not result.success
assert result.error_message is not None
if __name__ == "__main__":
asyncio.run(test_viewport_config((1024, 768)))
asyncio.run(test_memory_management())
asyncio.run(test_rate_limiting())
asyncio.run(test_javascript_execution())


@@ -0,0 +1,85 @@
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from playwright.async_api import Page, BrowserContext
async def test_reuse_context_by_config():
# We will store each context ID in these maps to confirm reuse
context_ids_for_A = []
context_ids_for_B = []
# Create a small hook to track context creation
async def on_page_context_created(page: Page, context: BrowserContext, config: CrawlerRunConfig, **kwargs):
c_id = id(context)
print(f"[HOOK] on_page_context_created - Context ID: {c_id}")
# Distinguish which config we used by checking a custom hook param
config_label = config.shared_data.get("config_label", "unknown")
if config_label == "A":
context_ids_for_A.append(c_id)
elif config_label == "B":
context_ids_for_B.append(c_id)
return page
# Browser config - Headless, verbose so we see logs
browser_config = BrowserConfig(headless=True, verbose=True)
# Two crawler run configs that differ (for example, text_mode):
configA = CrawlerRunConfig(
only_text=True,
cache_mode=CacheMode.BYPASS,
wait_until="domcontentloaded",
shared_data = {
"config_label" : "A"
}
)
configB = CrawlerRunConfig(
only_text=False,
cache_mode=CacheMode.BYPASS,
wait_until="domcontentloaded",
shared_data = {
"config_label" : "B"
}
)
# Create the crawler
crawler = AsyncWebCrawler(config=browser_config)
# Attach our custom hook
# Note: "on_page_context_created" will be called each time a new context+page is generated
crawler.crawler_strategy.set_hook("on_page_context_created", on_page_context_created)
# Start the crawler (launches the browser)
await crawler.start()
    # For demonstration, we'll crawl a benign site multiple times with each config
test_url = "https://example.com"
print("\n--- Crawling with config A (text_mode=True) ---")
for _ in range(2):
# Pass an extra kwarg to the hook so we know which config is being used
await crawler.arun(test_url, config=configA)
print("\n--- Crawling with config B (text_mode=False) ---")
for _ in range(2):
await crawler.arun(test_url, config=configB)
# Close the crawler (shuts down the browser, closes contexts)
await crawler.close()
# Validate and show the results
print("\n=== RESULTS ===")
print(f"Config A context IDs: {context_ids_for_A}")
print(f"Config B context IDs: {context_ids_for_B}")
if len(set(context_ids_for_A)) == 1:
print("✅ All config A crawls used the SAME BrowserContext.")
else:
print("❌ Config A crawls created multiple contexts unexpectedly.")
if len(set(context_ids_for_B)) == 1:
print("✅ All config B crawls used the SAME BrowserContext.")
else:
print("❌ Config B crawls created multiple contexts unexpectedly.")
if set(context_ids_for_A).isdisjoint(context_ids_for_B):
print("✅ Config A context is different from Config B context.")
else:
print("❌ A and B ended up sharing the same context somehow!")
if __name__ == "__main__":
asyncio.run(test_reuse_context_by_config())


@@ -0,0 +1,17 @@
# example_usage.py
import asyncio
from crawl4ai.crawlers import get_crawler
async def main():
# Get the registered crawler
example_crawler = get_crawler("example_site.content")
# Crawl example.com
result = await example_crawler(url="https://example.com")
print(result)
if __name__ == "__main__":
asyncio.run(main())


@@ -0,0 +1,46 @@
import asyncio
import time
from crawl4ai import CrawlerRunConfig, AsyncWebCrawler, CacheMode
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
from crawl4ai.deep_crawling import BFSDeepCrawlStrategy
# from crawl4ai.deep_crawling import BFSDeepCrawlStrategy, BestFirstCrawlingStrategy
async def main():
"""Example deep crawl of documentation site."""
config = CrawlerRunConfig(
deep_crawl_strategy = BFSDeepCrawlStrategy(
max_depth=2,
include_external=False
),
stream=False,
verbose=True,
cache_mode=CacheMode.BYPASS,
scraping_strategy=LXMLWebScrapingStrategy()
)
async with AsyncWebCrawler() as crawler:
start_time = time.perf_counter()
print("\nStarting deep crawl in batch mode:")
results = await crawler.arun(
url="https://docs.crawl4ai.com",
config=config
)
print(f"Crawled {len(results)} pages")
print(f"Example page: {results[0].url}")
print(f"Duration: {time.perf_counter() - start_time:.2f} seconds\n")
print("Starting deep crawl in streaming mode:")
config.stream = True
start_time = time.perf_counter()
async for result in await crawler.arun(
url="https://docs.crawl4ai.com",
config=config
):
print(f"{result.url} (Depth: {result.metadata.get('depth', 0)})")
print(f"Duration: {time.perf_counter() - start_time:.2f} seconds")
if __name__ == "__main__":
asyncio.run(main())


@@ -0,0 +1,279 @@
from crawl4ai.deep_crawling.filters import ContentRelevanceFilter, URLPatternFilter, DomainFilter, ContentTypeFilter, SEOFilter
async def test_pattern_filter():
# Test cases as list of tuples instead of dict for multiple patterns
test_cases = [
# Simple suffix patterns (*.html)
("*.html", {
"https://example.com/page.html": True,
"https://example.com/path/doc.html": True,
"https://example.com/page.htm": False,
"https://example.com/page.html?param=1": True,
}),
# Path prefix patterns (/foo/*)
("*/article/*", {
"https://example.com/article/123": True,
"https://example.com/blog/article/456": True,
"https://example.com/articles/789": False,
"https://example.com/article": False,
}),
# Complex patterns
("blog-*-[0-9]", {
"https://example.com/blog-post-1": True,
"https://example.com/blog-test-9": True,
"https://example.com/blog-post": False,
"https://example.com/blog-post-x": False,
}),
# Multiple patterns case
(["*.pdf", "*/download/*"], {
"https://example.com/doc.pdf": True,
"https://example.com/download/file.txt": True,
"https://example.com/path/download/doc": True,
"https://example.com/uploads/file.txt": False,
}),
# Edge cases
("*", {
"https://example.com": True,
"": True,
"http://test.com/path": True,
}),
# Complex regex
(r"^https?://.*\.example\.com/\d+", {
"https://sub.example.com/123": True,
"http://test.example.com/456": True,
"https://example.com/789": False,
"https://sub.example.com/abc": False,
})
]
def run_accuracy_test():
print("\nAccuracy Tests:")
print("-" * 50)
all_passed = True
for patterns, test_urls in test_cases:
filter_obj = URLPatternFilter(patterns)
for url, expected in test_urls.items():
result = filter_obj.apply(url)
if result != expected:
print(f"❌ Failed: Pattern '{patterns}' with URL '{url}'")
print(f" Expected: {expected}, Got: {result}")
all_passed = False
else:
print(f"✅ Passed: Pattern '{patterns}' with URL '{url}'")
return all_passed
# Run tests
print("Running Pattern Filter Tests...")
accuracy_passed = run_accuracy_test()
if accuracy_passed:
print("\n✨ All accuracy tests passed!")
else:
print("\n❌ Some accuracy tests failed!")
async def test_domain_filter():
from itertools import chain
# Test cases
test_cases = [
# Allowed domains
({"allowed": "example.com"}, {
"https://example.com/page": True,
"http://example.com": True,
"https://sub.example.com": False,
"https://other.com": False,
}),
({"allowed": ["example.com", "test.com"]}, {
"https://example.com/page": True,
"https://test.com/home": True,
"https://other.com": False,
}),
# Blocked domains
({"blocked": "malicious.com"}, {
"https://malicious.com": False,
"https://safe.com": True,
"http://malicious.com/login": False,
}),
({"blocked": ["spam.com", "ads.com"]}, {
"https://spam.com": False,
"https://ads.com/banner": False,
"https://example.com": True,
}),
# Allowed and Blocked combination
({"allowed": "example.com", "blocked": "sub.example.com"}, {
"https://example.com": True,
"https://sub.example.com": False,
"https://other.com": False,
}),
]
def run_accuracy_test():
print("\nAccuracy Tests:")
print("-" * 50)
all_passed = True
for params, test_urls in test_cases:
filter_obj = DomainFilter(
allowed_domains=params.get("allowed"),
blocked_domains=params.get("blocked"),
)
for url, expected in test_urls.items():
result = filter_obj.apply(url)
if result != expected:
print(f"\u274C Failed: Params {params} with URL '{url}'")
print(f" Expected: {expected}, Got: {result}")
all_passed = False
else:
print(f"\u2705 Passed: Params {params} with URL '{url}'")
return all_passed
# Run tests
print("Running Domain Filter Tests...")
accuracy_passed = run_accuracy_test()
if accuracy_passed:
print("\n\u2728 All accuracy tests passed!")
else:
print("\n\u274C Some accuracy tests failed!")
async def test_content_relevance_filter():
relevance_filter = ContentRelevanceFilter(
query="What was the cause of american civil war?",
threshold=1
)
test_cases = {
"https://en.wikipedia.org/wiki/Cricket": False,
"https://en.wikipedia.org/wiki/American_Civil_War": True,
}
print("\nRunning Content Relevance Filter Tests...")
print("-" * 50)
all_passed = True
for url, expected in test_cases.items():
result = await relevance_filter.apply(url)
if result != expected:
print(f"\u274C Failed: URL '{url}'")
print(f" Expected: {expected}, Got: {result}")
all_passed = False
else:
print(f"\u2705 Passed: URL '{url}'")
if all_passed:
print("\n\u2728 All content relevance tests passed!")
else:
print("\n\u274C Some content relevance tests failed!")
async def test_content_type_filter():
from itertools import chain
# Test cases
test_cases = [
# Allowed single type
({"allowed": "image/png"}, {
"https://example.com/image.png": True,
"https://example.com/photo.jpg": False,
"https://example.com/document.pdf": False,
}),
# Multiple allowed types
({"allowed": ["image/jpeg", "application/pdf"]}, {
"https://example.com/photo.jpg": True,
"https://example.com/document.pdf": True,
"https://example.com/script.js": False,
}),
# No extension should be allowed
({"allowed": "application/json"}, {
"https://example.com/api/data": True,
"https://example.com/data.json": True,
"https://example.com/page.html": False,
}),
# Unknown extensions should not be allowed
({"allowed": "application/octet-stream"}, {
"https://example.com/file.unknown": True,
"https://example.com/archive.zip": False,
"https://example.com/software.exe": False,
}),
]
def run_accuracy_test():
print("\nAccuracy Tests:")
print("-" * 50)
all_passed = True
for params, test_urls in test_cases:
filter_obj = ContentTypeFilter(
allowed_types=params.get("allowed"),
)
for url, expected in test_urls.items():
result = filter_obj.apply(url)
if result != expected:
print(f"\u274C Failed: Params {params} with URL '{url}'")
print(f" Expected: {expected}, Got: {result}")
all_passed = False
else:
print(f"\u2705 Passed: Params {params} with URL '{url}'")
return all_passed
# Run tests
print("Running Content Type Filter Tests...")
accuracy_passed = run_accuracy_test()
if accuracy_passed:
print("\n\u2728 All accuracy tests passed!")
else:
print("\n\u274C Some accuracy tests failed!")
async def test_seo_filter():
seo_filter = SEOFilter(threshold=0.5, keywords=["SEO", "search engines", "Optimization"])
test_cases = {
"https://en.wikipedia.org/wiki/Search_engine_optimization": True,
"https://en.wikipedia.org/wiki/Randomness": False,
}
print("\nRunning SEO Filter Tests...")
print("-" * 50)
all_passed = True
for url, expected in test_cases.items():
result = await seo_filter.apply(url)
if result != expected:
print(f"\u274C Failed: URL '{url}'")
print(f" Expected: {expected}, Got: {result}")
all_passed = False
else:
print(f"\u2705 Passed: URL '{url}'")
if all_passed:
print("\n\u2728 All SEO filter tests passed!")
else:
print("\n\u274C Some SEO filter tests failed!")
import asyncio
if __name__ == "__main__":
asyncio.run(test_pattern_filter())
asyncio.run(test_domain_filter())
asyncio.run(test_content_type_filter())
asyncio.run(test_content_relevance_filter())
asyncio.run(test_seo_filter())


@@ -0,0 +1,179 @@
from crawl4ai.deep_crawling.scorers import CompositeScorer, ContentTypeScorer, DomainAuthorityScorer, FreshnessScorer, KeywordRelevanceScorer, PathDepthScorer
def test_scorers():
test_cases = [
# Keyword Scorer Tests
{
"scorer_type": "keyword",
"config": {
"keywords": ["python", "blog"],
"weight": 1.0,
"case_sensitive": False
},
"urls": {
"https://example.com/python-blog": 1.0,
"https://example.com/PYTHON-BLOG": 1.0,
"https://example.com/python-only": 0.5,
"https://example.com/other": 0.0
}
},
# Path Depth Scorer Tests
{
"scorer_type": "path_depth",
"config": {
"optimal_depth": 2,
"weight": 1.0
},
"urls": {
"https://example.com/a/b": 1.0,
"https://example.com/a": 0.5,
"https://example.com/a/b/c": 0.5,
"https://example.com": 0.33333333
}
},
# Content Type Scorer Tests
{
"scorer_type": "content_type",
"config": {
"type_weights": {
".html$": 1.0,
".pdf$": 0.8,
".jpg$": 0.6
},
"weight": 1.0
},
"urls": {
"https://example.com/doc.html": 1.0,
"https://example.com/doc.pdf": 0.8,
"https://example.com/img.jpg": 0.6,
"https://example.com/other.txt": 0.0
}
},
# Freshness Scorer Tests
{
"scorer_type": "freshness",
"config": {
"weight": 1.0, # Remove current_year since original doesn't support it
},
"urls": {
"https://example.com/2024/01/post": 1.0,
"https://example.com/2023/12/post": 0.9,
"https://example.com/2022/post": 0.8,
"https://example.com/no-date": 0.5
}
},
# Domain Authority Scorer Tests
{
"scorer_type": "domain",
"config": {
"domain_weights": {
"python.org": 1.0,
"github.com": 0.8,
"medium.com": 0.6
},
"default_weight": 0.3,
"weight": 1.0
},
"urls": {
"https://python.org/about": 1.0,
"https://github.com/repo": 0.8,
"https://medium.com/post": 0.6,
"https://unknown.com": 0.3
}
}
]
def create_scorer(scorer_type, config):
if scorer_type == "keyword":
return KeywordRelevanceScorer(**config)
elif scorer_type == "path_depth":
return PathDepthScorer(**config)
elif scorer_type == "content_type":
return ContentTypeScorer(**config)
elif scorer_type == "freshness":
return FreshnessScorer(**config,current_year=2024)
elif scorer_type == "domain":
return DomainAuthorityScorer(**config)
def run_accuracy_test():
print("\nAccuracy Tests:")
print("-" * 50)
all_passed = True
for test_case in test_cases:
print(f"\nTesting {test_case['scorer_type']} scorer:")
scorer = create_scorer(
test_case['scorer_type'],
test_case['config']
)
for url, expected in test_case['urls'].items():
score = round(scorer.score(url), 8)
expected = round(expected, 8)
if abs(score - expected) > 0.00001:
print(f"❌ Scorer Failed: URL '{url}'")
print(f" Expected: {expected}, Got: {score}")
all_passed = False
else:
print(f"✅ Scorer Passed: URL '{url}'")
return all_passed
def run_composite_test():
print("\nTesting Composite Scorer:")
print("-" * 50)
# Create test data
test_urls = {
"https://python.org/blog/2024/01/new-release.html":0.86666667,
"https://github.com/repo/old-code.pdf": 0.62,
"https://unknown.com/random": 0.26
}
# Create composite scorers with all types
scorers = []
for test_case in test_cases:
scorer = create_scorer(
test_case['scorer_type'],
test_case['config']
)
scorers.append(scorer)
composite = CompositeScorer(scorers, normalize=True)
all_passed = True
for url, expected in test_urls.items():
score = round(composite.score(url), 8)
if abs(score - expected) > 0.00001:
print(f"❌ Composite Failed: URL '{url}'")
print(f" Expected: {expected}, Got: {score}")
all_passed = False
else:
print(f"✅ Composite Passed: URL '{url}'")
return all_passed
# Run tests
print("Running Scorer Tests...")
accuracy_passed = run_accuracy_test()
composite_passed = run_composite_test()
if accuracy_passed and composite_passed:
print("\n✨ All tests passed!")
# Note: Already have performance tests in run_scorer_performance_test()
else:
print("\n❌ Some tests failed!")
if __name__ == "__main__":
test_scorers()


@@ -0,0 +1,116 @@
from crawl4ai.async_crawler_strategy import AsyncHTTPCrawlerStrategy
from crawl4ai.async_logger import AsyncLogger
from crawl4ai import CrawlerRunConfig, HTTPCrawlerConfig
from crawl4ai.async_crawler_strategy import ConnectionTimeoutError
import asyncio
import os
async def main():
"""Test the AsyncHTTPCrawlerStrategy with various scenarios"""
logger = AsyncLogger(verbose=True)
# Initialize the strategy with default HTTPCrawlerConfig
crawler = AsyncHTTPCrawlerStrategy(
browser_config=HTTPCrawlerConfig(),
logger=logger
)
# Test 1: Basic HTTP GET
print("\n=== Test 1: Basic HTTP GET ===")
result = await crawler.crawl("https://example.com")
print(f"Status: {result.status_code}")
print(f"Content length: {len(result.html)}")
print(f"Headers: {dict(result.response_headers)}")
# Test 2: POST request with JSON
print("\n=== Test 2: POST with JSON ===")
crawler.browser_config = crawler.browser_config.clone(
method="POST",
json={"test": "data"},
headers={"Content-Type": "application/json"}
)
try:
result = await crawler.crawl(
"https://httpbin.org/post",
)
print(f"Status: {result.status_code}")
print(f"Response: {result.html[:200]}...")
except Exception as e:
print(f"Error: {e}")
# Test 3: File handling
crawler.browser_config = HTTPCrawlerConfig()
print("\n=== Test 3: Local file handling ===")
# Create a tmp file with test content
from tempfile import NamedTemporaryFile
with NamedTemporaryFile(delete=False) as f:
f.write(b"<html><body>Test content</body></html>")
f.close()
result = await crawler.crawl(f"file://{f.name}")
print(f"File content: {result.html}")
# Test 4: Raw content
print("\n=== Test 4: Raw content handling ===")
raw_html = "raw://<html><body>Raw test content</body></html>"
result = await crawler.crawl(raw_html)
print(f"Raw content: {result.html}")
# Test 5: Custom hooks
print("\n=== Test 5: Custom hooks ===")
async def before_request(url, kwargs):
print(f"Before request to {url}")
kwargs['headers']['X-Custom'] = 'test'
async def after_request(response):
print(f"After request, status: {response.status_code}")
crawler.set_hook('before_request', before_request)
crawler.set_hook('after_request', after_request)
result = await crawler.crawl("https://example.com")
# Test 6: Error handling
print("\n=== Test 6: Error handling ===")
try:
await crawler.crawl("https://nonexistent.domain.test")
except Exception as e:
print(f"Expected error: {e}")
# Test 7: Redirects
print("\n=== Test 7: Redirect handling ===")
crawler.browser_config = HTTPCrawlerConfig(follow_redirects=True)
result = await crawler.crawl("http://httpbin.org/redirect/1")
print(f"Final URL: {result.redirected_url}")
# Test 8: Custom timeout
print("\n=== Test 8: Custom timeout ===")
try:
await crawler.crawl(
"https://httpbin.org/delay/5",
config=CrawlerRunConfig(page_timeout=2)
)
except ConnectionTimeoutError as e:
print(f"Expected timeout: {e}")
# Test 9: SSL verification
print("\n=== Test 9: SSL verification ===")
crawler.browser_config = HTTPCrawlerConfig(verify_ssl=False)
try:
await crawler.crawl("https://expired.badssl.com/")
print("Connected to invalid SSL site with verification disabled")
except Exception as e:
print(f"SSL error: {e}")
# Test 10: Large file streaming
print("\n=== Test 10: Large file streaming ===")
from tempfile import NamedTemporaryFile
with NamedTemporaryFile(delete=False) as f:
f.write(b"<html><body>" + b"X" * 1024 * 1024 * 10 + b"</body></html>")
f.close()
result = await crawler.crawl("file://" + f.name)
print(f"Large file content length: {len(result.html)}")
os.remove(f.name)
    await crawler.close()
if __name__ == "__main__":
asyncio.run(main())


@@ -0,0 +1,86 @@
import os
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai import LLMConfig
from crawl4ai.content_filter_strategy import LLMContentFilter
async def test_llm_filter():
# Create an HTML source that needs intelligent filtering
url = "https://docs.python.org/3/tutorial/classes.html"
browser_config = BrowserConfig(
headless=True,
verbose=True
)
# run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
run_config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
async with AsyncWebCrawler(config=browser_config) as crawler:
# First get the raw HTML
result = await crawler.arun(url, config=run_config)
html = result.cleaned_html
# Initialize LLM filter with focused instruction
filter = LLMContentFilter(
llm_config=LLMConfig(provider="openai/gpt-4o",api_token=os.getenv('OPENAI_API_KEY')),
instruction="""
Focus on extracting the core educational content about Python classes.
Include:
- Key concepts and their explanations
- Important code examples
- Essential technical details
Exclude:
- Navigation elements
- Sidebars
- Footer content
- Version information
- Any non-essential UI elements
Format the output as clean markdown with proper code blocks and headers.
""",
verbose=True
)
filter = LLMContentFilter(
llm_config = LLMConfig(provider="openai/gpt-4o",api_token=os.getenv('OPENAI_API_KEY')),
            chunk_token_threshold=2 ** 12 * 2,  # 4096 * 2 = 8192
instruction="""
Extract the main educational content while preserving its original wording and substance completely. Your task is to:
1. Maintain the exact language and terminology used in the main content
2. Keep all technical explanations, examples, and educational content intact
3. Preserve the original flow and structure of the core content
4. Remove only clearly irrelevant elements like:
- Navigation menus
- Advertisement sections
- Cookie notices
- Footers with site information
- Sidebars with external links
- Any UI elements that don't contribute to learning
The goal is to create a clean markdown version that reads exactly like the original article,
keeping all valuable content but free from distracting elements. Imagine you're creating
a perfect reading experience where nothing valuable is lost, but all noise is removed.
""",
verbose=True
)
# Apply filtering
filtered_content = filter.filter_content(html, ignore_cache = True)
# Show results
print("\nFiltered Content Length:", len(filtered_content))
print("\nFirst 500 chars of filtered content:")
if filtered_content:
print(filtered_content[0][:500])
# Save on disc the markdown version
with open("filtered_content.md", "w", encoding="utf-8") as f:
f.write("\n".join(filtered_content))
# Show token usage
filter.show_usage()
if __name__ == "__main__":
asyncio.run(test_llm_filter())

tests/general/test_mhtml.py (new file, 213 lines)

@@ -0,0 +1,213 @@
# test_mhtml_capture.py
import pytest
import asyncio
import re # For more robust MHTML checks
# Assuming these can be imported directly from the crawl4ai library
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CrawlResult
# A reliable, simple static HTML page for testing
# Using httpbin as it's designed for testing clients
TEST_URL_SIMPLE = "https://httpbin.org/html"
EXPECTED_CONTENT_SIMPLE = "Herman Melville - Moby-Dick"
# A slightly more complex page that might involve JS (good secondary test)
TEST_URL_JS = "https://quotes.toscrape.com/js/"
EXPECTED_CONTENT_JS = "Quotes to Scrape" # Title of the page, which should be present in MHTML
# Removed the custom event_loop fixture as pytest-asyncio provides a default one.
@pytest.mark.asyncio
async def test_mhtml_capture_when_enabled():
"""
Verify that when CrawlerRunConfig has capture_mhtml=True,
the CrawlResult contains valid MHTML content.
"""
# Create a fresh browser config and crawler instance for this test
browser_config = BrowserConfig(headless=True) # Use headless for testing CI/CD
# --- Key: Enable MHTML capture in the run config ---
run_config = CrawlerRunConfig(capture_mhtml=True)
# Create a fresh crawler instance
crawler = AsyncWebCrawler(config=browser_config)
try:
# Start the browser
await crawler.start()
# Perform the crawl with the MHTML-enabled config
result: CrawlResult = await crawler.arun(TEST_URL_SIMPLE, config=run_config)
# --- Assertions ---
assert result is not None, "Crawler should return a result object"
assert result.success is True, f"Crawling {TEST_URL_SIMPLE} should succeed. Error: {result.error_message}"
# 1. Check if the mhtml attribute exists (will fail if CrawlResult not updated)
assert hasattr(result, 'mhtml'), "CrawlResult object must have an 'mhtml' attribute"
# 2. Check if mhtml is populated
assert result.mhtml is not None, "MHTML content should be captured when enabled"
assert isinstance(result.mhtml, str), "MHTML content should be a string"
assert len(result.mhtml) > 500, "MHTML content seems too short, likely invalid" # Basic sanity check
# 3. Check for MHTML structure indicators (more robust than simple string contains)
# MHTML files are multipart MIME messages
assert re.search(r"Content-Type: multipart/related;", result.mhtml, re.IGNORECASE), \
"MHTML should contain 'Content-Type: multipart/related;'"
# Should contain a boundary definition
assert re.search(r"boundary=\"----MultipartBoundary", result.mhtml), \
"MHTML should contain a multipart boundary"
# Should contain the main HTML part
assert re.search(r"Content-Type: text/html", result.mhtml, re.IGNORECASE), \
"MHTML should contain a 'Content-Type: text/html' part"
# 4. Check if the *actual page content* is within the MHTML string
# This confirms the snapshot captured the rendered page
assert EXPECTED_CONTENT_SIMPLE in result.mhtml, \
f"Expected content '{EXPECTED_CONTENT_SIMPLE}' not found within the captured MHTML"
# 5. Ensure standard HTML is still present and correct
assert result.html is not None, "Standard HTML should still be present"
assert isinstance(result.html, str), "Standard HTML should be a string"
assert EXPECTED_CONTENT_SIMPLE in result.html, \
f"Expected content '{EXPECTED_CONTENT_SIMPLE}' not found within the standard HTML"
finally:
# Important: Ensure browser is completely closed even if assertions fail
await crawler.close()
# Help the garbage collector clean up
crawler = None
@pytest.mark.asyncio
async def test_mhtml_capture_when_disabled_explicitly():
"""
Verify that when CrawlerRunConfig explicitly has capture_mhtml=False,
the CrawlResult.mhtml attribute is None.
"""
# Create a fresh browser config and crawler instance for this test
browser_config = BrowserConfig(headless=True)
# --- Key: Explicitly disable MHTML capture ---
run_config = CrawlerRunConfig(capture_mhtml=False)
# Create a fresh crawler instance
crawler = AsyncWebCrawler(config=browser_config)
try:
# Start the browser
await crawler.start()
result: CrawlResult = await crawler.arun(TEST_URL_SIMPLE, config=run_config)
assert result is not None
assert result.success is True, f"Crawling {TEST_URL_SIMPLE} should succeed. Error: {result.error_message}"
# 1. Check attribute existence (important for TDD start)
assert hasattr(result, 'mhtml'), "CrawlResult object must have an 'mhtml' attribute"
# 2. Check mhtml is None
assert result.mhtml is None, "MHTML content should be None when explicitly disabled"
# 3. Ensure standard HTML is still present
assert result.html is not None
assert EXPECTED_CONTENT_SIMPLE in result.html
finally:
# Important: Ensure browser is completely closed even if assertions fail
await crawler.close()
# Help the garbage collector clean up
crawler = None
@pytest.mark.asyncio
async def test_mhtml_capture_when_disabled_by_default():
"""
Verify that if capture_mhtml is not specified (using its default),
the CrawlResult.mhtml attribute is None.
(This assumes the default value for capture_mhtml in CrawlerRunConfig is False)
"""
# Create a fresh browser config and crawler instance for this test
browser_config = BrowserConfig(headless=True)
# --- Key: Use default run config ---
run_config = CrawlerRunConfig() # Do not specify capture_mhtml
# Create a fresh crawler instance
crawler = AsyncWebCrawler(config=browser_config)
try:
# Start the browser
await crawler.start()
result: CrawlResult = await crawler.arun(TEST_URL_SIMPLE, config=run_config)
assert result is not None
assert result.success is True, f"Crawling {TEST_URL_SIMPLE} should succeed. Error: {result.error_message}"
# 1. Check attribute existence
assert hasattr(result, 'mhtml'), "CrawlResult object must have an 'mhtml' attribute"
# 2. Check mhtml is None (assuming default is False)
assert result.mhtml is None, "MHTML content should be None when using default config (assuming default=False)"
# 3. Ensure standard HTML is still present
assert result.html is not None
assert EXPECTED_CONTENT_SIMPLE in result.html
finally:
# Important: Ensure browser is completely closed even if assertions fail
await crawler.close()
# Help the garbage collector clean up
crawler = None
# Optional: Add a test for a JS-heavy page if needed
@pytest.mark.asyncio
async def test_mhtml_capture_on_js_page_when_enabled():
"""
Verify MHTML capture works on a page requiring JavaScript execution.
"""
# Create a fresh browser config and crawler instance for this test
browser_config = BrowserConfig(headless=True)
run_config = CrawlerRunConfig(
capture_mhtml=True,
# Add a small wait or JS execution if needed for the JS page to fully render
# For quotes.toscrape.com/js/, it renders quickly, but a wait might be safer
# wait_for_timeout=2000 # Example: wait up to 2 seconds
js_code="await new Promise(r => setTimeout(r, 500));" # Small delay after potential load
)
# Create a fresh crawler instance
crawler = AsyncWebCrawler(config=browser_config)
try:
# Start the browser
await crawler.start()
result: CrawlResult = await crawler.arun(TEST_URL_JS, config=run_config)
assert result is not None
assert result.success is True, f"Crawling {TEST_URL_JS} should succeed. Error: {result.error_message}"
assert hasattr(result, 'mhtml'), "CrawlResult object must have an 'mhtml' attribute"
assert result.mhtml is not None, "MHTML content should be captured on JS page when enabled"
assert isinstance(result.mhtml, str), "MHTML content should be a string"
assert len(result.mhtml) > 500, "MHTML content from JS page seems too short"
# Check for MHTML structure
assert re.search(r"Content-Type: multipart/related;", result.mhtml, re.IGNORECASE)
assert re.search(r"Content-Type: text/html", result.mhtml, re.IGNORECASE)
# Check for content rendered by JS within the MHTML
assert EXPECTED_CONTENT_JS in result.mhtml, \
f"Expected JS-rendered content '{EXPECTED_CONTENT_JS}' not found within the captured MHTML"
# Check standard HTML too
assert result.html is not None
assert EXPECTED_CONTENT_JS in result.html, \
f"Expected JS-rendered content '{EXPECTED_CONTENT_JS}' not found within the standard HTML"
finally:
# Important: Ensure browser is completely closed even if assertions fail
await crawler.close()
# Help the garbage collector clean up
crawler = None
if __name__ == "__main__":
# Use pytest for async tests
pytest.main(["-xvs", __file__])


@@ -0,0 +1,185 @@
from crawl4ai.async_webcrawler import AsyncWebCrawler
from crawl4ai.async_configs import CrawlerRunConfig, BrowserConfig
import asyncio
import aiohttp
from aiohttp import web
import tempfile
import shutil
import os, sys, time, json
async def start_test_server():
app = web.Application()
async def basic_page(request):
return web.Response(text="""
<!DOCTYPE html>
<html>
<head>
<title>Network Request Test</title>
</head>
<body>
<h1>Test Page for Network Capture</h1>
<p>This page performs network requests and console logging.</p>
<img src="/image.png" alt="Test Image">
<script>
console.log("Basic console log");
console.error("Error message");
console.warn("Warning message");
// Make some XHR requests
const xhr = new XMLHttpRequest();
xhr.open('GET', '/api/data', true);
xhr.send();
// Make a fetch request
fetch('/api/json')
.then(response => response.json())
.catch(error => console.error('Fetch error:', error));
// Trigger an error
setTimeout(() => {
try {
nonExistentFunction();
} catch (e) {
console.error("Caught error:", e);
}
}, 100);
</script>
</body>
</html>
""", content_type="text/html")
async def image(request):
# Return a small 1x1 transparent PNG
return web.Response(body=bytes.fromhex('89504E470D0A1A0A0000000D49484452000000010000000108060000001F15C4890000000D4944415478DA63FAFFFF3F030079DB00018D959DE70000000049454E44AE426082'), content_type="image/png")
async def api_data(request):
return web.Response(text="sample data")
async def api_json(request):
return web.json_response({"status": "success", "message": "JSON data"})
# Register routes
app.router.add_get('/', basic_page)
app.router.add_get('/image.png', image)
app.router.add_get('/api/data', api_data)
app.router.add_get('/api/json', api_json)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
return runner
async def test_network_console_capture():
print("\n=== Testing Network and Console Capture ===\n")
# Start test server
runner = await start_test_server()
try:
browser_config = BrowserConfig(headless=True)
# Test with capture disabled (default)
print("\n1. Testing with capture disabled (default)...")
async with AsyncWebCrawler(config=browser_config) as crawler:
config = CrawlerRunConfig(
wait_until="networkidle", # Wait for network to be idle
)
result = await crawler.arun(url="http://localhost:8080/", config=config)
assert result.network_requests is None, "Network requests should be None when capture is disabled"
assert result.console_messages is None, "Console messages should be None when capture is disabled"
print("✓ Default config correctly returns None for network_requests and console_messages")
# Test with network capture enabled
print("\n2. Testing with network capture enabled...")
async with AsyncWebCrawler(config=browser_config) as crawler:
config = CrawlerRunConfig(
wait_until="networkidle", # Wait for network to be idle
capture_network_requests=True
)
result = await crawler.arun(url="http://localhost:8080/", config=config)
assert result.network_requests is not None, "Network requests should be captured"
print(f"✓ Captured {len(result.network_requests)} network requests")
# Check if we have both requests and responses
request_count = len([r for r in result.network_requests if r.get("event_type") == "request"])
response_count = len([r for r in result.network_requests if r.get("event_type") == "response"])
print(f" - {request_count} requests, {response_count} responses")
# Check if we captured specific resources
urls = [r.get("url") for r in result.network_requests]
has_image = any("/image.png" in url for url in urls)
has_api_data = any("/api/data" in url for url in urls)
has_api_json = any("/api/json" in url for url in urls)
assert has_image, "Should have captured image request"
assert has_api_data, "Should have captured API data request"
assert has_api_json, "Should have captured API JSON request"
print("✓ Captured expected network requests (image, API endpoints)")
# Test with console capture enabled
print("\n3. Testing with console capture enabled...")
async with AsyncWebCrawler(config=browser_config) as crawler:
config = CrawlerRunConfig(
wait_until="networkidle", # Wait for network to be idle
capture_console_messages=True
)
result = await crawler.arun(url="http://localhost:8080/", config=config)
assert result.console_messages is not None, "Console messages should be captured"
print(f"✓ Captured {len(result.console_messages)} console messages")
# Check if we have different types of console messages
message_types = set(msg.get("type") for msg in result.console_messages if "type" in msg)
print(f" - Message types: {', '.join(message_types)}")
# Print all captured messages for debugging
print(" - Captured messages:")
for msg in result.console_messages:
print(f" * Type: {msg.get('type', 'N/A')}, Text: {msg.get('text', 'N/A')}")
# Look for specific messages
messages = [msg.get("text") for msg in result.console_messages if "text" in msg]
has_basic_log = any("Basic console log" in msg for msg in messages)
has_error_msg = any("Error message" in msg for msg in messages)
has_warning_msg = any("Warning message" in msg for msg in messages)
assert has_basic_log, "Should have captured basic console.log message"
assert has_error_msg, "Should have captured console.error message"
assert has_warning_msg, "Should have captured console.warn message"
print("✓ Captured expected console messages (log, error, warning)")
# Test with both captures enabled
print("\n4. Testing with both network and console capture enabled...")
async with AsyncWebCrawler(config=browser_config) as crawler:
config = CrawlerRunConfig(
wait_until="networkidle", # Wait for network to be idle
capture_network_requests=True,
capture_console_messages=True
)
result = await crawler.arun(url="http://localhost:8080/", config=config)
assert result.network_requests is not None, "Network requests should be captured"
assert result.console_messages is not None, "Console messages should be captured"
print(f"✓ Successfully captured both {len(result.network_requests)} network requests and {len(result.console_messages)} console messages")
finally:
await runner.cleanup()
print("\nTest server shutdown")
async def main():
try:
await test_network_console_capture()
print("\n✅ All tests passed successfully!")
except Exception as e:
print(f"\n❌ Test failed: {str(e)}")
raise
if __name__ == "__main__":
asyncio.run(main())
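The test above drives the new capture flags end-to-end against a local fixture server. For a quick manual check against any page, a minimal standalone sketch could look like the following; it assumes only what the assertions above rely on (entries are dicts carrying event_type/url for network traffic and type/text for console output), and the target URL is just a placeholder.

import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig

async def capture_demo():
    config = CrawlerRunConfig(
        capture_network_requests=True,
        capture_console_messages=True,
    )
    async with AsyncWebCrawler(config=BrowserConfig(headless=True)) as crawler:
        result = await crawler.arun(url="https://example.com", config=config)
        # Each entry is a plain dict; "event_type" separates requests from responses.
        for entry in result.network_requests or []:
            print(entry.get("event_type"), entry.get("url"))
        for msg in result.console_messages or []:
            print(msg.get("type"), msg.get("text"))

if __name__ == "__main__":
    asyncio.run(capture_demo())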

View File

@@ -0,0 +1,159 @@
from crawl4ai.utils import RobotsParser
import asyncio
import aiohttp
from aiohttp import web
import tempfile
import shutil
import os, sys, time, json
async def test_robots_parser():
print("\n=== Testing RobotsParser ===\n")
# Setup temporary directory for testing
temp_dir = tempfile.mkdtemp()
try:
# 1. Basic setup test
print("1. Testing basic initialization...")
parser = RobotsParser(cache_dir=temp_dir)
assert os.path.exists(parser.db_path), "Database file not created"
print("✓ Basic initialization passed")
# 2. Test common cases
print("\n2. Testing common cases...")
allowed = await parser.can_fetch("https://www.example.com", "MyBot/1.0")
print(f"✓ Regular website fetch: {'allowed' if allowed else 'denied'}")
# Test caching
print("Testing cache...")
start = time.time()
await parser.can_fetch("https://www.example.com", "MyBot/1.0")
duration = time.time() - start
print(f"✓ Cached lookup took: {duration*1000:.2f}ms")
assert duration < 0.03, "Cache lookup too slow"
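# The repeat lookup above is answered from the local cache database created at
# parser.db_path, with no second fetch of robots.txt - hence the tight 30 ms bound.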
# 3. Edge cases
print("\n3. Testing edge cases...")
# Empty URL
result = await parser.can_fetch("", "MyBot/1.0")
print(f"✓ Empty URL handled: {'allowed' if result else 'denied'}")
# Invalid URL
result = await parser.can_fetch("not_a_url", "MyBot/1.0")
print(f"✓ Invalid URL handled: {'allowed' if result else 'denied'}")
# URL without scheme
result = await parser.can_fetch("example.com/page", "MyBot/1.0")
print(f"✓ URL without scheme handled: {'allowed' if result else 'denied'}")
# 4. Test with local server
async def start_test_server():
app = web.Application()
async def robots_txt(request):
return web.Response(text="""User-agent: *
Disallow: /private/
Allow: /public/
""")
async def malformed_robots(request):
return web.Response(text="<<<malformed>>>")
async def timeout_robots(request):
await asyncio.sleep(5)
return web.Response(text="Should timeout")
async def empty_robots(request):
return web.Response(text="")
async def giant_robots(request):
return web.Response(text="User-agent: *\nDisallow: /\n" * 10000)
# Mount all handlers at root level
app.router.add_get('/robots.txt', robots_txt)
app.router.add_get('/malformed/robots.txt', malformed_robots)
app.router.add_get('/timeout/robots.txt', timeout_robots)
app.router.add_get('/empty/robots.txt', empty_robots)
app.router.add_get('/giant/robots.txt', giant_robots)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
return runner
runner = await start_test_server()
try:
print("\n4. Testing robots.txt rules...")
base_url = "http://localhost:8080"
# Test public access
result = await parser.can_fetch(f"{base_url}/public/page", "bot")
print(f"Public access (/public/page): {'allowed' if result else 'denied'}")
assert result, "Public path should be allowed"
# Test private access
result = await parser.can_fetch(f"{base_url}/private/secret", "bot")
print(f"Private access (/private/secret): {'allowed' if result else 'denied'}")
assert not result, "Private path should be denied"
# Test malformed
result = await parser.can_fetch("http://localhost:8080/malformed/page", "bot")
print(f"✓ Malformed robots.txt handled: {'allowed' if result else 'denied'}")
# Test timeout
start = time.time()
result = await parser.can_fetch("http://localhost:8080/timeout/page", "bot")
duration = time.time() - start
print(f"✓ Timeout handled (took {duration:.2f}s): {'allowed' if result else 'denied'}")
assert duration < 3, "Timeout not working"
# Test empty
result = await parser.can_fetch("http://localhost:8080/empty/page", "bot")
print(f"✓ Empty robots.txt handled: {'allowed' if result else 'denied'}")
# Test giant file
start = time.time()
result = await parser.can_fetch("http://localhost:8080/giant/page", "bot")
duration = time.time() - start
print(f"✓ Giant robots.txt handled (took {duration:.2f}s): {'allowed' if result else 'denied'}")
finally:
await runner.cleanup()
# 5. Cache manipulation
print("\n5. Testing cache manipulation...")
# Clear expired
parser.clear_expired()
print("✓ Clear expired entries completed")
# Clear all
parser.clear_cache()
print("✓ Clear all cache completed")
# Test with custom TTL
custom_parser = RobotsParser(cache_dir=temp_dir, cache_ttl=1) # 1 second TTL
await custom_parser.can_fetch("https://www.example.com", "bot")
print("✓ Custom TTL fetch completed")
await asyncio.sleep(1.1)
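# With cache_ttl=1 the entry stored by the first can_fetch call is already stale
# after the 1.1 s sleep, so the lookup below has to refetch robots.txt instead of
# answering from the cache.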
start = time.time()
await custom_parser.can_fetch("https://www.example.com", "bot")
print(f"✓ TTL expiry working (refetched after {time.time() - start:.2f}s)")
finally:
# Cleanup
shutil.rmtree(temp_dir)
print("\nTest cleanup completed")
async def main():
try:
await test_robots_parser()
except Exception as e:
print(f"Test failed: {str(e)}")
raise
if __name__ == "__main__":
asyncio.run(main())
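Outside the test harness, the same parser can be used directly for pre-flight checks. Below is a minimal sketch using only the API surface exercised above (the cache_dir/cache_ttl constructor arguments, can_fetch, and clear_expired); the user-agent string and URLs are placeholders.

import asyncio
import tempfile
from crawl4ai.utils import RobotsParser

async def robots_demo():
    # Cache robots.txt verdicts for one hour in a throwaway directory.
    parser = RobotsParser(cache_dir=tempfile.mkdtemp(), cache_ttl=3600)
    for url in ("https://example.com/", "https://example.com/private/page"):
        allowed = await parser.can_fetch(url, "MyBot/1.0")
        print(f"{url} -> {'allowed' if allowed else 'denied'}")
    parser.clear_expired()  # prune stale entries from the cache database

if __name__ == "__main__":
    asyncio.run(robots_demo())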

View File

@@ -0,0 +1,112 @@
# https://claude.ai/chat/c4bbe93d-fb54-44ce-92af-76b4c8086c6b
# https://claude.ai/chat/c24a768c-d8b2-478a-acc7-d76d42a308da
import os, sys
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy, JsonXPathExtractionStrategy
from crawl4ai.utils import preprocess_html_for_schema
import json
# Test HTML - A complex job board with companies, departments, and positions
test_html = """
<div class="company-listings">
<div class="company" data-company-id="123">
<div class="company-header">
<img class="company-logo" src="google.png" alt="Google">
<h1 class="company-name">Google</h1>
<div class="company-meta">
<span class="company-size">10,000+ employees</span>
<span class="company-industry">Technology</span>
<a href="https://google.careers" class="careers-link">Careers Page</a>
</div>
</div>
<div class="departments">
<div class="department">
<h2 class="department-name">Engineering</h2>
<div class="positions">
<div class="position-card" data-position-id="eng-1">
<h3 class="position-title">Senior Software Engineer</h3>
<span class="salary-range">$150,000 - $250,000</span>
<div class="position-meta">
<span class="location">Mountain View, CA</span>
<span class="job-type">Full-time</span>
<span class="experience">5+ years</span>
</div>
<div class="skills-required">
<span class="skill">Python</span>
<span class="skill">Kubernetes</span>
<span class="skill">Machine Learning</span>
</div>
<p class="position-description">Join our core engineering team...</p>
<div class="application-info">
<span class="posting-date">Posted: 2024-03-15</span>
<button class="apply-btn" data-req-id="REQ12345">Apply Now</button>
</div>
</div>
<!-- More positions -->
</div>
</div>
<div class="department">
<h2 class="department-name">Marketing</h2>
<div class="positions">
<div class="position-card" data-position-id="mkt-1">
<h3 class="position-title">Growth Marketing Manager</h3>
<span class="salary-range">$120,000 - $180,000</span>
<div class="position-meta">
<span class="location">New York, NY</span>
<span class="job-type">Full-time</span>
<span class="experience">3+ years</span>
</div>
<div class="skills-required">
<span class="skill">SEO</span>
<span class="skill">Analytics</span>
<span class="skill">Content Strategy</span>
</div>
<p class="position-description">Drive our growth initiatives...</p>
<div class="application-info">
<span class="posting-date">Posted: 2024-03-14</span>
<button class="apply-btn" data-req-id="REQ12346">Apply Now</button>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
"""
# Test cases
def test_schema_generation():
# Test 1: No query (should extract everything)
print("\nTest 1: No Query (Full Schema)")
schema1 = JsonCssExtractionStrategy.generate_schema(test_html)
print(json.dumps(schema1, indent=2))
# Test 2: Query for just basic job info
print("\nTest 2: Basic Job Info Query")
query2 = "I only need job titles, salaries, and locations"
schema2 = JsonCssExtractionStrategy.generate_schema(test_html, query2)
print(json.dumps(schema2, indent=2))
# Test 3: Query for company and department structure
print("\nTest 3: Organizational Structure Query")
query3 = "Extract company details and department names, without position details"
schema3 = JsonCssExtractionStrategy.generate_schema(test_html, query3)
print(json.dumps(schema3, indent=2))
# Test 4: Query for specific skills tracking
print("\nTest 4: Skills Analysis Query")
query4 = "I want to analyze required skills across all positions"
schema4 = JsonCssExtractionStrategy.generate_schema(test_html, query4)
print(json.dumps(schema4, indent=2))
if __name__ == "__main__":
test_schema_generation()
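The tests above only print the generated schemas. In practice a schema is then handed to JsonCssExtractionStrategy for an actual crawl; the sketch below shows that round trip under the assumption that extracted_content holds the strategy's JSON output (the URL argument is a placeholder).

import asyncio, json
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy

async def extract_with_schema(schema: dict, url: str) -> list:
    # Apply the generated schema as a CSS-based extraction strategy on a live page.
    config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        extraction_strategy=JsonCssExtractionStrategy(schema),
    )
    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun(url=url, config=config)
        return json.loads(result.extracted_content) if result.extracted_content else []

# usage (inside an event loop): rows = await extract_with_schema(schema1, "https://example.com/jobs")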

View File

@@ -0,0 +1,50 @@
import os, sys
# append 2 parent directories to sys.path to import crawl4ai
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
parent_parent_dir = os.path.dirname(parent_dir)
sys.path.append(parent_parent_dir)
import asyncio
from crawl4ai import *
async def test_crawler():
# Setup configurations
browser_config = BrowserConfig(headless=True, verbose=False)
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter(
threshold=0.48,
threshold_type="fixed",
min_word_threshold=0
)
),
)
# Test URLs - mix of different sites
urls = [
"http://example.com",
"http://example.org",
"http://example.net",
] * 10 # 30 total URLs
async with AsyncWebCrawler(config=browser_config) as crawler:
print("\n=== Testing Streaming Mode ===")
async for result in await crawler.arun_many(
urls=urls,
config=crawler_config.clone(stream=True),
):
print(f"Received result for: {result.url} - Success: {result.success}")
print("\n=== Testing Batch Mode ===")
results = await crawler.arun_many(
urls=urls,
config=crawler_config,
)
print(f"Received all {len(results)} results at once")
for result in results:
print(f"Batch result for: {result.url} - Success: {result.success}")
if __name__ == "__main__":
asyncio.run(test_crawler())
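arun_many covers both patterns shown above: with stream=True results arrive as each URL finishes, while the default batch mode returns the full list at the end. Here is a self-contained sketch of the batch path that tallies outcomes using only the fields the test prints (url and success); the URLs are placeholders.

import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode

async def crawl_and_tally(urls):
    config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
    async with AsyncWebCrawler(config=BrowserConfig(headless=True)) as crawler:
        results = await crawler.arun_many(urls=urls, config=config)
    ok = [r for r in results if r.success]
    print(f"{len(ok)}/{len(results)} succeeded")
    for r in results:
        if not r.success:
            print(f"  failed: {r.url}")
    return ok

if __name__ == "__main__":
    asyncio.run(crawl_and_tally(["https://example.com", "https://httpbin.org/get"]))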

View File

@@ -0,0 +1,39 @@
import os, sys
# append 2 parent directories to sys.path to import crawl4ai
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
parent_parent_dir = os.path.dirname(parent_dir)
sys.path.append(parent_parent_dir)
import asyncio
from typing import List
from crawl4ai import *
from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher
async def test_streaming():
browser_config = BrowserConfig(headless=True, verbose=True)
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(
# content_filter=PruningContentFilter(
# threshold=0.48,
# threshold_type="fixed",
# min_word_threshold=0
# )
),
)
urls = ["http://example.com"] * 10
async with AsyncWebCrawler(config=browser_config) as crawler:
dispatcher = MemoryAdaptiveDispatcher(
max_session_permit=5,
check_interval=0.5
)
async for result in dispatcher.run_urls_stream(urls, crawler, crawler_config):
print(f"Got result for {result.url} - Success: {result.result.success}")
if __name__ == "__main__":
asyncio.run(test_streaming())
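MemoryAdaptiveDispatcher limits concurrent crawl sessions (max_session_permit) and polls load at check_interval; the exact throttling policy is not shown here. The sketch below follows the test above, reusing the same run_urls_stream call and the same wrapped-result access pattern (item.url plus item.result.success); the URL list is a placeholder.

import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher

async def dispatch_demo(urls):
    dispatcher = MemoryAdaptiveDispatcher(max_session_permit=3, check_interval=1.0)
    config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
    async with AsyncWebCrawler(config=BrowserConfig(headless=True)) as crawler:
        async for item in dispatcher.run_urls_stream(urls, crawler, config):
            status = "ok" if item.result.success else "failed"
            print(f"{item.url}: {status}")

if __name__ == "__main__":
    asyncio.run(dispatch_demo(["https://example.com"] * 5))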

View File

@@ -0,0 +1,62 @@
import asyncio
from crawl4ai import *
async def test_real_websites():
print("\n=== Testing Real Website Robots.txt Compliance ===\n")
browser_config = BrowserConfig(headless=True, verbose=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
# Test cases with URLs
test_cases = [
# Public sites that should be allowed
("https://example.com", True), # Simple public site
("https://httpbin.org/get", True), # API endpoint
# Sites with known strict robots.txt
("https://www.facebook.com/robots.txt", False), # Social media
("https://www.google.com/search", False), # Search pages
# Edge cases
("https://api.github.com", True), # API service
("https://raw.githubusercontent.com", True), # Content delivery
# Non-existent/error cases
("https://thisisnotarealwebsite.com", True), # Non-existent domain
("https://localhost:12345", True), # Invalid port
]
for url, expected in test_cases:
print(f"\nTesting: {url}")
try:
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
check_robots_txt=True, # Enable robots.txt checking
verbose=True
)
result = await crawler.arun(url=url, config=config)
allowed = result.success and not result.error_message
print(f"Expected: {'allowed' if expected else 'denied'}")
print(f"Actual: {'allowed' if allowed else 'denied'}")
print(f"Status Code: {result.status_code}")
if result.error_message:
print(f"Error: {result.error_message}")
# Optional: Print robots.txt content if available
if result.metadata and 'robots_txt' in result.metadata:
print(f"Robots.txt rules:\n{result.metadata['robots_txt']}")
except Exception as e:
print(f"Test failed with error: {str(e)}")
async def main():
try:
await test_real_websites()
except Exception as e:
print(f"Test suite failed: {str(e)}")
raise
if __name__ == "__main__":
asyncio.run(main())
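With check_robots_txt=True the crawler consults robots.txt before fetching, and a disallowed URL is reported through result.success and result.error_message (the fields the expected-vs-actual comparison above reads) rather than necessarily raising. A minimal robots-aware crawl loop built on that behaviour could look like this sketch; the URLs are placeholders.

import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode

async def polite_crawl(urls):
    config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, check_robots_txt=True)
    async with AsyncWebCrawler(config=BrowserConfig(headless=True)) as crawler:
        for url in urls:
            result = await crawler.arun(url=url, config=config)
            if result.success:
                print(f"fetched {url} ({len(result.html)} bytes of HTML)")
            else:
                print(f"skipped {url}: {result.error_message}")

if __name__ == "__main__":
    asyncio.run(polite_crawl(["https://example.com", "https://www.google.com/search"]))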