288 lines
9.1 KiB
Python
288 lines
9.1 KiB
Python
"""Authentication endpoints for credential login, session introspection, and logout."""
|
|
|
|
import logging
|
|
import secrets
|
|
from datetime import UTC, datetime
|
|
from urllib.parse import urlparse
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, status
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.api.auth import (
|
|
AuthContext,
|
|
SESSION_COOKIE_NAME,
|
|
CSRF_COOKIE_NAME,
|
|
require_user_or_admin,
|
|
)
|
|
from app.core.config import get_settings
|
|
from app.db.base import get_session
|
|
from app.schemas.auth import (
|
|
AuthLoginRequest,
|
|
AuthLoginResponse,
|
|
AuthLogoutResponse,
|
|
AuthSessionResponse,
|
|
AuthUserResponse,
|
|
)
|
|
from app.services.auth_login_throttle import (
|
|
check_login_throttle,
|
|
clear_login_throttle,
|
|
record_failed_login_attempt,
|
|
)
|
|
|
|
try:
|
|
from fastapi import Response
|
|
except (ImportError, AttributeError):
|
|
from fastapi.responses import Response
|
|
from app.services.authentication import authenticate_user, issue_user_session, revoke_auth_session
|
|
|
|
router = APIRouter(prefix="/auth", tags=["auth"])
|
|
logger = logging.getLogger(__name__)
|
|
|
|
LOGIN_THROTTLED_DETAIL = "Too many login attempts. Try again later."
|
|
LOGIN_RATE_LIMITER_UNAVAILABLE_DETAIL = "Login rate limiter backend unavailable"
|
|
|
|
|
|
def _request_ip_address(request: Request) -> str | None:
|
|
"""Returns best-effort client IP extracted from the request transport context."""
|
|
|
|
return request.client.host if request.client is not None else None
|
|
|
|
|
|
def _request_user_agent(request: Request) -> str | None:
|
|
"""Returns best-effort user-agent metadata for created auth sessions."""
|
|
|
|
user_agent = request.headers.get("user-agent", "").strip()
|
|
return user_agent[:512] if user_agent else None
|
|
|
|
|
|
def _retry_after_headers(retry_after_seconds: int) -> dict[str, str]:
|
|
"""Returns a bounded Retry-After header payload for throttled authentication responses."""
|
|
|
|
return {"Retry-After": str(max(1, int(retry_after_seconds)))}
|
|
|
|
|
|
def _is_https_request(request: Request) -> bool:
|
|
"""Returns whether the incoming request should be treated as HTTPS for cookie flags."""
|
|
|
|
forwarded_protocol = request.headers.get("x-forwarded-proto", "").strip().lower().split(",")[0]
|
|
if forwarded_protocol:
|
|
return forwarded_protocol == "https"
|
|
request_url = getattr(request, "url", None)
|
|
request_scheme = str(getattr(request_url, "scheme", "")).lower() if request_url is not None else ""
|
|
if request_scheme == "https":
|
|
return True
|
|
|
|
parsed_public_base_url = urlparse(get_settings().public_base_url.strip())
|
|
return parsed_public_base_url.scheme.lower() == "https"
|
|
|
|
|
|
def _resolve_cookie_domain() -> str | None:
    """Reads the optional cookie domain override from the ``auth_cookie_domain`` setting.

    The configured value is trimmed, lower-cased, and stripped of leading dots.

    Returns:
        The normalized domain, or ``None`` when the setting is blank or the
        normalized value contains no dot.
    """
    raw_domain = get_settings().auth_cookie_domain
    normalized = raw_domain.strip().lower().lstrip(".")
    if normalized and "." in normalized:
        return normalized
    return None
|
|
|
|
|
|
def _resolve_cookie_samesite(secure_cookie: bool) -> str:
    """Returns cookie SameSite mode with secure-aware defaults for browser compatibility.

    Args:
        secure_cookie: Whether the cookie will be emitted with the ``Secure`` flag.

    Returns:
        ``"strict"`` or ``"lax"`` when explicitly configured through the
        ``auth_cookie_samesite`` setting. Otherwise ``"none"`` for secure
        cookies and ``"lax"`` for non-secure ones. This also applies when the
        setting is explicitly ``"none"``.
    """
    configured_mode = get_settings().auth_cookie_samesite.strip().lower()
    if configured_mode in {"strict", "lax"}:
        return configured_mode
    # Reached for an explicit "none" as well as for unset or unrecognized
    # values. Browsers reject SameSite=None cookies that lack Secure, so "none"
    # is only returned for secure cookies. Over plain HTTP the mode falls back
    # to "lax" so the cookie is still stored.
    return "none" if secure_cookie else "lax"
