Files
ledgerdock/backend/app/services/authentication.py

290 lines
9.7 KiB
Python

"""Authentication services for user credential validation and session issuance."""
import base64
import binascii
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
import hashlib
import hmac
import secrets
import uuid
from sqlalchemy import delete, select
from sqlalchemy.orm import Session
from app.core.config import Settings, get_settings
from app.db.base import SessionLocal
from app.models.auth import AppUser, AuthSession, UserRole
# Scheme tag written as the first "$"-separated field of every hash produced by
# hash_password; verify_password rejects stored hashes whose tag differs.
PASSWORD_HASH_SCHEME = "pbkdf2_sha256"
# Last-resort secret used by _auth_session_secret when both auth_session_pepper
# and app_settings_encryption_key are blank.
# NOTE(review): this value is hard-coded in source, so session-token hashes keyed
# by it gain no secrecy from it — confirm deployments always configure a real pepper.
DEFAULT_AUTH_FALLBACK_SECRET = "dcm-session-secret"
@dataclass(frozen=True)
class IssuedSession:
    """Holds the bearer token and expiry of one freshly issued session (immutable)."""

    # Bearer token value for the session.
    token: str
    # Moment at which the session stops being valid.
    expires_at: datetime
def normalize_username(username: str) -> str:
    """Returns the canonical identity key for a username: trimmed, then lowercased."""
    trimmed = username.strip()
    return trimmed.lower()
def _urlsafe_b64encode_no_padding(data: bytes) -> str:
    """Renders bytes as URL-safe base64 text with trailing "=" padding removed."""
    encoded = base64.urlsafe_b64encode(data)
    return encoded.rstrip(b"=").decode("ascii")
def _urlsafe_b64decode_no_padding(data: str) -> bytes:
    """Decodes URL-safe base64 text, first restoring any "=" padding that was stripped."""
    # Base64 text length must be a multiple of four; compute how many pads are missing.
    missing = -len(data) % 4
    restored = f"{data}{'=' * missing}"
    return base64.urlsafe_b64decode(restored.encode("ascii"))
def _password_iterations(settings: Settings) -> int:
    """Returns the configured PBKDF2 iteration count, clamped to 200,000..1,200,000."""
    configured = int(settings.auth_password_pbkdf2_iterations)
    capped = min(1_200_000, configured)
    return max(200_000, capped)
def hash_password(password: str, settings: Settings | None = None) -> str:
    """Builds the persisted PBKDF2-SHA256 hash string for a plaintext password.

    The password is stripped of surrounding whitespace before hashing. The result
    has the form ``scheme$iterations$salt$digest`` where salt (16 random bytes) and
    digest (32 bytes) are URL-safe base64 without padding.

    Raises:
        ValueError: If the password is empty after stripping.
    """
    active_settings = settings or get_settings()
    candidate = password.strip()
    if not candidate:
        raise ValueError("Password must not be empty")
    rounds = _password_iterations(active_settings)
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", candidate.encode("utf-8"), salt, rounds, dklen=32)
    fields = (
        PASSWORD_HASH_SCHEME,
        str(rounds),
        _urlsafe_b64encode_no_padding(salt),
        _urlsafe_b64encode_no_padding(digest),
    )
    return "$".join(fields)
