"""Authentication endpoints for credential login, session introspection, and logout.""" from fastapi import APIRouter, Depends, HTTPException, Request, status from sqlalchemy.orm import Session from app.api.auth import AuthContext, require_user_or_admin from app.db.base import get_session from app.schemas.auth import ( AuthLoginRequest, AuthLoginResponse, AuthLogoutResponse, AuthSessionResponse, AuthUserResponse, ) from app.services.authentication import authenticate_user, issue_user_session, revoke_auth_session router = APIRouter(prefix="/auth", tags=["auth"]) def _request_ip_address(request: Request) -> str | None: """Returns best-effort client IP extracted from the request transport context.""" return request.client.host if request.client is not None else None def _request_user_agent(request: Request) -> str | None: """Returns best-effort user-agent metadata for created auth sessions.""" user_agent = request.headers.get("user-agent", "").strip() return user_agent[:512] if user_agent else None @router.post("/login", response_model=AuthLoginResponse) def login( payload: AuthLoginRequest, request: Request, session: Session = Depends(get_session), ) -> AuthLoginResponse: """Authenticates username and password and returns an issued bearer session token.""" user = authenticate_user( session, username=payload.username, password=payload.password, ) if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password", ) issued_session = issue_user_session( session, user=user, user_agent=_request_user_agent(request), ip_address=_request_ip_address(request), ) session.commit() return AuthLoginResponse( access_token=issued_session.token, expires_at=issued_session.expires_at, user=AuthUserResponse.model_validate(user), ) @router.get("/me", response_model=AuthSessionResponse) def me(context: AuthContext = Depends(require_user_or_admin)) -> AuthSessionResponse: """Returns current authenticated session identity and expiration metadata.""" return AuthSessionResponse( expires_at=context.expires_at, user=AuthUserResponse( id=context.user_id, username=context.username, role=context.role, ), ) @router.post("/logout", response_model=AuthLogoutResponse) def logout( context: AuthContext = Depends(require_user_or_admin), session: Session = Depends(get_session), ) -> AuthLogoutResponse: """Revokes current bearer session token and confirms logout state.""" revoked = revoke_auth_session( session, session_id=context.session_id, ) if revoked: session.commit() return AuthLogoutResponse(revoked=revoked)