82 lines
2.5 KiB
Python
82 lines
2.5 KiB
Python
"""Authentication and authorization dependencies for protected API routes."""
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from typing import Annotated
|
|
from uuid import UUID
|
|
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.db.base import get_session
|
|
from app.models.auth import UserRole
|
|
from app.services.authentication import resolve_auth_session
|
|
|
|
|
|
bearer_auth = HTTPBearer(auto_error=False)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AuthContext:
|
|
"""Carries authenticated identity and role details for one request."""
|
|
|
|
user_id: UUID
|
|
username: str
|
|
role: UserRole
|
|
session_id: UUID
|
|
expires_at: datetime
|
|
|
|
|
|
def _raise_unauthorized() -> None:
|
|
"""Raises a 401 challenge response for missing or invalid bearer sessions."""
|
|
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid or expired authentication session",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
|
|
def get_request_auth_context(
|
|
credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(bearer_auth)],
|
|
session: Annotated[Session, Depends(get_session)],
|
|
) -> AuthContext:
|
|
"""Authenticates bearer session token and returns role-bound request identity context."""
|
|
|
|
if credentials is None:
|
|
_raise_unauthorized()
|
|
|
|
token = credentials.credentials.strip()
|
|
if not token:
|
|
_raise_unauthorized()
|
|
|
|
resolved_session = resolve_auth_session(session, token=token)
|
|
if resolved_session is None or resolved_session.user is None:
|
|
_raise_unauthorized()
|
|
|
|
return AuthContext(
|
|
user_id=resolved_session.user.id,
|
|
username=resolved_session.user.username,
|
|
role=resolved_session.user.role,
|
|
session_id=resolved_session.id,
|
|
expires_at=resolved_session.expires_at,
|
|
)
|
|
|
|
|
|
def require_user_or_admin(context: Annotated[AuthContext, Depends(get_request_auth_context)]) -> AuthContext:
|
|
"""Requires any authenticated user session and returns its request identity context."""
|
|
|
|
return context
|
|
|
|
|
|
def require_admin(context: Annotated[AuthContext, Depends(get_request_auth_context)]) -> AuthContext:
|
|
"""Requires authenticated admin role and rejects standard user sessions."""
|
|
|
|
if context.role != UserRole.ADMIN:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Administrator role required",
|
|
)
|
|
return context
|