|
|
|
|
@@ -28,25 +28,43 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -
|
|
|
|
|
signing_key = get_jwk_from_secret(SECRET_KEY)
|
|
|
|
|
return instance.encode(to_encode, signing_key, alg='HS256')
|
|
|
|
|
|
|
|
|
|
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict:
|
|
|
|
|
def verify_token(credentials: HTTPAuthorizationCredentials) -> Dict:
|
|
|
|
|
"""Verify the JWT token from the Authorization header."""
|
|
|
|
|
|
|
|
|
|
if credentials is None:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
if not credentials or not credentials.credentials:
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
status_code=401,
|
|
|
|
|
detail="No token provided",
|
|
|
|
|
headers={"WWW-Authenticate": "Bearer"}
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
token = credentials.credentials
|
|
|
|
|
verifying_key = get_jwk_from_secret(SECRET_KEY)
|
|
|
|
|
try:
|
|
|
|
|
payload = instance.decode(token, verifying_key, do_time_check=True, algorithms='HS256')
|
|
|
|
|
return payload
|
|
|
|
|
except Exception:
|
|
|
|
|
raise HTTPException(status_code=401, detail="Invalid or expired token")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
status_code=401,
|
|
|
|
|
detail=f"Invalid or expired token: {str(e)}",
|
|
|
|
|
headers={"WWW-Authenticate": "Bearer"}
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_token_dependency(config: Dict):
|
|
|
|
|
"""Return the token dependency if JWT is enabled, else a function that returns None."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if config.get("security", {}).get("jwt_enabled", False):
|
|
|
|
|
return verify_token
|
|
|
|
|
def jwt_required(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict:
|
|
|
|
|
"""Enforce JWT authentication when enabled."""
|
|
|
|
|
if credentials is None:
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
status_code=401,
|
|
|
|
|
detail="Authentication required. Please provide a valid Bearer token.",
|
|
|
|
|
headers={"WWW-Authenticate": "Bearer"}
|
|
|
|
|
)
|
|
|
|
|
return verify_token(credentials)
|
|
|
|
|
return jwt_required
|
|
|
|
|
else:
|
|
|
|
|
return lambda: None
|
|
|
|
|
|
|
|
|
|
|