def verify_password(password: str, stored_hash: str, settings: Settings | None = None) -> bool:
    """Verifies one plaintext password against persisted PBKDF2-SHA256 hash material.

    The stored value must have the form ``scheme$iterations$salt$digest`` with
    URL-safe base64 salt and digest (padding optional). Malformed or out-of-policy
    stored values do not raise; they fail verification.

    Args:
        password: Candidate plaintext; surrounding whitespace is stripped first.
        stored_hash: Persisted hash string to verify against.
        settings: Optional settings override; defaults to ``get_settings()``.

    Returns:
        True only when the digest matches AND the stored iteration count is at
        least the currently configured (clamped) iteration count; False otherwise.
    """
    resolved_settings = settings or get_settings()
    normalized_password = password.strip()
    if not normalized_password:
        return False
    parts = stored_hash.strip().split("$")
    if len(parts) != 4:
        return False
    scheme, iterations_text, salt_text, digest_text = parts
    if scheme != PASSWORD_HASH_SCHEME:
        return False
    try:
        iterations = int(iterations_text)
    except ValueError:
        return False
    # Reject counts outside the accepted window before doing any expensive work.
    if iterations < 200_000 or iterations > 2_000_000:
        return False
    try:
        salt = _urlsafe_b64decode_no_padding(salt_text)
        expected_digest = _urlsafe_b64decode_no_padding(digest_text)
    except (binascii.Error, ValueError):
        return False
    if not expected_digest:
        # An empty digest field would make dklen 0, which pbkdf2_hmac rejects with
        # ValueError; treat it as a malformed hash instead of letting that escape.
        return False
    derived_digest = hashlib.pbkdf2_hmac(
        "sha256",
        normalized_password.encode("utf-8"),
        salt,
        iterations,
        dklen=len(expected_digest),
    )
    # Constant-time comparison of the stored and freshly derived digests.
    if not hmac.compare_digest(expected_digest, derived_digest):
        return False
    # A matching password is still refused when its hash was derived with fewer
    # iterations than the current policy requires.
    return iterations >= _password_iterations(resolved_settings)
def _auth_session_secret(settings: Settings) -> bytes:
    """Returns the SHA-256 digest of the secret used to key session-token hashing.

    Preference order: the stripped session pepper, then the stripped settings
    encryption key, then the module's built-in fallback secret.
    """
    candidate = settings.auth_session_pepper.strip()
    if not candidate:
        candidate = settings.app_settings_encryption_key.strip()
    if not candidate:
        # NOTE(review): the fallback is a constant in source — confirm this path is
        # never reached in real deployments.
        candidate = DEFAULT_AUTH_FALLBACK_SECRET
    hasher = hashlib.sha256(candidate.encode("utf-8"))
    return hasher.digest()
def _hash_session_token(token: str, settings: Settings | None = None) -> str:
    """Returns the hex HMAC-SHA256 of a bearer token, keyed by the session secret.

    The same token and settings always yield the same digest.
    """
    active_settings = settings or get_settings()
    key = _auth_session_secret(active_settings)
    mac = hmac.new(key, msg=token.encode("utf-8"), digestmod=hashlib.sha256)
    return mac.hexdigest()
def _new_session_token(settings: Settings) -> str:
    """Generates a random URL-safe bearer token from 24..128 bytes of entropy.

    The configured byte count is clamped into that range before use.
    """
    requested = int(settings.auth_session_token_bytes)
    if requested < 24:
        entropy_bytes = 24
    elif requested > 128:
        entropy_bytes = 128
    else:
        entropy_bytes = requested
    return secrets.token_urlsafe(entropy_bytes)
def _resolve_optional_user_credentials(username: str, password: str) -> tuple[str, str] | None:
    """Returns normalized optional-user credentials, or None when neither is configured.

    Raises:
        ValueError: If exactly one of username/password is set after normalization.
    """
    name = normalize_username(username)
    secret = password.strip()
    if name and secret:
        return name, secret
    if name or secret:
        # Only one half of the pair was supplied.
        raise ValueError("Optional bootstrap user requires both username and password")
    return None
def _upsert_bootstrap_user(session: Session, *, username: str, password: str, role: UserRole) -> AppUser:
    """Inserts a bootstrap account, or resets an existing one with the same username.

    An existing account has its password hash, role, and active flag overwritten.
    A new account is added to the session. No commit is issued here.
    """
    lookup = select(AppUser).where(AppUser.username == username)
    account = session.execute(lookup).scalar_one_or_none()
    fresh_hash = hash_password(password)
    if account is not None:
        account.password_hash = fresh_hash
        account.role = role
        account.is_active = True
        return account
    account = AppUser(
        username=username,
        password_hash=fresh_hash,
        role=role,
        is_active=True,
    )
    session.add(account)
    return account
