Enhance authentication flow by implementing JWT token retrieval and adding authorization headers to API requests

This commit is contained in:
ntohidi
2026-01-12 13:46:32 +01:00
parent f24396c23e
commit acfab80dd4

View File

@@ -7,9 +7,46 @@ adapted for the Docker API with real URLs
import requests import requests
import json import json
import time import time
from typing import Dict, Any from typing import Dict, Optional
API_BASE_URL = "http://localhost:11234" API_BASE_URL = "http://localhost:11235"
# Global token storage
_auth_token: Optional[str] = None
def get_auth_token(email: str = "test@gmail.com") -> str:
"""
Get a JWT token from the /token endpoint.
The email domain must have valid MX records.
"""
global _auth_token
if _auth_token:
return _auth_token
print(f"🔐 Requesting JWT token for {email}...")
response = requests.post(
f"{API_BASE_URL}/token",
json={"email": email}
)
if response.status_code == 200:
data = response.json()
_auth_token = data["access_token"]
print(f"✅ Token obtained successfully")
return _auth_token
else:
raise Exception(f"Failed to get token: {response.status_code} - {response.text}")
def get_auth_headers() -> Dict[str, str]:
"""Get headers with JWT Bearer token."""
token = get_auth_token()
return {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
def test_all_hooks_demo(): def test_all_hooks_demo():
@@ -164,8 +201,8 @@ async def hook(page, context, html, **kwargs):
print("\nSending request with all 8 hooks...") print("\nSending request with all 8 hooks...")
start_time = time.time() start_time = time.time()
response = requests.post(f"{API_BASE_URL}/crawl", json=payload) response = requests.post(f"{API_BASE_URL}/crawl", json=payload, headers=get_auth_headers())
elapsed_time = time.time() - start_time elapsed_time = time.time() - start_time
print(f"Request completed in {elapsed_time:.2f} seconds") print(f"Request completed in {elapsed_time:.2f} seconds")
@@ -278,7 +315,7 @@ async def hook(page, context, url, **kwargs):
} }
print("\nTesting authentication with httpbin endpoints...") print("\nTesting authentication with httpbin endpoints...")
response = requests.post(f"{API_BASE_URL}/crawl", json=payload) response = requests.post(f"{API_BASE_URL}/crawl", json=payload, headers=get_auth_headers())
if response.status_code == 200: if response.status_code == 200:
data = response.json() data = response.json()
@@ -371,8 +408,8 @@ async def hook(page, context, **kwargs):
print("\nTesting performance optimization hooks...") print("\nTesting performance optimization hooks...")
start_time = time.time() start_time = time.time()
response = requests.post(f"{API_BASE_URL}/crawl", json=payload) response = requests.post(f"{API_BASE_URL}/crawl", json=payload, headers=get_auth_headers())
elapsed_time = time.time() - start_time elapsed_time = time.time() - start_time
print(f"Request completed in {elapsed_time:.2f} seconds") print(f"Request completed in {elapsed_time:.2f} seconds")
@@ -462,7 +499,7 @@ async def hook(page, context, **kwargs):
} }
print("\nTesting content extraction hooks...") print("\nTesting content extraction hooks...")
response = requests.post(f"{API_BASE_URL}/crawl", json=payload) response = requests.post(f"{API_BASE_URL}/crawl", json=payload, headers=get_auth_headers())
if response.status_code == 200: if response.status_code == 200:
data = response.json() data = response.json()
@@ -485,7 +522,16 @@ def main():
print("🔧 Crawl4AI Docker API - Comprehensive Hooks Testing") print("🔧 Crawl4AI Docker API - Comprehensive Hooks Testing")
print("Based on docs/examples/hooks_example.py") print("Based on docs/examples/hooks_example.py")
print("=" * 70) print("=" * 70)
# Get JWT token first (required when jwt_enabled=true)
try:
get_auth_token()
print("=" * 70)
except Exception as e:
print(f"❌ Failed to authenticate: {e}")
print("Make sure the server is running and jwt_enabled is configured correctly.")
return
tests = [ tests = [
("All Hooks Demo", test_all_hooks_demo), ("All Hooks Demo", test_all_hooks_demo),
("Authentication Flow", test_authentication_flow), ("Authentication Flow", test_authentication_flow),