96 lines
3.0 KiB
Python
96 lines
3.0 KiB
Python
"""Token-based authentication and authorization dependencies for privileged API routes."""
|
|
|
|
import hmac
from typing import Annotated, NoReturn

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from app.core.config import Settings, get_settings
|
|
|
|
|
|
# Bearer-token extractor shared by the auth dependencies in this module.
# auto_error=False is passed so that a request without usable credentials can
# reach get_request_role as None (that function declares the parameter as
# `HTTPAuthorizationCredentials | None` and decides itself how to respond).
bearer_auth = HTTPBearer(auto_error=False)
|
|
|
|
|
|
class AuthRole:
    """Namespace of the role identifiers produced by the authentication dependencies.

    Roles are plain strings, so they can be compared directly with ``==`` / ``!=``
    against the values returned by the role-resolving functions in this module.
    """

    # Role granted when the presented token matches the configured admin token.
    ADMIN: str = "admin"
    # Role granted for the user token, or for anonymous development access.
    USER: str = "user"
|
|
|
|
|
|
def _raise_unauthorized() -> NoReturn:
    """Raises an HTTP 401 response with bearer authentication challenge headers.

    The return annotation is ``NoReturn`` because this function never returns
    normally: callers invoke it as the last statement of a branch and rely on
    the raised exception to end request handling.

    Raises:
        HTTPException: Always. Status 401, detail "Invalid or missing API token",
            and a ``WWW-Authenticate: Bearer`` header.
    """
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid or missing API token",
        headers={"WWW-Authenticate": "Bearer"},
    )
|
|
|
|
|
|
def _configured_admin_token(settings: Settings) -> str:
    """Returns the whitespace-stripped admin token from settings.

    Args:
        settings: Application settings; ``admin_api_token`` is read from it.

    Returns:
        The configured admin token with surrounding whitespace removed.

    Raises:
        HTTPException: Status 503 when the token is empty or whitespace-only,
            i.e. the deployment has no admin token configured.
    """
    admin_token = settings.admin_api_token.strip()
    if not admin_token:
        # A missing admin token is a server-side configuration problem, so it
        # is reported as 503 rather than as an authentication failure.
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Admin API token is not configured",
        )
    return admin_token
|
|
|
|
|
|
def _resolve_token_role(token: str, settings: Settings) -> str:
    """Resolves role from a bearer token using constant-time comparisons.

    Args:
        token: Bearer token presented by the client (already stripped by the caller).
        settings: Application settings providing the admin and user tokens.

    Returns:
        ``AuthRole.ADMIN`` when the token equals the configured admin token,
        ``AuthRole.USER`` when it equals a non-empty configured user token.

    Raises:
        HTTPException: Status 503 (from ``_configured_admin_token``) when no admin
            token is configured; status 401 when the token matches neither role.
    """
    # hmac.compare_digest() only accepts str arguments that are pure ASCII and
    # raises TypeError otherwise. The presented token is client-controlled, so
    # compare UTF-8 bytes instead: a token containing non-ASCII characters is
    # then rejected as a normal mismatch rather than surfacing as an unhandled
    # TypeError.
    presented = token.encode("utf-8")

    admin_token = _configured_admin_token(settings)
    if hmac.compare_digest(presented, admin_token.encode("utf-8")):
        return AuthRole.ADMIN

    # The user token is optional; an empty value must never match anything.
    user_token = settings.user_api_token.strip()
    if user_token and hmac.compare_digest(presented, user_token.encode("utf-8")):
        return AuthRole.USER

    _raise_unauthorized()
|
|
|
|
|
|
def get_request_role(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(bearer_auth)],
    settings: Annotated[Settings, Depends(get_settings)],
) -> str:
    """Resolves the authorization role for the current request.

    A non-blank bearer token is always validated against the configured tokens.
    When no usable token is present (no credentials at all, or only whitespace),
    the request is treated as ``AuthRole.USER`` only if
    ``allow_development_anonymous_user_access`` is enabled and ``app_env`` is
    "development" or "dev" (case-insensitive); otherwise it is rejected.

    Returns:
        The resolved role string.

    Raises:
        HTTPException: Status 401 when the token is missing or invalid and
            anonymous development access does not apply; any error raised while
            resolving a presented token propagates unchanged.
    """
    presented = "" if credentials is None else credentials.credentials.strip()
    if presented:
        return _resolve_token_role(token=presented, settings=settings)

    # No usable token: fall back to the opt-in development allowance. The
    # environment is only inspected when the allowance flag is set.
    if settings.allow_development_anonymous_user_access:
        environment = settings.app_env.strip().lower()
        if environment in {"development", "dev"}:
            return AuthRole.USER

    _raise_unauthorized()
|
|
|
|
|
|
def require_user_or_admin(role: Annotated[str, Depends(get_request_role)]) -> str:
    """Requires an authenticated request and returns its resolved role.

    No additional check is performed here: the role is supplied by the
    get_request_role dependency, which raises for requests it does not accept,
    so every role that reaches this function (user or admin) is allowed through.

    Returns:
        The role string produced by get_request_role, unchanged.
    """
    return role
|
|
|
|
|
|
def require_admin(role: Annotated[str, Depends(get_request_role)]) -> str:
    """Allows only requests whose resolved role is admin.

    Returns:
        The role string (always ``AuthRole.ADMIN``) when the check passes.

    Raises:
        HTTPException: Status 403 with detail "Admin token required" when the
            request was authenticated with any role other than admin.
    """
    if role == AuthRole.ADMIN:
        return role

    # The caller is authenticated but lacks privileges, hence 403 rather than 401.
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin token required",
    )
|