Files
ledgerdock/backend/app/api/auth.py

82 lines
2.5 KiB
Python

"""Authentication and authorization dependencies for protected API routes."""
from dataclasses import dataclass
from datetime import datetime
from typing import Annotated
from uuid import UUID
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session
from app.db.base import get_session
from app.models.auth import UserRole
from app.services.authentication import resolve_auth_session
bearer_auth = HTTPBearer(auto_error=False)
@dataclass(frozen=True)
class AuthContext:
"""Carries authenticated identity and role details for one request."""
user_id: UUID
username: str
role: UserRole
session_id: UUID
expires_at: datetime
def _raise_unauthorized() -> None:
"""Raises a 401 challenge response for missing or invalid bearer sessions."""
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired authentication session",
headers={"WWW-Authenticate": "Bearer"},
)
def get_request_auth_context(
credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(bearer_auth)],
session: Annotated[Session, Depends(get_session)],
) -> AuthContext:
"""Authenticates bearer session token and returns role-bound request identity context."""
if credentials is None:
_raise_unauthorized()
token = credentials.credentials.strip()
if not token:
_raise_unauthorized()
resolved_session = resolve_auth_session(session, token=token)
if resolved_session is None or resolved_session.user is None:
_raise_unauthorized()
return AuthContext(
user_id=resolved_session.user.id,
username=resolved_session.user.username,
role=resolved_session.user.role,
session_id=resolved_session.id,
expires_at=resolved_session.expires_at,
)
def require_user_or_admin(context: Annotated[AuthContext, Depends(get_request_auth_context)]) -> AuthContext:
"""Requires any authenticated user session and returns its request identity context."""
return context
def require_admin(context: Annotated[AuthContext, Depends(get_request_auth_context)]) -> AuthContext:
"""Requires authenticated admin role and rejects standard user sessions."""
if context.role != UserRole.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Administrator role required",
)
return context