"""Authentication and authorization dependencies for protected API routes.""" from dataclasses import dataclass from datetime import datetime from typing import Annotated from uuid import UUID import hmac from fastapi import Depends, HTTPException, Request, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from sqlalchemy.orm import Session from app.db.base import get_session from app.models.auth import UserRole from app.services.authentication import resolve_auth_session try: from fastapi import Cookie, Header except (ImportError, AttributeError): def Cookie(_default=None, **_kwargs): # type: ignore[no-untyped-def] """Compatibility fallback for environments that stub fastapi without request params.""" return None def Header(_default=None, **_kwargs): # type: ignore[no-untyped-def] """Compatibility fallback for environments that stub fastapi without request params.""" return None bearer_auth = HTTPBearer(auto_error=False) SESSION_COOKIE_NAME = "dcm_session" CSRF_COOKIE_NAME = "dcm_csrf" CSRF_HEADER_NAME = "x-csrf-token" CSRF_PROTECTED_METHODS = frozenset({"POST", "PATCH", "PUT", "DELETE"}) @dataclass(frozen=True) class AuthContext: """Carries authenticated identity and role details for one request.""" user_id: UUID username: str role: UserRole session_id: UUID expires_at: datetime def _requires_csrf_validation(method: str) -> bool: """Returns whether an HTTP method should be protected by cookie CSRF validation.""" return method.upper() in CSRF_PROTECTED_METHODS def _raise_unauthorized() -> None: """Raises a 401 challenge response for missing or invalid auth sessions.""" raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired authentication session", headers={"WWW-Authenticate": "Bearer"}, ) def _raise_csrf_rejected() -> None: """Raises a forbidden response for CSRF validation failure.""" raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Invalid CSRF token", ) def get_request_auth_context( request: Request, credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(bearer_auth)], csrf_header: Annotated[str | None, Header(default=None, alias=CSRF_HEADER_NAME)], csrf_cookie: Annotated[str | None, Cookie(default=None, alias=CSRF_COOKIE_NAME)], session_cookie: Annotated[str | None, Cookie(default=None, alias=SESSION_COOKIE_NAME)], session: Annotated[Session, Depends(get_session)], ) -> AuthContext: """Authenticates auth session token and validates CSRF for cookie sessions.""" token = credentials.credentials.strip() if credentials is not None and credentials.credentials else "" using_cookie_session = False if not token: token = (session_cookie or "").strip() using_cookie_session = True if not token: _raise_unauthorized() if _requires_csrf_validation(request.method) and using_cookie_session: normalized_csrf_header = (csrf_header or "").strip() normalized_csrf_cookie = (csrf_cookie or "").strip() if ( not normalized_csrf_cookie or not normalized_csrf_header or not hmac.compare_digest(normalized_csrf_cookie, normalized_csrf_header) ): _raise_csrf_rejected() resolved_session = resolve_auth_session(session, token=token) if resolved_session is None or resolved_session.user is None: _raise_unauthorized() return AuthContext( user_id=resolved_session.user.id, username=resolved_session.user.username, role=resolved_session.user.role, session_id=resolved_session.id, expires_at=resolved_session.expires_at, ) def require_user_or_admin(context: Annotated[AuthContext, Depends(get_request_auth_context)]) -> AuthContext: """Requires any authenticated user session and returns its request identity context.""" return context def require_admin(context: Annotated[AuthContext, Depends(get_request_auth_context)]) -> AuthContext: """Requires authenticated admin role and rejects standard user sessions.""" if context.role != UserRole.ADMIN: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Administrator role required", ) return context