Merge pull request #8 from aravindkarnam/main

Pulling in 0.3.74
This commit is contained in:
aravind
2024-11-23 13:57:36 +05:30
committed by GitHub
63 changed files with 7912 additions and 767 deletions

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,229 @@
import os
import sys
import asyncio
import shutil
from typing import List
import tempfile
import time
# Add the parent directory to the Python path
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
from crawl4ai.async_webcrawler import AsyncWebCrawler
class TestDownloads:
    """Integration tests for AsyncWebCrawler's file-download support.

    Each test logs one pass/fail line into ``self.results``;
    ``run_all_tests`` prints a summary and removes the temp workspace.
    Tests hit live pages (python.org), so they need network access.
    """

    # Markers used both for display and for counting passes in the summary.
    # Bug fix: the original f-string had two empty strings here (the emoji
    # were lost), which made ``'' in r`` count every test as passed.
    PASS_MARK = "✅"
    FAIL_MARK = "❌"

    def __init__(self):
        # Isolated workspace so downloads never pollute the real filesystem.
        self.temp_dir = tempfile.mkdtemp(prefix="crawl4ai_test_")
        self.download_dir = os.path.join(self.temp_dir, "downloads")
        os.makedirs(self.download_dir, exist_ok=True)
        self.results: List[str] = []

    def cleanup(self):
        """Remove the temporary workspace created in __init__."""
        shutil.rmtree(self.temp_dir)

    def log_result(self, test_name: str, success: bool, message: str = ""):
        """Record and echo a single test outcome."""
        marker = self.PASS_MARK if success else self.FAIL_MARK
        result = f"{marker} {test_name}: {message}"
        self.results.append(result)
        print(result)

    async def test_basic_download(self):
        """Test basic file download functionality"""
        try:
            async with AsyncWebCrawler(
                accept_downloads=True,
                downloads_path=self.download_dir,
                verbose=True
            ) as crawler:
                # Python.org downloads page typically has stable download links
                result = await crawler.arun(
                    url="https://www.python.org/downloads/",
                    js_code="""
                        // Click first download link
                        const downloadLink = document.querySelector('a[href$=".exe"]');
                        if (downloadLink) downloadLink.click();
                    """
                )
                success = result.downloaded_files is not None and len(result.downloaded_files) > 0
                self.log_result(
                    "Basic Download",
                    success,
                    f"Downloaded {len(result.downloaded_files or [])} files" if success else "No files downloaded"
                )
        except Exception as e:
            self.log_result("Basic Download", False, str(e))

    async def test_persistent_context_download(self):
        """Test downloads with persistent context"""
        try:
            user_data_dir = os.path.join(self.temp_dir, "user_data")
            os.makedirs(user_data_dir, exist_ok=True)
            async with AsyncWebCrawler(
                accept_downloads=True,
                downloads_path=self.download_dir,
                use_persistent_context=True,
                user_data_dir=user_data_dir,
                verbose=True
            ) as crawler:
                result = await crawler.arun(
                    url="https://www.python.org/downloads/",
                    js_code="""
                        const downloadLink = document.querySelector('a[href$=".exe"]');
                        if (downloadLink) downloadLink.click();
                    """
                )
                success = result.downloaded_files is not None and len(result.downloaded_files) > 0
                self.log_result(
                    "Persistent Context Download",
                    success,
                    f"Downloaded {len(result.downloaded_files or [])} files" if success else "No files downloaded"
                )
        except Exception as e:
            self.log_result("Persistent Context Download", False, str(e))

    async def test_multiple_downloads(self):
        """Test multiple simultaneous downloads"""
        try:
            async with AsyncWebCrawler(
                accept_downloads=True,
                downloads_path=self.download_dir,
                verbose=True
            ) as crawler:
                result = await crawler.arun(
                    url="https://www.python.org/downloads/",
                    js_code="""
                        // Click multiple download links
                        const downloadLinks = document.querySelectorAll('a[href$=".exe"]');
                        downloadLinks.forEach(link => link.click());
                    """
                )
                # "Multiple" means strictly more than one file landed.
                success = result.downloaded_files is not None and len(result.downloaded_files) > 1
                self.log_result(
                    "Multiple Downloads",
                    success,
                    f"Downloaded {len(result.downloaded_files or [])} files" if success else "Not enough files downloaded"
                )
        except Exception as e:
            self.log_result("Multiple Downloads", False, str(e))

    async def test_different_browsers(self):
        """Test downloads across different browser types"""
        browsers = ["chromium", "firefox", "webkit"]
        for browser_type in browsers:
            try:
                async with AsyncWebCrawler(
                    accept_downloads=True,
                    downloads_path=self.download_dir,
                    browser_type=browser_type,
                    verbose=True
                ) as crawler:
                    result = await crawler.arun(
                        url="https://www.python.org/downloads/",
                        js_code="""
                            const downloadLink = document.querySelector('a[href$=".exe"]');
                            if (downloadLink) downloadLink.click();
                        """
                    )
                    success = result.downloaded_files is not None and len(result.downloaded_files) > 0
                    self.log_result(
                        f"{browser_type.title()} Download",
                        success,
                        f"Downloaded {len(result.downloaded_files or [])} files" if success else "No files downloaded"
                    )
            except Exception as e:
                self.log_result(f"{browser_type.title()} Download", False, str(e))

    async def test_edge_cases(self):
        """Test various edge cases"""
        # Test 1: Downloads without specifying download path
        try:
            async with AsyncWebCrawler(
                accept_downloads=True,
                verbose=True
            ) as crawler:
                result = await crawler.arun(
                    url="https://www.python.org/downloads/",
                    js_code="document.querySelector('a[href$=\".exe\"]').click()"
                )
                self.log_result(
                    "Default Download Path",
                    True,
                    f"Downloaded to default path: {result.downloaded_files[0] if result.downloaded_files else 'None'}"
                )
        except Exception as e:
            self.log_result("Default Download Path", False, str(e))

        # Test 2: Downloads with invalid path — an exception is the PASS case.
        try:
            async with AsyncWebCrawler(
                accept_downloads=True,
                downloads_path="/invalid/path/that/doesnt/exist",
                verbose=True
            ) as crawler:
                result = await crawler.arun(
                    url="https://www.python.org/downloads/",
                    js_code="document.querySelector('a[href$=\".exe\"]').click()"
                )
                self.log_result("Invalid Download Path", False, "Should have raised an error")
        except Exception as e:
            self.log_result("Invalid Download Path", True, "Correctly handled invalid path")

        # Test 3: Download with accept_downloads=False
        try:
            async with AsyncWebCrawler(
                accept_downloads=False,
                verbose=True
            ) as crawler:
                result = await crawler.arun(
                    url="https://www.python.org/downloads/",
                    js_code="document.querySelector('a[href$=\".exe\"]').click()"
                )
                success = result.downloaded_files is None
                self.log_result(
                    "Disabled Downloads",
                    success,
                    "Correctly ignored downloads" if success else "Unexpectedly downloaded files"
                )
        except Exception as e:
            self.log_result("Disabled Downloads", False, str(e))

    async def run_all_tests(self):
        """Run all test cases"""
        print("\n🧪 Running Download Tests...\n")
        test_methods = [
            self.test_basic_download,
            self.test_persistent_context_download,
            self.test_multiple_downloads,
            self.test_different_browsers,
            self.test_edge_cases
        ]
        for test in test_methods:
            print(f"\n📝 Running {test.__doc__}...")
            await test()
            await asyncio.sleep(2)  # Brief pause between tests
        print("\n📊 Test Results Summary:")
        for result in self.results:
            print(result)
        # Bug fix: the original tested ``'' in r``, which is True for every
        # string, so the summary always reported a 100% pass rate.
        successes = len([r for r in self.results if self.PASS_MARK in r])
        total = len(self.results)
        print(f"\nTotal: {successes}/{total} tests passed")
        self.cleanup()