def ensure_bootstrap_users() -> None:
    """Creates or refreshes the bootstrap admin (and optional user) from settings.

    Raises:
        RuntimeError: If the admin username or password is blank, or the optional
            user's username equals the admin username.
        ValueError: If only one of the optional user's username/password is set.
    """
    settings = get_settings()
    admin_name = normalize_username(settings.auth_bootstrap_admin_username)
    admin_secret = settings.auth_bootstrap_admin_password.strip()
    if not admin_name:
        raise RuntimeError("AUTH_BOOTSTRAP_ADMIN_USERNAME must not be empty")
    if not admin_secret:
        raise RuntimeError("AUTH_BOOTSTRAP_ADMIN_PASSWORD must not be empty")

    extra_account = _resolve_optional_user_credentials(
        settings.auth_bootstrap_user_username,
        settings.auth_bootstrap_user_password,
    )

    with SessionLocal() as db:
        _upsert_bootstrap_user(db, username=admin_name, password=admin_secret, role=UserRole.ADMIN)
        if extra_account is not None:
            extra_name, extra_secret = extra_account
            if extra_name == admin_name:
                # Raised before commit, so the admin upsert above is not committed either.
                raise RuntimeError("AUTH_BOOTSTRAP_USER_USERNAME must differ from admin username")
            _upsert_bootstrap_user(db, username=extra_name, password=extra_secret, role=UserRole.USER)
        db.commit()
def authenticate_user(session: Session, *, username: str, password: str) -> AppUser | None:
    """Returns the active account matching a username/password pair, else None.

    None is returned for a blank username, an unknown or inactive account, or a
    password that fails verification.
    """
    lookup_key = normalize_username(username)
    if not lookup_key:
        return None
    query = select(AppUser).where(AppUser.username == lookup_key)
    account = session.execute(query).scalar_one_or_none()
    # NOTE(review): unknown or inactive accounts short-circuit before verify_password,
    # so those requests skip the PBKDF2 work — consider whether that timing gap matters.
    if account is not None and account.is_active and verify_password(password, account.password_hash):
        return account
    return None
def issue_user_session(
    session: Session,
    *,
    user: AppUser,
    user_agent: str | None = None,
    ip_address: str | None = None,
) -> IssuedSession:
    """Creates a new bearer session row for a user and returns the plaintext token.

    The TTL is clamped to between 5 minutes and 7 days. The user's already-expired
    sessions are deleted first. Only the token hash is stored on the new row.
    No commit is issued here.
    """
    settings = get_settings()
    issued_at = datetime.now(UTC)
    lifetime_minutes = min(7 * 24 * 60, int(settings.auth_session_ttl_minutes))
    lifetime_minutes = max(5, lifetime_minutes)
    expiry = issued_at + timedelta(minutes=lifetime_minutes)
    plaintext_token = _new_session_token(settings)
    hashed_token = _hash_session_token(plaintext_token, settings)

    # Purge this user's sessions that have already expired.
    purge_expired = delete(AuthSession).where(
        AuthSession.user_id == user.id,
        AuthSession.expires_at <= issued_at,
    )
    session.execute(purge_expired)

    # Trim and cap client metadata; blank values are stored as None.
    cleaned_agent = (user_agent or "").strip()[:512]
    cleaned_ip = (ip_address or "").strip()[:64]
    session.add(
        AuthSession(
            user_id=user.id,
            token_hash=hashed_token,
            expires_at=expiry,
            user_agent=cleaned_agent or None,
            ip_address=cleaned_ip or None,
        )
    )
    return IssuedSession(token=plaintext_token, expires_at=expiry)
def resolve_auth_session(session: Session, *, token: str) -> AuthSession | None:
    """Looks up the live session for a bearer token, or returns None.

    A session qualifies only if its token hash matches, it is not revoked, it has
    not expired, and its user exists and is active.
    """
    candidate = token.strip()
    if not candidate:
        return None
    digest = _hash_session_token(candidate)
    current_time = datetime.now(UTC)
    query = select(AuthSession).where(
        AuthSession.token_hash == digest,
        AuthSession.revoked_at.is_(None),
        AuthSession.expires_at > current_time,
    )
    match = session.execute(query).scalar_one_or_none()
    if match is None:
        return None
    owner = match.user
    if owner is None or not owner.is_active:
        return None
    return match
def revoke_auth_session(session: Session, *, session_id: uuid.UUID) -> bool:
    """Marks a session revoked; returns True only if this call changed it.

    Returns False when the session does not exist or is already revoked.
    No commit is issued here.
    """
    query = select(AuthSession).where(AuthSession.id == session_id)
    entry = session.execute(query).scalar_one_or_none()
    if entry is not None and entry.revoked_at is None:
        entry.revoked_at = datetime.now(UTC)
        return True
    return False