Merge branch '2025-JUN-1' into next-MAY
tests/deep_crwaling/test_filter.py (new file, 75 lines)
@@ -0,0 +1,75 @@
# // File: tests/deep_crawling/test_filters.py
import pytest
from urllib.parse import urlparse
from crawl4ai import ContentTypeFilter, URLFilter

# Minimal URLFilter base class stub if not already importable directly for tests
# In a real scenario, this would be imported from the library
if not hasattr(URLFilter, '_update_stats'):  # Check if it's a basic stub
    class URLFilter:  # Basic stub for testing if needed
        def __init__(self, name=None): self.name = name
        def apply(self, url: str) -> bool: raise NotImplementedError
        def _update_stats(self, passed: bool): pass  # Mock implementation

# Assume ContentTypeFilter is structured as discussed. If its definition is not fully
# available for direct import in the test environment, a more elaborate stub or direct
# instantiation of the real class (if possible) would be needed.
# For this example, we assume ContentTypeFilter can be imported and used.

class TestContentTypeFilter:
    @pytest.mark.parametrize(
        "url, allowed_types, expected",
        [
            # Existing tests (examples)
            ("http://example.com/page.html", ["text/html"], True),
            ("http://example.com/page.json", ["application/json"], True),
            ("http://example.com/image.png", ["text/html"], False),
            ("http://example.com/document.pdf", ["application/pdf"], True),
            ("http://example.com/page", ["text/html"], True),  # No extension, allowed
            ("http://example.com/page", ["text/html"], False),  # No extension, disallowed
            ("http://example.com/page.unknown", ["text/html"], False),  # Unknown extension

            # Tests for PHP extensions
            ("http://example.com/index.php", ["application/x-httpd-php"], True),
            ("http://example.com/script.php3", ["application/x-httpd-php"], True),
            ("http://example.com/legacy.php4", ["application/x-httpd-php"], True),
            ("http://example.com/main.php5", ["application/x-httpd-php"], True),
            ("http://example.com/api.php7", ["application/x-httpd-php"], True),
            ("http://example.com/index.phtml", ["application/x-httpd-php"], True),
            ("http://example.com/source.phps", ["application/x-httpd-php-source"], True),

            # Test rejection of PHP extensions
            ("http://example.com/index.php", ["text/html"], False),
            ("http://example.com/script.php3", ["text/plain"], False),
            ("http://example.com/source.phps", ["application/x-httpd-php"], False),  # Mismatch MIME
            ("http://example.com/source.php", ["application/x-httpd-php-source"], False),  # Mismatch MIME for .php

            # Test case-insensitivity of extensions in URL
            ("http://example.com/PAGE.HTML", ["text/html"], True),
            ("http://example.com/INDEX.PHP", ["application/x-httpd-php"], True),
            ("http://example.com/SOURCE.PHPS", ["application/x-httpd-php-source"], True),

            # Test case-insensitivity of allowed_types
            ("http://example.com/index.php", ["APPLICATION/X-HTTPD-PHP"], True),
        ],
    )
    def test_apply(self, url, allowed_types, expected):
        content_filter = ContentTypeFilter(
            allowed_types=allowed_types
        )
        assert content_filter.apply(url) == expected

    @pytest.mark.parametrize(
        "url, expected_extension",
        [
            ("http://example.com/file.html", "html"),
            ("http://example.com/file.tar.gz", "gz"),
            ("http://example.com/path/", ""),
            ("http://example.com/nodot", ""),
            ("http://example.com/.config", "config"),  # hidden file with extension
            ("http://example.com/path/to/archive.BIG.zip", "zip"),  # Case test
        ]
    )
    def test_extract_extension(self, url, expected_extension):
        # Test the static method directly
        assert ContentTypeFilter._extract_extension(url) == expected_extension
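For quick manual verification outside pytest, here is a minimal sketch that exercises only the ContentTypeFilter constructor and apply() call used in the tests above; the expected True/False outcomes mirror the parametrized cases and nothing beyond what this diff shows is assumed:

from crawl4ai import ContentTypeFilter

php_filter = ContentTypeFilter(allowed_types=["application/x-httpd-php"])
for url in ("http://example.com/index.php", "http://example.com/page.html"):
    # Mirrors the parametrized expectations: .php passes, .html is rejected
    print(url, "->", php_filter.apply(url))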
@@ -15,6 +15,24 @@ CRAWL4AI_HOME_DIR = Path(os.path.expanduser("~")).joinpath(".crawl4ai")
if not CRAWL4AI_HOME_DIR.joinpath("profiles", "test_profile").exists():
    CRAWL4AI_HOME_DIR.joinpath("profiles", "test_profile").mkdir(parents=True)

@pytest.fixture
def basic_html():
    return """
    <html lang="en">
        <head>
            <title>Basic HTML</title>
        </head>
        <body>
            <h1>Main Heading</h1>
            <main>
                <div class="container">
                    <p>Basic HTML document for testing purposes.</p>
                </div>
            </main>
        </body>
    </html>
    """

# Test Config Files
@pytest.fixture
def basic_browser_config():

@@ -325,6 +343,13 @@ async def test_stealth_mode(crawler_strategy):
    )
    assert response.status_code == 200

@pytest.mark.asyncio
@pytest.mark.parametrize("prefix", ("raw:", "raw://"))
async def test_raw_urls(crawler_strategy, basic_html, prefix):
    url = f"{prefix}{basic_html}"
    response = await crawler_strategy.crawl(url, CrawlerRunConfig())
    assert response.html == basic_html

# Error Handling Tests
@pytest.mark.asyncio
async def test_invalid_url():
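test_raw_urls above drives the crawler strategy directly. As a rough sketch, assuming the high-level AsyncWebCrawler.arun() accepts the same raw:/raw:// prefixes as the strategy-level crawl() call shown here, inline HTML can be crawled end to end like this:

import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig

async def main():
    html = "<html><body><h1>Hello</h1></body></html>"
    async with AsyncWebCrawler() as crawler:
        # "raw://" tells the crawler to treat the string itself as the page content
        result = await crawler.arun(f"raw://{html}", config=CrawlerRunConfig())
        print(result.html)

asyncio.run(main())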
tests/general/test_download_file.py (new file, 34 lines)
@@ -0,0 +1,34 @@
import asyncio
from crawl4ai import CrawlerRunConfig, AsyncWebCrawler, BrowserConfig
from pathlib import Path
import os

async def test_basic_download():

    # Custom folder (otherwise defaults to ~/.crawl4ai/downloads)
    downloads_path = os.path.join(Path.home(), ".crawl4ai", "downloads")
    os.makedirs(downloads_path, exist_ok=True)
    browser_config = BrowserConfig(
        accept_downloads=True,
        downloads_path=downloads_path
    )
    async with AsyncWebCrawler(config=browser_config) as crawler:
        run_config = CrawlerRunConfig(
            js_code="""
                const link = document.querySelector('a[href$=".exe"]');
                if (link) { link.click(); }
            """,
            delay_before_return_html=5
        )
        result = await crawler.arun("https://www.python.org/downloads/", config=run_config)

        if result.downloaded_files:
            print("Downloaded files:")
            for file_path in result.downloaded_files:
                print("•", file_path)
        else:
            print("No files downloaded.")

if __name__ == "__main__":
    asyncio.run(test_basic_download())
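test_basic_download only prints its outcome when run as a script. A small assertion-based helper makes the same check usable under pytest; this is a sketch, assuming result.downloaded_files holds local file paths as in the code above:

import os

def assert_downloads_exist(result):
    # Fail if nothing was downloaded or a reported path is missing on disk
    assert result.downloaded_files, "expected at least one downloaded file"
    for file_path in result.downloaded_files:
        assert os.path.exists(file_path), f"missing downloaded file: {file_path}"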
tests/general/test_max_scroll.py (new file, 115 lines)
@@ -0,0 +1,115 @@
"""
Sample script to test the max_scroll_steps parameter implementation
"""
import asyncio
import os
import sys

# Get the grandparent directory
grandparent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(grandparent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))


from crawl4ai import AsyncWebCrawler
from crawl4ai.async_configs import CrawlerRunConfig


async def test_max_scroll_steps():
    """
    Test the max_scroll_steps parameter with different configurations
    """
    print("🚀 Testing max_scroll_steps parameter implementation")
    print("=" * 60)

    async with AsyncWebCrawler(verbose=True) as crawler:

        # Test 1: Without max_scroll_steps (unlimited scrolling)
        print("\n📋 Test 1: Unlimited scrolling (max_scroll_steps=None)")
        config1 = CrawlerRunConfig(
            scan_full_page=True,
            scroll_delay=0.1,
            max_scroll_steps=None,  # Default behavior
            verbose=True
        )

        print(f"Config: scan_full_page={config1.scan_full_page}, max_scroll_steps={config1.max_scroll_steps}")

        try:
            result1 = await crawler.arun(
                url="https://example.com",  # Simple page for testing
                config=config1
            )
            print(f"✅ Test 1 Success: Crawled {len(result1.markdown)} characters")
        except Exception as e:
            print(f"❌ Test 1 Failed: {e}")

        # Test 2: With limited scroll steps
        print("\n📋 Test 2: Limited scrolling (max_scroll_steps=3)")
        config2 = CrawlerRunConfig(
            scan_full_page=True,
            scroll_delay=0.1,
            max_scroll_steps=3,  # Limit to 3 scroll steps
            verbose=True
        )

        print(f"Config: scan_full_page={config2.scan_full_page}, max_scroll_steps={config2.max_scroll_steps}")

        try:
            result2 = await crawler.arun(
                url="https://techcrunch.com/",  # Another test page
                config=config2
            )
            print(f"✅ Test 2 Success: Crawled {len(result2.markdown)} characters")
        except Exception as e:
            print(f"❌ Test 2 Failed: {e}")

        # Test 3: Test serialization/deserialization
        print("\n📋 Test 3: Configuration serialization test")
        config3 = CrawlerRunConfig(
            scan_full_page=True,
            max_scroll_steps=5,
            scroll_delay=0.2
        )

        # Test to_dict
        config_dict = config3.to_dict()
        print(f"Serialized max_scroll_steps: {config_dict.get('max_scroll_steps')}")

        # Test from_kwargs
        config4 = CrawlerRunConfig.from_kwargs({
            'scan_full_page': True,
            'max_scroll_steps': 7,
            'scroll_delay': 0.3
        })
        print(f"Deserialized max_scroll_steps: {config4.max_scroll_steps}")
        print("✅ Test 3 Success: Serialization works correctly")

        # Test 4: Edge case - max_scroll_steps = 0
        print("\n📋 Test 4: Edge case (max_scroll_steps=0)")
        config5 = CrawlerRunConfig(
            scan_full_page=True,
            max_scroll_steps=0,  # Should not scroll at all
            verbose=True
        )

        try:
            result5 = await crawler.arun(
                url="https://techcrunch.com/",
                config=config5
            )
            print(f"✅ Test 4 Success: No scrolling performed, crawled {len(result5.markdown)} characters")
        except Exception as e:
            print(f"❌ Test 4 Failed: {e}")

    print("\n" + "=" * 60)
    print("🎉 All tests completed!")
    print("\nThe max_scroll_steps parameter is working correctly:")
    print("- None: Unlimited scrolling (default behavior)")
    print("- Positive integer: Limits scroll steps to that number")
    print("- 0: No scrolling performed")
    print("- Properly serializes/deserializes in config")

if __name__ == "__main__":
    print("Starting max_scroll_steps test...")
    asyncio.run(test_max_scroll_steps())
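Test 3 above covers serialization with prints; a compact assertion-style round trip using only the to_dict()/from_kwargs() calls already shown (a sketch, not part of the commit):

from crawl4ai.async_configs import CrawlerRunConfig

cfg = CrawlerRunConfig(scan_full_page=True, max_scroll_steps=5, scroll_delay=0.2)
assert cfg.to_dict().get("max_scroll_steps") == 5

restored = CrawlerRunConfig.from_kwargs({"scan_full_page": True, "max_scroll_steps": 7})
assert restored.max_scroll_steps == 7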
tests/general/test_url_pattern.py (new file, 85 lines)
@@ -0,0 +1,85 @@
import sys
import os

# Get the grandparent directory
grandparent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(grandparent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))

import asyncio
from crawl4ai.deep_crawling.filters import URLPatternFilter


def test_prefix_boundary_matching():
    """Test that prefix patterns respect path boundaries"""
    print("=== Testing URLPatternFilter Prefix Boundary Fix ===")

    filter_obj = URLPatternFilter(patterns=['https://langchain-ai.github.io/langgraph/*'])

    test_cases = [
        ('https://langchain-ai.github.io/langgraph/', True),
        ('https://langchain-ai.github.io/langgraph/concepts/', True),
        ('https://langchain-ai.github.io/langgraph/tutorials/', True),
        ('https://langchain-ai.github.io/langgraph?param=1', True),
        ('https://langchain-ai.github.io/langgraph#section', True),
        ('https://langchain-ai.github.io/langgraphjs/', False),
        ('https://langchain-ai.github.io/langgraphjs/concepts/', False),
        ('https://other-site.com/langgraph/', False),
    ]

    all_passed = True
    for url, expected in test_cases:
        result = filter_obj.apply(url)
        status = "PASS" if result == expected else "FAIL"
        if result != expected:
            all_passed = False
        print(f"{status:4} | Expected: {expected:5} | Got: {result:5} | {url}")

    return all_passed


def test_edge_cases():
    """Test edge cases for path boundary matching"""
    print("\n=== Testing Edge Cases ===")

    test_patterns = [
        ('/api/*', [
            ('/api/', True),
            ('/api/v1', True),
            ('/api?param=1', True),
            ('/apiv2/', False),
            ('/api_old/', False),
        ]),

        ('*/docs/*', [
            ('example.com/docs/', True),
            ('example.com/docs/guide', True),
            ('example.com/documentation/', False),
            ('example.com/docs_old/', False),
        ]),
    ]

    all_passed = True
    for pattern, test_cases in test_patterns:
        print(f"\nPattern: {pattern}")
        filter_obj = URLPatternFilter(patterns=[pattern])

        for url, expected in test_cases:
            result = filter_obj.apply(url)
            status = "PASS" if result == expected else "FAIL"
            if result != expected:
                all_passed = False
            print(f"  {status:4} | Expected: {expected:5} | Got: {result:5} | {url}")

    return all_passed


if __name__ == "__main__":
    test1_passed = test_prefix_boundary_matching()
    test2_passed = test_edge_cases()

    if test1_passed and test2_passed:
        print("\n✅ All tests passed!")
        sys.exit(0)
    else:
        print("\n❌ Some tests failed!")
        sys.exit(1)
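The scripts above report pass/fail via prints and exit codes. An equivalent pytest-parametrized version of the prefix-boundary cases, relying only on the URLPatternFilter constructor and apply() call used above (a sketch, not part of the commit):

import pytest
from crawl4ai.deep_crawling.filters import URLPatternFilter

@pytest.mark.parametrize("url, expected", [
    ("https://langchain-ai.github.io/langgraph/concepts/", True),
    ("https://langchain-ai.github.io/langgraphjs/", False),
    ("https://other-site.com/langgraph/", False),
])
def test_langgraph_prefix_boundary(url, expected):
    filter_obj = URLPatternFilter(patterns=["https://langchain-ai.github.io/langgraph/*"])
    assert filter_obj.apply(url) == expected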