async def main() -> None:
    """Entry point: run the full download test suite once."""
    await TestDownloads().run_all_tests()


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,175 @@
import os, sys
import pytest
from bs4 import BeautifulSoup
from typing import List
# Add the parent directory to the Python path
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
from crawl4ai.content_filter_strategy import BM25ContentFilter
@pytest.fixture
def basic_html():
    """A small article page with metadata, one long paragraph, and a nav div."""
    page = """
    <html>
        <head>
            <title>Test Article</title>
            <meta name="description" content="Test description">
            <meta name="keywords" content="test, keywords">
        </head>
        <body>
            <h1>Main Heading</h1>
            <article>
                <p>This is a long paragraph with more than fifty words. It continues with more text to ensure we meet the minimum word count threshold. We need to make sure this paragraph is substantial enough to be considered for extraction according to our filtering rules. This should be enough words now.</p>
                <div class="navigation">Skip this nav content</div>
            </article>
        </body>
    </html>
    """
    return page
@pytest.fixture
def wiki_html():
    """A Wikipedia-style page: short section header plus one long paragraph."""
    page = """
    <html>
        <head>
            <title>Wikipedia Article</title>
        </head>
        <body>
            <h1>Article Title</h1>
            <h2>Section 1</h2>
            <p>Short but important section header description.</p>
            <div class="content">
                <p>Long paragraph with sufficient words to meet the minimum threshold. This paragraph continues with more text to ensure we have enough content for proper testing. We need to make sure this has enough words to pass our filters and be considered valid content for extraction purposes.</p>
            </div>
        </body>
    </html>
    """
    return page