|
|
|
|
|
|
def _session_cookie_ttl_seconds(expires_at: datetime) -> int:
|
|
"""Converts session expiration datetime into cookie max-age seconds."""
|
|
|
|
now = datetime.now(UTC)
|
|
ttl = int((expires_at - now).total_seconds())
|
|
return max(1, ttl)
|
|
|
|
|
|
def _set_session_cookie(response: Response, session_token: str, *, expires_at: datetime, secure: bool) -> None:
|
|
"""Stores the issued session token in a browser HttpOnly auth cookie."""
|
|
|
|
if response is None or not hasattr(response, "set_cookie"):
|
|
return
|
|
expires_seconds = _session_cookie_ttl_seconds(expires_at)
|
|
cookie_domain = _resolve_cookie_domain()
|
|
same_site_mode = _resolve_cookie_samesite(secure)
|
|
response.set_cookie(
|
|
SESSION_COOKIE_NAME,
|
|
value=session_token,
|
|
max_age=expires_seconds,
|
|
httponly=True,
|
|
secure=secure,
|
|
samesite=same_site_mode,
|
|
path="/",
|
|
domain=cookie_domain,
|
|
)
|
|
|
|
|
|
def _set_csrf_cookie(
|
|
response: Response,
|
|
csrf_token: str,
|
|
*,
|
|
expires_at: datetime,
|
|
secure: bool,
|
|
) -> None:
|
|
"""Stores an anti-CSRF token in a browser cookie for JavaScript-safe extraction."""
|
|
|
|
if response is None or not hasattr(response, "set_cookie"):
|
|
return
|
|
cookie_domain = _resolve_cookie_domain()
|
|
same_site_mode = _resolve_cookie_samesite(secure)
|
|
response.set_cookie(
|
|
CSRF_COOKIE_NAME,
|
|
value=csrf_token,
|
|
max_age=_session_cookie_ttl_seconds(expires_at),
|
|
httponly=False,
|
|
secure=secure,
|
|
samesite=same_site_mode,
|
|
path="/",
|
|
domain=cookie_domain,
|
|
)
|
|
|
|
|
|
def _clear_session_cookies(response: Response) -> None:
|
|
"""Clears auth cookies returned by login responses."""
|
|
|
|
if response is None or not hasattr(response, "delete_cookie"):
|
|
return
|
|
cookie_domain = _resolve_cookie_domain()
|
|
response.delete_cookie(SESSION_COOKIE_NAME, path="/", domain=cookie_domain)
|
|
response.delete_cookie(CSRF_COOKIE_NAME, path="/", domain=cookie_domain)
|
|
|
|
|
|
@router.post("/login", response_model=AuthLoginResponse)
def login(
    payload: AuthLoginRequest,
    request: Request,
    response: Response,
    session: Session = Depends(get_session),
) -> AuthLoginResponse:
    """Validates submitted credentials behind the login throttle and issues a new session.

    Flow: throttle check, credential check, then either record the failure and
    reject, or reset the throttle, persist the session, and set the session and
    CSRF cookies.

    Raises:
        HTTPException: 503 when the throttle backend raises ``RuntimeError``
            during the check or while recording a failed attempt; 429 with a
            ``Retry-After`` header when the caller is throttled or locked out;
            401 when the credentials are rejected.
    """
    client_ip = _request_ip_address(request)
    throttle_identity = {"username": payload.username, "ip_address": client_ip}

    # Gate the attempt before any credential verification happens.
    try:
        throttle_status = check_login_throttle(**throttle_identity)
    except RuntimeError as error:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=LOGIN_RATE_LIMITER_UNAVAILABLE_DETAIL,
        ) from error
    if throttle_status.is_throttled:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=LOGIN_THROTTLED_DETAIL,
            headers=_retry_after_headers(throttle_status.retry_after_seconds),
        )

    user = authenticate_user(session, username=payload.username, password=payload.password)

    if user is None:
        # Count the failure; a positive lockout turns the rejection into a 429.
        try:
            lockout_seconds = record_failed_login_attempt(**throttle_identity)
        except RuntimeError as error:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail=LOGIN_RATE_LIMITER_UNAVAILABLE_DETAIL,
            ) from error
        if lockout_seconds > 0:
            rejection = HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail=LOGIN_THROTTLED_DETAIL,
                headers=_retry_after_headers(lockout_seconds),
            )
        else:
            rejection = HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid username or password",
            )
        raise rejection

    # Resetting throttle state is best-effort: a backend failure here is logged
    # and must not block an otherwise successful login.
    try:
        clear_login_throttle(**throttle_identity)
    except RuntimeError:
        logger.warning(
            "Failed to clear login throttle state after successful authentication: username=%s ip=%s",
            payload.username.strip().lower(),
            client_ip or "",
        )

    issued_session = issue_user_session(
        session,
        user=user,
        user_agent=_request_user_agent(request),
        ip_address=client_ip,
    )
    session.commit()

    csrf_token = secrets.token_urlsafe(32)
    session_expiry = issued_session.expires_at
    cookie_settings = {"expires_at": session_expiry, "secure": _is_https_request(request)}
    _set_session_cookie(response, issued_session.token, **cookie_settings)
    _set_csrf_cookie(response, csrf_token, **cookie_settings)

    return AuthLoginResponse(
        user=AuthUserResponse.model_validate(user),
        expires_at=session_expiry,
        access_token=issued_session.token,
        csrf_token=csrf_token,
    )
|
|
|
|
|
|
@router.get("/me", response_model=AuthSessionResponse)
def me(context: AuthContext = Depends(require_user_or_admin)) -> AuthSessionResponse:
    """Reports the identity and expiry of the session resolved by the auth dependency."""
    current_user = AuthUserResponse(
        id=context.user_id,
        username=context.username,
        role=context.role,
    )
    return AuthSessionResponse(expires_at=context.expires_at, user=current_user)
|
|
|
|
|
|
@router.post("/logout", response_model=AuthLogoutResponse)
def logout(
    response: Response,
    context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> AuthLogoutResponse:
    """Revokes the caller's current auth session and removes the auth cookies.

    The database transaction is committed only when the revocation call reports
    a truthy result. Cookies are cleared on the response either way.
    """
    was_revoked = revoke_auth_session(session, session_id=context.session_id)
    if was_revoked:
        session.commit()

    _clear_session_cookies(response)
    return AuthLogoutResponse(revoked=was_revoked)
|