- Implemented `test_link_analysis` in `test_docker.py` to validate link analysis functionality. - Created `test_link_analysis.py` with comprehensive tests for link analysis, including basic functionality, configuration options, error handling, performance, and edge cases. - Added integration tests in `test_link_analysis_integration.py` to verify the /links/analyze endpoint, including health checks, authentication, and error handling.
169 lines
5.1 KiB
Python
169 lines
5.1 KiB
Python
import requests
|
|
import json
|
|
import time
|
|
import sys
|
|
|
|
|
|
def test_links_analyze_endpoint():
|
|
"""Integration test for the /links/analyze endpoint"""
|
|
|
|
base_url = "http://localhost:11234"
|
|
|
|
# Health check
|
|
try:
|
|
health_response = requests.get(f"{base_url}/health", timeout=5)
|
|
if health_response.status_code != 200:
|
|
print("❌ Server health check failed")
|
|
return False
|
|
print("✅ Server health check passed")
|
|
except Exception as e:
|
|
print(f"❌ Cannot connect to server: {e}")
|
|
return False
|
|
|
|
# Get auth token
|
|
token = None
|
|
try:
|
|
token_response = requests.post(
|
|
f"{base_url}/token",
|
|
json={"email": "test@example.com"},
|
|
timeout=5
|
|
)
|
|
if token_response.status_code == 200:
|
|
token = token_response.json()["access_token"]
|
|
print("✅ Authentication token obtained")
|
|
except Exception as e:
|
|
print(f"⚠️ Could not get auth token: {e}")
|
|
|
|
# Test the links/analyze endpoint
|
|
headers = {"Content-Type": "application/json"}
|
|
if token:
|
|
headers["Authorization"] = f"Bearer {token}"
|
|
|
|
# Test 1: Basic request
|
|
print("\n🔍 Testing basic link analysis...")
|
|
test_data = {
|
|
"url": "https://httpbin.org/links/10",
|
|
"config": {
|
|
"include_internal": True,
|
|
"include_external": True,
|
|
"max_links": 50,
|
|
"verbose": True
|
|
}
|
|
}
|
|
|
|
try:
|
|
response = requests.post(
|
|
f"{base_url}/links/analyze",
|
|
headers=headers,
|
|
json=test_data,
|
|
timeout=30
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
result = response.json()
|
|
print("✅ Basic link analysis successful")
|
|
print(f"📄 Response structure: {list(result.keys())}")
|
|
|
|
# Verify response structure
|
|
total_links = sum(len(links) for links in result.values())
|
|
print(f"📊 Found {total_links} total links")
|
|
|
|
# Debug: Show what was actually returned
|
|
if total_links == 0:
|
|
print("⚠️ No links found - showing full response:")
|
|
print(json.dumps(result, indent=2))
|
|
|
|
# Check for expected categories
|
|
found_categories = []
|
|
for category in ['internal', 'external', 'social', 'download', 'email', 'phone']:
|
|
if category in result and result[category]:
|
|
found_categories.append(category)
|
|
|
|
print(f"📂 Found categories: {found_categories}")
|
|
|
|
# Verify link objects have required fields
|
|
if total_links > 0:
|
|
sample_found = False
|
|
for category, links in result.items():
|
|
if links:
|
|
sample_link = links[0]
|
|
if 'href' in sample_link and 'total_score' in sample_link:
|
|
sample_found = True
|
|
break
|
|
|
|
if sample_found:
|
|
print("✅ Link objects have required fields")
|
|
else:
|
|
print("⚠️ Link objects missing required fields")
|
|
|
|
else:
|
|
print(f"❌ Basic link analysis failed: {response.status_code}")
|
|
print(f"Response: {response.text}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"❌ Basic link analysis error: {e}")
|
|
return False
|
|
|
|
# Test 2: With configuration
|
|
print("\n🔍 Testing link analysis with configuration...")
|
|
test_data_with_config = {
|
|
"url": "https://httpbin.org/links/10",
|
|
"config": {
|
|
"include_internal": True,
|
|
"include_external": True,
|
|
"max_links": 50,
|
|
"timeout": 10,
|
|
"verbose": True
|
|
}
|
|
}
|
|
|
|
try:
|
|
response = requests.post(
|
|
f"{base_url}/links/analyze",
|
|
headers=headers,
|
|
json=test_data_with_config,
|
|
timeout=30
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
result = response.json()
|
|
total_links = sum(len(links) for links in result.values())
|
|
print(f"✅ Link analysis with config successful ({total_links} links)")
|
|
else:
|
|
print(f"❌ Link analysis with config failed: {response.status_code}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"❌ Link analysis with config error: {e}")
|
|
return False
|
|
|
|
# Test 3: Error handling
|
|
print("\n🔍 Testing error handling...")
|
|
invalid_data = {
|
|
"url": "not-a-valid-url"
|
|
}
|
|
|
|
try:
|
|
response = requests.post(
|
|
f"{base_url}/links/analyze",
|
|
headers=headers,
|
|
json=invalid_data,
|
|
timeout=30
|
|
)
|
|
|
|
if response.status_code >= 400:
|
|
print("✅ Error handling works correctly")
|
|
else:
|
|
print("⚠️ Expected error for invalid URL, but got success")
|
|
|
|
except Exception as e:
|
|
print(f"✅ Error handling caught exception: {e}")
|
|
|
|
print("\n🎉 All integration tests passed!")
|
|
return True
|
|
|
|
|
|
if __name__ == "__main__":
|
|
success = test_links_analyze_endpoint()
|
|
sys.exit(0 if success else 1) |