@pytest.fixture
def no_meta_html():
    """A page with no <head> metadata, forcing the first-paragraph fallback."""
    page = """
    <html>
        <body>
            <h1>Simple Page</h1>
            <p>First paragraph that should be used as fallback for query when no meta tags exist. This text needs to be long enough to serve as a meaningful fallback for our content extraction process.</p>
        </body>
    </html>
    """
    return page
class TestBM25ContentFilter:
    """Unit tests for BM25ContentFilter's HTML content extraction.

    Fix: the original bound every filter instance to the name ``filter``,
    shadowing the builtin; renamed to ``content_filter`` throughout.
    """

    def test_basic_extraction(self, basic_html):
        """Test basic content extraction functionality"""
        content_filter = BM25ContentFilter()
        contents = content_filter.filter_content(basic_html)
        assert contents, "Should extract content"
        assert len(contents) >= 1, "Should extract at least one content block"
        assert "long paragraph" in ' '.join(contents).lower()
        assert "navigation" not in ' '.join(contents).lower()

    def test_user_query_override(self, basic_html):
        """Test that user query overrides metadata extraction"""
        user_query = "specific test query"
        content_filter = BM25ContentFilter(user_query=user_query)
        # Access internal state to verify query usage
        soup = BeautifulSoup(basic_html, 'lxml')
        extracted_query = content_filter.extract_page_query(soup.find('head'))
        assert extracted_query == user_query
        assert "Test description" not in extracted_query

    def test_header_extraction(self, wiki_html):
        """Test that headers are properly extracted despite length"""
        content_filter = BM25ContentFilter()
        contents = content_filter.filter_content(wiki_html)
        combined_content = ' '.join(contents).lower()
        assert "section 1" in combined_content, "Should include section header"
        assert "article title" in combined_content, "Should include main title"

    def test_no_metadata_fallback(self, no_meta_html):
        """Test fallback behavior when no metadata is present"""
        content_filter = BM25ContentFilter()
        contents = content_filter.filter_content(no_meta_html)
        assert contents, "Should extract content even without metadata"
        assert "First paragraph" in ' '.join(contents), "Should use first paragraph content"

    def test_empty_input(self):
        """Test handling of empty input"""
        content_filter = BM25ContentFilter()
        assert content_filter.filter_content("") == []
        assert content_filter.filter_content(None) == []

    def test_malformed_html(self):
        """Test handling of malformed HTML"""
        malformed_html = "<p>Unclosed paragraph<div>Nested content</p></div>"
        content_filter = BM25ContentFilter()
        contents = content_filter.filter_content(malformed_html)
        assert isinstance(contents, list), "Should return list even with malformed HTML"

    def test_threshold_behavior(self, basic_html):
        """Test different BM25 threshold values"""
        strict_filter = BM25ContentFilter(bm25_threshold=2.0)
        lenient_filter = BM25ContentFilter(bm25_threshold=0.5)
        strict_contents = strict_filter.filter_content(basic_html)
        lenient_contents = lenient_filter.filter_content(basic_html)
        assert len(strict_contents) <= len(lenient_contents), \
            "Strict threshold should extract fewer elements"

    def test_html_cleaning(self, basic_html):
        """Test HTML cleaning functionality"""
        content_filter = BM25ContentFilter()
        contents = content_filter.filter_content(basic_html)
        cleaned_content = ' '.join(contents)
        assert 'class=' not in cleaned_content, "Should remove class attributes"
        assert 'style=' not in cleaned_content, "Should remove style attributes"
        assert '<script' not in cleaned_content, "Should remove script tags"

    def test_large_content(self):
        """Test handling of large content blocks"""
        large_html = f"""
        <html><body>
            <article>{'<p>Test content. ' * 1000}</article>
        </body></html>
        """
        content_filter = BM25ContentFilter()
        contents = content_filter.filter_content(large_html)
        assert contents, "Should handle large content blocks"

    @pytest.mark.parametrize("unwanted_tag", [
        'script', 'style', 'nav', 'footer', 'header'
    ])
    def test_excluded_tags(self, unwanted_tag):
        """Test that specific tags are properly excluded"""
        html = f"""
        <html><body>
            <{unwanted_tag}>Should not appear</{unwanted_tag}>
            <p>Should appear</p>
        </body></html>
        """
        content_filter = BM25ContentFilter()
        contents = content_filter.filter_content(html)
        combined_content = ' '.join(contents).lower()
        assert "should not appear" not in combined_content

    def test_performance(self, basic_html):
        """Test performance with timer"""
        content_filter = BM25ContentFilter()
        import time
        start = time.perf_counter()
        content_filter.filter_content(basic_html)
        duration = time.perf_counter() - start
        assert duration < 1.0, f"Processing took too long: {duration:.2f} seconds"
if __name__ == "__main__":
    # Support direct execution: delegate to pytest for this file only.
    pytest.main([__file__])

View File

