"""Authentication endpoints for credential login, session introspection, and logout.""" import logging from fastapi import APIRouter, Depends, HTTPException, Request, status from sqlalchemy.orm import Session from app.api.auth import AuthContext, require_user_or_admin from app.db.base import get_session from app.schemas.auth import ( AuthLoginRequest, AuthLoginResponse, AuthLogoutResponse, AuthSessionResponse, AuthUserResponse, ) from app.services.auth_login_throttle import ( check_login_throttle, clear_login_throttle, record_failed_login_attempt, ) from app.services.authentication import authenticate_user, issue_user_session, revoke_auth_session router = APIRouter(prefix="/auth", tags=["auth"]) logger = logging.getLogger(__name__) LOGIN_THROTTLED_DETAIL = "Too many login attempts. Try again later." LOGIN_RATE_LIMITER_UNAVAILABLE_DETAIL = "Login rate limiter backend unavailable" def _request_ip_address(request: Request) -> str | None: """Returns best-effort client IP extracted from the request transport context.""" return request.client.host if request.client is not None else None def _request_user_agent(request: Request) -> str | None: """Returns best-effort user-agent metadata for created auth sessions.""" user_agent = request.headers.get("user-agent", "").strip() return user_agent[:512] if user_agent else None def _retry_after_headers(retry_after_seconds: int) -> dict[str, str]: """Returns a bounded Retry-After header payload for throttled authentication responses.""" return {"Retry-After": str(max(1, int(retry_after_seconds)))} @router.post("/login", response_model=AuthLoginResponse) def login( payload: AuthLoginRequest, request: Request, session: Session = Depends(get_session), ) -> AuthLoginResponse: """Authenticates credentials with throttle protection and returns an issued bearer session token.""" ip_address = _request_ip_address(request) try: throttle_status = check_login_throttle( username=payload.username, ip_address=ip_address, ) except RuntimeError as error: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=LOGIN_RATE_LIMITER_UNAVAILABLE_DETAIL, ) from error if throttle_status.is_throttled: raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=LOGIN_THROTTLED_DETAIL, headers=_retry_after_headers(throttle_status.retry_after_seconds), ) user = authenticate_user( session, username=payload.username, password=payload.password, ) if user is None: try: lockout_seconds = record_failed_login_attempt( username=payload.username, ip_address=ip_address, ) except RuntimeError as error: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=LOGIN_RATE_LIMITER_UNAVAILABLE_DETAIL, ) from error if lockout_seconds > 0: raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=LOGIN_THROTTLED_DETAIL, headers=_retry_after_headers(lockout_seconds), ) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password", ) try: clear_login_throttle( username=payload.username, ip_address=ip_address, ) except RuntimeError: logger.warning( "Failed to clear login throttle state after successful authentication: username=%s ip=%s", payload.username.strip().lower(), ip_address or "", ) issued_session = issue_user_session( session, user=user, user_agent=_request_user_agent(request), ip_address=ip_address, ) session.commit() return AuthLoginResponse( access_token=issued_session.token, expires_at=issued_session.expires_at, user=AuthUserResponse.model_validate(user), ) @router.get("/me", response_model=AuthSessionResponse) def me(context: AuthContext = Depends(require_user_or_admin)) -> AuthSessionResponse: """Returns current authenticated session identity and expiration metadata.""" return AuthSessionResponse( expires_at=context.expires_at, user=AuthUserResponse( id=context.user_id, username=context.username, role=context.role, ), ) @router.post("/logout", response_model=AuthLogoutResponse) def logout( context: AuthContext = Depends(require_user_or_admin), session: Session = Depends(get_session), ) -> AuthLogoutResponse: """Revokes current bearer session token and confirms logout state.""" revoked = revoke_auth_session( session, session_id=context.session_id, ) if revoked: session.commit() return AuthLogoutResponse(revoked=revoked)