Files
ledgerdock/backend/app/api/routes_auth.py

350 lines
11 KiB
Python

"""Authentication endpoints for credential login, session introspection, and logout."""
import logging
import secrets
from datetime import UTC, datetime
from urllib.parse import urlparse
from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlalchemy.orm import Session
from app.api.auth import (
AuthContext,
SESSION_COOKIE_NAME,
CSRF_COOKIE_NAME,
require_user_or_admin,
)
from app.core.config import get_settings
from app.db.base import get_session
from app.schemas.auth import (
AuthLoginRequest,
AuthLoginResponse,
AuthLogoutResponse,
AuthSessionResponse,
AuthUserResponse,
)
from app.services.auth_login_throttle import (
check_login_throttle,
clear_login_throttle,
record_failed_login_attempt,
)
try:
from fastapi import Cookie, Response
except (ImportError, AttributeError):
from fastapi.responses import Response
def Cookie(_default=None, **_kwargs): # type: ignore[no-untyped-def]
"""Compatibility fallback for environments that stub fastapi without request params."""
return None
from app.services.authentication import authenticate_user, issue_user_session, revoke_auth_session
router = APIRouter(prefix="/auth", tags=["auth"])
logger = logging.getLogger(__name__)
LOGIN_THROTTLED_DETAIL = "Too many login attempts. Try again later."
LOGIN_RATE_LIMITER_UNAVAILABLE_DETAIL = "Login rate limiter backend unavailable"
def _request_ip_address(request: Request) -> str | None:
"""Returns best-effort client IP extracted from the request transport context."""
return request.client.host if request.client is not None else None
def _request_user_agent(request: Request) -> str | None:
"""Returns best-effort user-agent metadata for created auth sessions."""
user_agent = request.headers.get("user-agent", "").strip()
return user_agent[:512] if user_agent else None
def _retry_after_headers(retry_after_seconds: int) -> dict[str, str]:
"""Returns a bounded Retry-After header payload for throttled authentication responses."""
return {"Retry-After": str(max(1, int(retry_after_seconds)))}
def _is_https_request(request: Request) -> bool:
"""Returns whether the incoming request should be treated as HTTPS for cookie flags."""
forwarded_protocol = request.headers.get("x-forwarded-proto", "").strip().lower().split(",")[0]
if forwarded_protocol:
return forwarded_protocol == "https"
request_url = getattr(request, "url", None)
request_scheme = str(getattr(request_url, "scheme", "")).lower() if request_url is not None else ""
if request_scheme == "https":
return True
parsed_public_base_url = urlparse(get_settings().public_base_url.strip())
return parsed_public_base_url.scheme.lower() == "https"
def _resolve_cookie_domain() -> str | None:
"""Returns optional cookie domain override for multi-subdomain deployments."""
configured_domain = get_settings().auth_cookie_domain.strip().lower().lstrip(".")
if not configured_domain or "." not in configured_domain:
return None
return configured_domain
def _resolve_cookie_domains() -> tuple[str | None, ...]:
"""Returns cookie domain variants with a host-only cookie first for browser compatibility."""
configured_domain = _resolve_cookie_domain()
if configured_domain is None:
return (None,)
return (None, configured_domain)
def _request_matches_cookie_domain(request: Request) -> bool:
"""Returns whether request and origin hosts both sit under the configured cookie domain."""
configured_domain = _resolve_cookie_domain()
if configured_domain is None:
return False
origin_header = request.headers.get("origin", "").strip()
origin_host = urlparse(origin_header).hostname.strip().lower() if origin_header else ""
if not origin_host:
return False
request_url = getattr(request, "url", None)
request_host = str(getattr(request_url, "hostname", "")).strip().lower() if request_url is not None else ""
if not request_host:
parsed_public_base_url = urlparse(get_settings().public_base_url.strip())
request_host = parsed_public_base_url.hostname.strip().lower() if parsed_public_base_url.hostname else ""
if not request_host:
return False
def _matches(candidate: str) -> bool:
return candidate == configured_domain or candidate.endswith(f".{configured_domain}")
return _matches(origin_host) and _matches(request_host)
def _resolve_cookie_samesite(request: Request, secure_cookie: bool) -> str:
"""Returns cookie SameSite mode with same-site subdomain compatibility defaults."""
configured_mode = get_settings().auth_cookie_samesite.strip().lower()
if configured_mode in {"strict", "lax"}:
return configured_mode
if configured_mode == "none":
return "lax" if _request_matches_cookie_domain(request) else "none"
return "none" if secure_cookie else "lax"
def _session_cookie_ttl_seconds(expires_at: datetime) -> int:
"""Converts session expiration datetime into cookie max-age seconds."""
now = datetime.now(UTC)
ttl = int((expires_at - now).total_seconds())
return max(1, ttl)
def _set_session_cookie(
response: Response,
session_token: str,
*,
request: Request,
expires_at: datetime,
secure: bool,
) -> None:
"""Stores the issued session token in a browser HttpOnly auth cookie."""
if response is None or not hasattr(response, "set_cookie"):
return
expires_seconds = _session_cookie_ttl_seconds(expires_at)
same_site_mode = _resolve_cookie_samesite(request, secure)
for cookie_domain in _resolve_cookie_domains():
cookie_kwargs = {
"value": session_token,
"max_age": expires_seconds,
"httponly": True,
"secure": secure,
"samesite": same_site_mode,
"path": "/",
}
if cookie_domain is not None:
cookie_kwargs["domain"] = cookie_domain
response.set_cookie(SESSION_COOKIE_NAME, **cookie_kwargs)
def _set_csrf_cookie(
response: Response,
csrf_token: str,
*,
request: Request,
expires_at: datetime,
secure: bool,
) -> None:
"""Stores an anti-CSRF token in a browser cookie for JavaScript-safe extraction."""
if response is None or not hasattr(response, "set_cookie"):
return
same_site_mode = _resolve_cookie_samesite(request, secure)
for cookie_domain in _resolve_cookie_domains():
cookie_kwargs = {
"value": csrf_token,
"max_age": _session_cookie_ttl_seconds(expires_at),
"httponly": False,
"secure": secure,
"samesite": same_site_mode,
"path": "/",
}
if cookie_domain is not None:
cookie_kwargs["domain"] = cookie_domain
response.set_cookie(CSRF_COOKIE_NAME, **cookie_kwargs)
def _clear_session_cookies(response: Response) -> None:
"""Clears auth cookies returned by login responses."""
if response is None or not hasattr(response, "delete_cookie"):
return
for cookie_domain in _resolve_cookie_domains():
delete_kwargs = {"path": "/"}
if cookie_domain is not None:
delete_kwargs["domain"] = cookie_domain
response.delete_cookie(SESSION_COOKIE_NAME, **delete_kwargs)
response.delete_cookie(CSRF_COOKIE_NAME, **delete_kwargs)
@router.post("/login", response_model=AuthLoginResponse)
def login(
payload: AuthLoginRequest,
request: Request,
response: Response,
session: Session = Depends(get_session),
) -> AuthLoginResponse:
"""Authenticates credentials with throttle protection and returns issued session metadata."""
ip_address = _request_ip_address(request)
try:
throttle_status = check_login_throttle(
username=payload.username,
ip_address=ip_address,
)
except RuntimeError as error:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=LOGIN_RATE_LIMITER_UNAVAILABLE_DETAIL,
) from error
if throttle_status.is_throttled:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=LOGIN_THROTTLED_DETAIL,
headers=_retry_after_headers(throttle_status.retry_after_seconds),
)
user = authenticate_user(
session,
username=payload.username,
password=payload.password,
)
if user is None:
try:
lockout_seconds = record_failed_login_attempt(
username=payload.username,
ip_address=ip_address,
)
except RuntimeError as error:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=LOGIN_RATE_LIMITER_UNAVAILABLE_DETAIL,
) from error
if lockout_seconds > 0:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=LOGIN_THROTTLED_DETAIL,
headers=_retry_after_headers(lockout_seconds),
)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid username or password",
)
try:
clear_login_throttle(
username=payload.username,
ip_address=ip_address,
)
except RuntimeError:
logger.warning(
"Failed to clear login throttle state after successful authentication: username=%s ip=%s",
payload.username.strip().lower(),
ip_address or "",
)
issued_session = issue_user_session(
session,
user=user,
user_agent=_request_user_agent(request),
ip_address=ip_address,
)
session.commit()
csrf_token = secrets.token_urlsafe(32)
secure_cookie = _is_https_request(request)
_set_session_cookie(
response,
issued_session.token,
request=request,
expires_at=issued_session.expires_at,
secure=secure_cookie,
)
_set_csrf_cookie(
response,
csrf_token,
request=request,
expires_at=issued_session.expires_at,
secure=secure_cookie,
)
return AuthLoginResponse(
user=AuthUserResponse.model_validate(user),
expires_at=issued_session.expires_at,
access_token=issued_session.token,
csrf_token=csrf_token,
)
@router.get("/me", response_model=AuthSessionResponse)
def me(
context: AuthContext = Depends(require_user_or_admin),
csrf_cookie: str | None = Cookie(None, alias=CSRF_COOKIE_NAME),
) -> AuthSessionResponse:
"""Returns current authenticated session identity and expiration metadata."""
normalized_csrf_cookie = (csrf_cookie or "").strip() or None
return AuthSessionResponse(
expires_at=context.expires_at,
user=AuthUserResponse(
id=context.user_id,
username=context.username,
role=context.role,
),
csrf_token=normalized_csrf_cookie,
)
@router.post("/logout", response_model=AuthLogoutResponse)
def logout(
response: Response,
context: AuthContext = Depends(require_user_or_admin),
session: Session = Depends(get_session),
) -> AuthLogoutResponse:
"""Revokes current session token and clears client auth cookies."""
revoked = revoke_auth_session(
session,
session_id=context.session_id,
)
if revoked:
session.commit()
_clear_session_cookies(response)
return AuthLogoutResponse(revoked=revoked)