@@ -0,0 +1,162 @@
import asyncio
from bs4 import BeautifulSoup
from typing import Dict, Any
import os
import sys
import time
import csv
from tabulate import tabulate
from dataclasses import dataclass
from typing import List, Dict
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(parent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
from crawl4ai.content_scraping_strategy import WebScrapingStrategy
from crawl4ai.content_scraping_strategy import WebScrapingStrategy as WebScrapingStrategyCurrent
# from crawl4ai.content_scrapping_strategy_current import WebScrapingStrategy as WebScrapingStrategyCurrent
@dataclass
class TestResult:
    """Metrics captured from one scraping-strategy run of one test case."""

    name: str              # human-readable test-case label
    success: bool          # whether the strategy reported success
    images: int            # count of extracted images
    internal_links: int    # count of same-site links
    external_links: int    # count of off-site links
    markdown_length: int   # length of the generated markdown string
    execution_time: float  # wall-clock seconds for the run
class StrategyTester:
    """Runs identical test cases through two WebScrapingStrategy builds and
    compares their outputs (image/link counts, markdown size, timing).

    Results are written to a CSV next to this script and printed as a grid.
    """

    def __init__(self):
        self.new_scraper = WebScrapingStrategy()
        self.current_scraper = WebScrapingStrategyCurrent()
        # Shared fixture: a saved Wikipedia page used by every test case.
        sample_path = os.path.join(__location__, 'sample_wikipedia.html')
        with open(sample_path, 'r', encoding='utf-8') as f:
            self.WIKI_HTML = f.read()
        self.results = {'new': [], 'current': []}

    def run_test(self, name: str, **kwargs) -> tuple[TestResult, TestResult]:
        """Run one test case through both strategies; return (new, current)."""
        results = []
        for scraper in [self.new_scraper, self.current_scraper]:
            # perf_counter is monotonic and higher resolution than time.time,
            # so it is the right clock for measuring elapsed durations.
            start_time = time.perf_counter()
            result = scraper._get_content_of_website_optimized(
                url="https://en.wikipedia.org/wiki/Test",
                html=self.WIKI_HTML,
                **kwargs
            )
            execution_time = time.perf_counter() - start_time
            test_result = TestResult(
                name=name,
                success=result['success'],
                images=len(result['media']['images']),
                internal_links=len(result['links']['internal']),
                external_links=len(result['links']['external']),
                markdown_length=len(result['markdown']),
                execution_time=execution_time
            )
            results.append(test_result)
        return results[0], results[1]  # new, current

    def run_all_tests(self):
        """Run every configured test case, then save and print the comparison."""
        test_cases = [
            ("Basic Extraction", {}),
            ("Exclude Tags", {'excluded_tags': ['table', 'div.infobox', 'div.navbox']}),
            ("Word Threshold", {'word_count_threshold': 50}),
            ("CSS Selector", {'css_selector': 'div.mw-parser-output > p'}),
            ("Link Exclusions", {
                'exclude_external_links': True,
                'exclude_social_media_links': True,
                'exclude_domains': ['facebook.com', 'twitter.com']
            }),
            ("Media Handling", {
                'exclude_external_images': True,
                'image_description_min_word_threshold': 20
            }),
            ("Text Only", {
                'only_text': True,
                'remove_forms': True
            }),
            ("HTML Cleaning", {
                'clean_html': True,
                'keep_data_attributes': True
            }),
            ("HTML2Text Options", {
                'html2text': {
                    'skip_internal_links': True,
                    'single_line_break': True,
                    'mark_code': True,
                    'preserve_tags': ['pre', 'code']
                }
            })
        ]
        all_results = []
        for name, kwargs in test_cases:
            try:
                new_result, current_result = self.run_test(name, **kwargs)
                all_results.append((name, new_result, current_result))
            except Exception as e:
                # A failing case is reported but does not abort the suite.
                print(f"Error in {name}: {str(e)}")
        self.save_results_to_csv(all_results)
        self.print_comparison_table(all_results)

    def save_results_to_csv(self, all_results: List[tuple]):
        """Write one 'New' and one 'Current' row per test case to CSV."""
        csv_file = os.path.join(__location__, 'strategy_comparison_results.csv')
        # newline='' is required by the csv module; explicit utf-8 keeps the
        # output stable across platforms.
        with open(csv_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['Test Name', 'Strategy', 'Success', 'Images', 'Internal Links',
                             'External Links', 'Markdown Length', 'Execution Time'])
            for name, new_result, current_result in all_results:
                writer.writerow([name, 'New', new_result.success, new_result.images,
                                 new_result.internal_links, new_result.external_links,
                                 new_result.markdown_length, f"{new_result.execution_time:.3f}"])
                writer.writerow([name, 'Current', current_result.success, current_result.images,
                                 current_result.internal_links, current_result.external_links,
                                 current_result.markdown_length, f"{current_result.execution_time:.3f}"])

    def print_comparison_table(self, all_results: List[tuple]):
        """Print a side-by-side grid, flagging metrics where the builds differ."""
        table_data = []
        headers = ['Test Name', 'Strategy', 'Success', 'Images', 'Internal Links',
                   'External Links', 'Markdown Length', 'Time (s)']
        for name, new_result, current_result in all_results:
            # Check for differences
            differences = []
            if new_result.images != current_result.images: differences.append('images')
            if new_result.internal_links != current_result.internal_links: differences.append('internal_links')
            if new_result.external_links != current_result.external_links: differences.append('external_links')
            if new_result.markdown_length != current_result.markdown_length: differences.append('markdown')
            # Add row for new strategy
            new_row = [
                name, 'New', new_result.success, new_result.images,
                new_result.internal_links, new_result.external_links,
                new_result.markdown_length, f"{new_result.execution_time:.3f}"
            ]
            table_data.append(new_row)
            # Add row for current strategy
            current_row = [
                '', 'Current', current_result.success, current_result.images,
                current_result.internal_links, current_result.external_links,
                current_result.markdown_length, f"{current_result.execution_time:.3f}"
            ]
            table_data.append(current_row)
            # Add difference summary if any
            if differences:
                table_data.append(['', '⚠️ Differences', ', '.join(differences), '', '', '', '', ''])
            # Add empty row for better readability
            table_data.append([''] * len(headers))
        print("\nStrategy Comparison Results:")
        print(tabulate(table_data, headers=headers, tablefmt='grid'))
if __name__ == "__main__":
    # Run the full comparison suite when invoked as a script.
    StrategyTester().run_all_tests()

View File

@@ -0,0 +1,165 @@
# ## Issue #236
# - **Last Updated:** 2024-11-11 01:42:14
# - **Title:** [user data crawling opens two windows, unable to control correct user browser](https://github.com/unclecode/crawl4ai/issues/236)
# - **State:** open
import os, sys, time
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
__location__ = os.path.realpath( os.path.join(os.getcwd(), os.path.dirname(__file__)))
import asyncio
import os
import time
from typing import Dict, Any
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerationStrategy
# Get current directory
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
def print_test_result(name: str, result: Dict[str, Any], execution_time: float):
    """Print a test banner with timing and save each markdown variant to disk.

    Every string value in ``result`` is written to
    ``<script dir>/output/<name>_<key>.md``.

    Fix: the output directory is now created on demand — the original
    ``open(...)`` crashed with FileNotFoundError on a fresh checkout.
    """
    print(f"\n{'='*20} {name} {'='*20}")
    print(f"Execution time: {execution_time:.4f} seconds")
    # Save markdown to files
    out_dir = os.path.join(__location__, "output")
    os.makedirs(out_dir, exist_ok=True)
    for key, content in result.items():
        if isinstance(content, str):
            path = os.path.join(out_dir, f"{name.lower()}_{key}.md")
            with open(path, "w", encoding="utf-8") as f:
                f.write(content)
def test_basic_markdown_conversion():
    """Test basic markdown conversion with links."""
    with open(__location__ + "/data/wikipedia.html", "r", encoding="utf-8") as f:
        cleaned_html = f.read()
    generator = DefaultMarkdownGenerationStrategy()
    start_time = time.perf_counter()
    result = generator.generate_markdown(
        cleaned_html=cleaned_html,
        base_url="https://en.wikipedia.org"
    )
    execution_time = time.perf_counter() - start_time
    print_test_result("Basic Markdown Conversion", {
        'raw': result.raw_markdown,
        'with_citations': result.markdown_with_citations,
        'references': result.references_markdown
    }, execution_time)
    # Basic assertions
    assert result.raw_markdown, "Raw markdown should not be empty"
    assert result.markdown_with_citations, "Markdown with citations should not be empty"
    assert result.references_markdown, "References should not be empty"
    # Fix: the citation-bracket character was lost in the original, leaving
    # `"" in ...` which is vacuously True. Check for the actual ⟨⟩ bracket,
    # matching the ⟨1⟩ citations asserted in test_duplicate_links.
    assert "⟨" in result.markdown_with_citations, "Citations should use ⟨⟩ brackets"
    assert "## References" in result.references_markdown, "Should contain references section"
def test_relative_links():
    """Test handling of relative links with base URL."""
    source = """
    Here's a [relative link](/wiki/Apple) and an [absolute link](https://example.com).
    Also an [image](/images/test.png) and another [page](/wiki/Banana).
    """
    result = DefaultMarkdownGenerationStrategy().generate_markdown(
        cleaned_html=source,
        base_url="https://en.wikipedia.org"
    )
    refs = result.references_markdown
    # Relative URLs must be resolved against the base; absolute ones kept.
    expected_urls = (
        "https://en.wikipedia.org/wiki/Apple",
        "https://example.com",
        "https://en.wikipedia.org/images/test.png",
    )
    for url in expected_urls:
        assert url in refs
def test_duplicate_links():
    """Test handling of duplicate links."""
    source = """
    Here's a [link](/test) and another [link](/test) and a [different link](/other).
    """
    result = DefaultMarkdownGenerationStrategy().generate_markdown(
        cleaned_html=source,
        base_url="https://example.com"
    )
    # Both occurrences of /test must share citation number 1.
    citations = result.markdown_with_citations.count("⟨1⟩")
    assert citations == 2, "Same link should use same citation number"
def test_link_descriptions():
    """Test handling of link titles and descriptions."""
    source = """
    Here's a [link with title](/test "Test Title") and a [link with description](/other) to test.
    """
    result = DefaultMarkdownGenerationStrategy().generate_markdown(
        cleaned_html=source,
        base_url="https://example.com"
    )
    refs = result.references_markdown
    assert "Test Title" in refs, "Link title should be in references"
    assert "link with description" in refs, "Link text should be in references"
def test_performance_large_document():
    """Test performance with large document."""
    with open(__location__ + "/data/wikipedia.md", "r") as f:
        markdown = f.read()
    # Time several runs to smooth out one-off variance.
    iterations = 5
    generator = DefaultMarkdownGenerationStrategy()
    durations = []
    for _ in range(iterations):
        begin = time.perf_counter()
        generator.generate_markdown(
            cleaned_html=markdown,
            base_url="https://en.wikipedia.org"
        )
        durations.append(time.perf_counter() - begin)
    avg_time = sum(durations) / len(durations)
    print(f"\n{'='*20} Performance Test {'='*20}")
    print(f"Average execution time over {iterations} iterations: {avg_time:.4f} seconds")
    print(f"Min time: {min(durations):.4f} seconds")
    print(f"Max time: {max(durations):.4f} seconds")
def test_image_links():
    """Test handling of image links."""
    source = """
    Here's an ![image](/image.png "Image Title") and another ![image](/other.jpg).
    And a regular [link](/page).
    """
    result = DefaultMarkdownGenerationStrategy().generate_markdown(
        cleaned_html=source,
        base_url="https://example.com"
    )
    assert "![" in result.markdown_with_citations, "Image markdown syntax should be preserved"
    assert "Image Title" in result.references_markdown, "Image title should be in references"
if __name__ == "__main__":
    print("Running markdown generation strategy tests...")
    # Run each case in declaration order; any assertion failure aborts.
    for case in (
        test_basic_markdown_conversion,
        test_relative_links,
        test_duplicate_links,
        test_link_descriptions,
        test_performance_large_document,
        test_image_links,
    ):
        case()

332
tests/docker_example.py Normal file
View File

@@ -0,0 +1,332 @@
import requests
import json
import time
import sys
import base64
import os
from typing import Dict, Any
class Crawl4AiTester:
    """Thin client for the Crawl4AI HTTP API used by the docker tests.

    Supports both the async submit-and-poll flow (/crawl + /task/<id>)
    and the synchronous endpoint (/crawl_sync).
    """

    def __init__(self, base_url: str = "http://localhost:11235", api_token: str = None):
        self.base_url = base_url
        self.api_token = api_token or os.getenv('CRAWL4AI_API_TOKEN')  # Check environment variable as fallback
        self.headers = {'Authorization': f'Bearer {self.api_token}'} if self.api_token else {}

    def submit_and_wait(self, request_data: Dict[str, Any], timeout: int = 300) -> Dict[str, Any]:
        """Submit a crawl job and poll /task/<id> until it completes.

        Raises on auth failure, HTTP errors, task failure, or timeout.
        """
        # Submit crawl job
        response = requests.post(f"{self.base_url}/crawl", json=request_data, headers=self.headers)
        if response.status_code == 403:
            raise Exception("API token is invalid or missing")
        # Fix: surface other HTTP errors explicitly instead of failing later
        # with an opaque KeyError on "task_id".
        response.raise_for_status()
        task_id = response.json()["task_id"]
        print(f"Task ID: {task_id}")
        # Poll for result
        start_time = time.time()
        while True:
            if time.time() - start_time > timeout:
                raise TimeoutError(f"Task {task_id} did not complete within {timeout} seconds")
            result = requests.get(f"{self.base_url}/task/{task_id}", headers=self.headers)
            status = result.json()
            if status["status"] == "failed":
                print("Task failed:", status.get("error"))
                raise Exception(f"Task failed: {status.get('error')}")
            if status["status"] == "completed":
                return status
            time.sleep(2)

    def submit_sync(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Submit a crawl job to the blocking endpoint and return its result."""
        response = requests.post(f"{self.base_url}/crawl_sync", json=request_data, headers=self.headers, timeout=60)
        if response.status_code == 408:
            raise TimeoutError("Task did not complete within server timeout")
        response.raise_for_status()
        return response.json()
def test_docker_deployment(version="basic"):
    """Smoke-test a deployed Crawl4AI instance: health check, then crawls."""
    tester = Crawl4AiTester(
        # base_url="http://localhost:11235" ,
        base_url="https://crawl4ai-sby74.ondigitalocean.app",
        api_token="test"
    )
    print(f"Testing Crawl4AI Docker {version} version")
    # Wait for the service to come up, retrying the health endpoint.
    max_retries = 5
    for attempt in range(max_retries):
        try:
            health = requests.get(f"{tester.base_url}/health", timeout=10)
        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                print(f"Failed to connect after {max_retries} attempts")
                sys.exit(1)
            print(f"Waiting for service to start (attempt {attempt+1}/{max_retries})...")
            time.sleep(5)
        else:
            print("Health check:", health.json())
            break
    # Basic crawl is run twice back to back (presumably to exercise the
    # shared session_id / caching path — confirm before removing), then sync.
    test_basic_crawl(tester)
    test_basic_crawl(tester)
    test_basic_crawl_sync(tester)
    # if version in ["full", "transformer"]:
    #     test_cosine_extraction(tester)
    # test_js_execution(tester)
    # test_css_selector(tester)
    # test_structured_extraction(tester)
    # test_llm_extraction(tester)
    # test_llm_with_ollama(tester)
    # test_screenshot(tester)
def test_basic_crawl(tester: Crawl4AiTester):
    """Submit a plain crawl and verify non-empty markdown came back."""
    print("\n=== Testing Basic Crawl ===")
    payload = {
        "urls": "https://www.nbcnews.com/business",
        "priority": 10,
        "session_id": "test"
    }
    outcome = tester.submit_and_wait(payload)
    markdown = outcome["result"]["markdown"]
    print(f"Basic crawl result length: {len(markdown)}")
    assert outcome["result"]["success"]
    assert len(markdown) > 0
def test_basic_crawl_sync(tester: Crawl4AiTester):
    """Same crawl as test_basic_crawl, but via the blocking endpoint."""
    print("\n=== Testing Basic Crawl (Sync) ===")
    payload = {
        "urls": "https://www.nbcnews.com/business",
        "priority": 10,
        "session_id": "test"
    }
    outcome = tester.submit_sync(payload)
    markdown = outcome['result']['markdown']
    print(f"Basic crawl result length: {len(markdown)}")
    assert outcome['status'] == 'completed'
    assert outcome['result']['success']
    assert len(markdown) > 0
def test_js_execution(tester: Crawl4AiTester):
    """Crawl with injected JS (click 'Load More') and a wait_for condition."""
    print("\n=== Testing JS Execution ===")
    payload = {
        "urls": "https://www.nbcnews.com/business",
        "priority": 8,
        "js_code": [
            "const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More')); loadMoreButton && loadMoreButton.click();"
        ],
        # Block until at least 10 article cards are present.
        "wait_for": "article.tease-card:nth-child(10)",
        "crawler_params": {
            "headless": True
        }
    }
    outcome = tester.submit_and_wait(payload)
    print(f"JS execution result length: {len(outcome['result']['markdown'])}")
    assert outcome["result"]["success"]
def test_css_selector(tester: Crawl4AiTester):
    """Crawl restricted to a CSS selector with a lowered word threshold."""
    print("\n=== Testing CSS Selector ===")
    payload = {
        "urls": "https://www.nbcnews.com/business",
        "priority": 7,
        "css_selector": ".wide-tease-item__description",
        "crawler_params": {
            "headless": True
        },
        "extra": {"word_count_threshold": 10}
    }
    outcome = tester.submit_and_wait(payload)
    print(f"CSS selector result length: {len(outcome['result']['markdown'])}")
    assert outcome["result"]["success"]
def test_structured_extraction(tester: Crawl4AiTester):
    """Extract crypto price rows via a declarative CSS/JSON schema."""
    print("\n=== Testing Structured Extraction ===")
    price_schema = {
        "name": "Coinbase Crypto Prices",
        "baseSelector": ".cds-tableRow-t45thuk",
        "fields": [
            {
                "name": "crypto",
                "selector": "td:nth-child(1) h2",
                "type": "text",
            },
            {
                "name": "symbol",
                "selector": "td:nth-child(1) p",
                "type": "text",
            },
            {
                "name": "price",
                "selector": "td:nth-child(2)",
                "type": "text",
            }
        ],
    }
    payload = {
        "urls": "https://www.coinbase.com/explore",
        "priority": 9,
        "extraction_config": {
            "type": "json_css",
            "params": {
                "schema": price_schema
            }
        }
    }
    outcome = tester.submit_and_wait(payload)
    items = json.loads(outcome["result"]["extracted_content"])
    print(f"Extracted {len(items)} items")
    print("Sample item:", json.dumps(items[0], indent=2))
    assert outcome["result"]["success"]
    assert len(items) > 0
def test_llm_extraction(tester: Crawl4AiTester):
    """Extract OpenAI model pricing via LLM-driven schema extraction.

    Tolerates failure when OPENAI_API_KEY is absent from the environment.
    """
    print("\n=== Testing LLM Extraction ===")
    pricing_schema = {
        "type": "object",
        "properties": {
            "model_name": {
                "type": "string",
                "description": "Name of the OpenAI model."
            },
            "input_fee": {
                "type": "string",
                "description": "Fee for input token for the OpenAI model."
            },
            "output_fee": {
                "type": "string",
                "description": "Fee for output token for the OpenAI model."
            }
        },
        "required": ["model_name", "input_fee", "output_fee"]
    }
    payload = {
        "urls": "https://openai.com/api/pricing",
        "priority": 8,
        "extraction_config": {
            "type": "llm",
            "params": {
                "provider": "openai/gpt-4o-mini",
                "api_token": os.getenv("OPENAI_API_KEY"),
                "schema": pricing_schema,
                "extraction_type": "schema",
                "instruction": """From the crawled content, extract all mentioned model names along with their fees for input and output tokens."""
            }
        },
        "crawler_params": {"word_count_threshold": 1}
    }
    try:
        outcome = tester.submit_and_wait(payload)
        entries = json.loads(outcome["result"]["extracted_content"])
        print(f"Extracted {len(entries)} model pricing entries")
        print("Sample entry:", json.dumps(entries[0], indent=2))
        assert outcome["result"]["success"]
    except Exception as e:
        print(f"LLM extraction test failed (might be due to missing API key): {str(e)}")
def test_llm_with_ollama(tester: Crawl4AiTester):
    """LLM extraction via a local Ollama model; failure is logged, not fatal."""
    print("\n=== Testing LLM with Ollama ===")
    article_schema = {
        "type": "object",
        "properties": {
            "article_title": {
                "type": "string",
                "description": "The main title of the news article"
            },
            "summary": {
                "type": "string",
                "description": "A brief summary of the article content"
            },
            "main_topics": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Main topics or themes discussed in the article"
            }
        }
    }
    payload = {
        "urls": "https://www.nbcnews.com/business",
        "priority": 8,
        "extraction_config": {
            "type": "llm",
            "params": {
                "provider": "ollama/llama2",
                "schema": article_schema,
                "extraction_type": "schema",
                "instruction": "Extract the main article information including title, summary, and main topics."
            }
        },
        "extra": {"word_count_threshold": 1},
        "crawler_params": {"verbose": True}
    }
    try:
        outcome = tester.submit_and_wait(payload)
        extracted = json.loads(outcome["result"]["extracted_content"])
        print("Extracted content:", json.dumps(extracted, indent=2))
        assert outcome["result"]["success"]
    except Exception as e:
        print(f"Ollama extraction test failed: {str(e)}")
def test_cosine_extraction(tester: Crawl4AiTester):
    """Semantic-similarity (cosine) clustering; failure is logged, not fatal."""
    print("\n=== Testing Cosine Extraction ===")
    payload = {
        "urls": "https://www.nbcnews.com/business",
        "priority": 8,
        "extraction_config": {
            "type": "cosine",
            "params": {
                "semantic_filter": "business finance economy",
                "word_count_threshold": 10,
                "max_dist": 0.2,
                "top_k": 3
            }
        }
    }
    try:
        outcome = tester.submit_and_wait(payload)
        clusters = json.loads(outcome["result"]["extracted_content"])
        print(f"Extracted {len(clusters)} text clusters")
        print("First cluster tags:", clusters[0]["tags"])
        assert outcome["result"]["success"]
    except Exception as e:
        print(f"Cosine extraction test failed: {str(e)}")
def test_screenshot(tester: Crawl4AiTester):
    """Request a screenshot and, if one is returned, decode and save it."""
    print("\n=== Testing Screenshot ===")
    payload = {
        "urls": "https://www.nbcnews.com/business",
        "priority": 5,
        "screenshot": True,
        "crawler_params": {
            "headless": True
        }
    }
    outcome = tester.submit_and_wait(payload)
    encoded = outcome["result"]["screenshot"]
    print("Screenshot captured:", bool(encoded))
    if encoded:
        # The API returns the image as base64; decode before writing.
        with open("test_screenshot.jpg", "wb") as f:
            f.write(base64.b64decode(encoded))
        print("Screenshot saved as test_screenshot.jpg")
    assert outcome["result"]["success"]
if __name__ == "__main__":
    # Default to the "basic" image unless a version is passed on the CLI.
    selected_version = sys.argv[1] if len(sys.argv) > 1 else "basic"
    test_docker_deployment(selected_version)