"""Unit coverage for API auth, SSRF validation, and processing-log redaction controls.""" from __future__ import annotations from datetime import UTC, datetime import socket import sys from pathlib import Path from types import ModuleType, SimpleNamespace import unittest from unittest.mock import patch BACKEND_ROOT = Path(__file__).resolve().parents[1] if str(BACKEND_ROOT) not in sys.path: sys.path.insert(0, str(BACKEND_ROOT)) if "pydantic_settings" not in sys.modules: pydantic_settings_stub = ModuleType("pydantic_settings") class _BaseSettings: """Minimal BaseSettings replacement for dependency-light unit test execution.""" def __init__(self, **kwargs: object) -> None: for key, value in kwargs.items(): setattr(self, key, value) def _settings_config_dict(**kwargs: object) -> dict[str, object]: """Returns configuration values using dict semantics expected by settings module.""" return kwargs pydantic_settings_stub.BaseSettings = _BaseSettings pydantic_settings_stub.SettingsConfigDict = _settings_config_dict sys.modules["pydantic_settings"] = pydantic_settings_stub if "fastapi" not in sys.modules: fastapi_stub = ModuleType("fastapi") class _HTTPException(Exception): """Minimal HTTPException compatible with route dependency tests.""" def __init__(self, status_code: int, detail: str, headers: dict[str, str] | None = None) -> None: super().__init__(detail) self.status_code = status_code self.detail = detail self.headers = headers or {} class _Status: """Minimal status namespace for auth unit tests.""" HTTP_401_UNAUTHORIZED = 401 HTTP_403_FORBIDDEN = 403 HTTP_503_SERVICE_UNAVAILABLE = 503 def _depends(dependency): # type: ignore[no-untyped-def] """Returns provided dependency unchanged for unit testing.""" return dependency fastapi_stub.Depends = _depends fastapi_stub.HTTPException = _HTTPException fastapi_stub.status = _Status() sys.modules["fastapi"] = fastapi_stub if "fastapi.security" not in sys.modules: fastapi_security_stub = ModuleType("fastapi.security") class _HTTPAuthorizationCredentials: """Minimal bearer credential object used by auth dependency tests.""" def __init__(self, *, scheme: str, credentials: str) -> None: self.scheme = scheme self.credentials = credentials class _HTTPBearer: """Minimal HTTPBearer stand-in for dependency construction.""" def __init__(self, auto_error: bool = True) -> None: self.auto_error = auto_error fastapi_security_stub.HTTPAuthorizationCredentials = _HTTPAuthorizationCredentials fastapi_security_stub.HTTPBearer = _HTTPBearer sys.modules["fastapi.security"] = fastapi_security_stub from fastapi import HTTPException from fastapi.security import HTTPAuthorizationCredentials from app.api.auth import AuthRole, get_request_role, require_admin from app.core import config as config_module from app.models.processing_log import sanitize_processing_log_payload_value, sanitize_processing_log_text from app.schemas.processing_logs import ProcessingLogEntryResponse def _security_settings( *, allowlist: list[str] | None = None, allow_http: bool = False, allow_private_network: bool = False, ) -> SimpleNamespace: """Builds lightweight settings object for provider URL validation tests.""" return SimpleNamespace( provider_base_url_allowlist=allowlist if allowlist is not None else ["api.openai.com"], provider_base_url_allow_http=allow_http, provider_base_url_allow_private_network=allow_private_network, ) class AuthDependencyTests(unittest.TestCase): """Verifies token authentication and admin authorization behavior.""" def test_get_request_role_accepts_admin_token(self) -> None: """Admin token resolves admin role.""" settings = SimpleNamespace(admin_api_token="admin-token", user_api_token="user-token") credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials="admin-token") role = get_request_role(credentials=credentials, settings=settings) self.assertEqual(role, AuthRole.ADMIN) def test_get_request_role_rejects_missing_credentials(self) -> None: """Missing bearer credentials return 401.""" settings = SimpleNamespace(admin_api_token="admin-token", user_api_token="user-token") with self.assertRaises(HTTPException) as context: get_request_role(credentials=None, settings=settings) self.assertEqual(context.exception.status_code, 401) def test_require_admin_rejects_user_role(self) -> None: """User role cannot access admin-only endpoints.""" with self.assertRaises(HTTPException) as context: require_admin(role=AuthRole.USER) self.assertEqual(context.exception.status_code, 403) class ProviderBaseUrlValidationTests(unittest.TestCase): """Verifies allowlist, scheme, and private-network SSRF protections.""" def setUp(self) -> None: """Clears URL validation cache to keep tests independent.""" config_module._normalize_and_validate_provider_base_url_cached.cache_clear() def test_validation_accepts_allowlisted_https_url(self) -> None: """Allowlisted HTTPS URLs are normalized with /v1 suffix.""" with patch.object(config_module, "get_settings", return_value=_security_settings(allowlist=["api.openai.com"])): normalized = config_module.normalize_and_validate_provider_base_url("https://api.openai.com") self.assertEqual(normalized, "https://api.openai.com/v1") def test_validation_rejects_non_allowlisted_host(self) -> None: """Hosts outside configured allowlist are rejected.""" with patch.object(config_module, "get_settings", return_value=_security_settings(allowlist=["api.openai.com"])): with self.assertRaises(ValueError): config_module.normalize_and_validate_provider_base_url("https://example.org/v1") def test_validation_rejects_private_ip_literal(self) -> None: """Private and loopback IP literals are blocked.""" with patch.object(config_module, "get_settings", return_value=_security_settings(allowlist=[])): with self.assertRaises(ValueError): config_module.normalize_and_validate_provider_base_url("https://127.0.0.1/v1") def test_validation_rejects_private_ip_after_dns_resolution(self) -> None: """DNS rebind protection blocks public hostnames resolving to private addresses.""" mocked_dns_response = [ (socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", ("127.0.0.1", 443)), ] with ( patch.object(config_module, "get_settings", return_value=_security_settings(allowlist=["api.openai.com"])), patch.object(config_module.socket, "getaddrinfo", return_value=mocked_dns_response), ): with self.assertRaises(ValueError): config_module.normalize_and_validate_provider_base_url( "https://api.openai.com/v1", resolve_dns=True, ) def test_resolve_dns_validation_revalidates_each_call(self) -> None: """DNS-resolved validation is not cached and re-checks host resolution each call.""" mocked_dns_response = [ (socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", ("8.8.8.8", 443)), ] with ( patch.object(config_module, "get_settings", return_value=_security_settings(allowlist=["api.openai.com"])), patch.object(config_module.socket, "getaddrinfo", return_value=mocked_dns_response) as getaddrinfo_mock, ): first = config_module.normalize_and_validate_provider_base_url( "https://api.openai.com/v1", resolve_dns=True, ) second = config_module.normalize_and_validate_provider_base_url( "https://api.openai.com/v1", resolve_dns=True, ) self.assertEqual(first, "https://api.openai.com/v1") self.assertEqual(second, "https://api.openai.com/v1") self.assertEqual(getaddrinfo_mock.call_count, 2) class ProcessingLogRedactionTests(unittest.TestCase): """Verifies sensitive processing-log values are redacted for persistence and responses.""" def test_payload_redacts_sensitive_keys(self) -> None: """Sensitive payload keys are replaced with redaction marker.""" sanitized = sanitize_processing_log_payload_value( { "api_key": "secret-value", "nested": { "authorization": "Bearer sample-token", }, } ) self.assertEqual(sanitized["api_key"], "[REDACTED]") self.assertEqual(sanitized["nested"]["authorization"], "[REDACTED]") def test_text_redaction_removes_bearer_and_jwt_values(self) -> None: """Bearer and JWT token substrings are fully removed from log text.""" bearer_token = "super-secret-token-123" jwt_token = ( "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9." "eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4ifQ." "signaturevalue123456789" ) sanitized = sanitize_processing_log_text( f"Authorization: Bearer {bearer_token}\nraw_jwt={jwt_token}" ) self.assertIsNotNone(sanitized) sanitized_text = sanitized or "" self.assertIn("[REDACTED]", sanitized_text) self.assertNotIn(bearer_token, sanitized_text) self.assertNotIn(jwt_token, sanitized_text) def test_text_redaction_removes_json_formatted_secret_values(self) -> None: """JSON-formatted quoted secrets are fully removed from redacted log text.""" api_key_secret = "json-api-key-secret" token_secret = "json-token-secret" authorization_secret = "json-auth-secret" bearer_secret = "json-bearer-secret" json_text = ( "{" f"\"api_key\":\"{api_key_secret}\"," f"\"token\":\"{token_secret}\"," f"\"authorization\":\"Bearer {authorization_secret}\"," f"\"bearer\":\"{bearer_secret}\"" "}" ) sanitized = sanitize_processing_log_text(json_text) self.assertIsNotNone(sanitized) sanitized_text = sanitized or "" self.assertIn("[REDACTED]", sanitized_text) self.assertNotIn(api_key_secret, sanitized_text) self.assertNotIn(token_secret, sanitized_text) self.assertNotIn(authorization_secret, sanitized_text) self.assertNotIn(bearer_secret, sanitized_text) def test_response_schema_applies_redaction_to_existing_entries(self) -> None: """API schema validators redact sensitive fields from legacy stored rows.""" bearer_token = "abc123token" jwt_token = ( "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9." "eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4ifQ." "signaturevalue123456789" ) response = ProcessingLogEntryResponse.model_validate( { "id": 1, "created_at": datetime.now(UTC), "level": "info", "stage": "summary", "event": "response", "document_id": None, "document_filename": "sample.txt", "provider_id": "provider", "model_name": "model", "prompt_text": f"Authorization: Bearer {bearer_token}", "response_text": f"token={jwt_token}", "payload_json": {"password": "secret", "trace_id": "trace-1"}, } ) self.assertEqual(response.payload_json["password"], "[REDACTED]") self.assertIn("[REDACTED]", response.prompt_text or "") self.assertIn("[REDACTED]", response.response_text or "") self.assertNotIn(bearer_token, response.prompt_text or "") self.assertNotIn(jwt_token, response.response_text or "") def test_response_schema_redacts_json_formatted_secret_values(self) -> None: """Response schema redacts quoted JSON secret forms from legacy text fields.""" api_key_secret = "legacy-json-api-key" token_secret = "legacy-json-token" authorization_secret = "legacy-json-auth" bearer_secret = "legacy-json-bearer" prompt_text = ( "{" f"\"api_key\":\"{api_key_secret}\"," f"\"token\":\"{token_secret}\"" "}" ) response_text = ( "{" f"\"authorization\":\"Bearer {authorization_secret}\"," f"\"bearer\":\"{bearer_secret}\"" "}" ) response = ProcessingLogEntryResponse.model_validate( { "id": 2, "created_at": datetime.now(UTC), "level": "info", "stage": "summary", "event": "response", "document_id": None, "document_filename": "sample-json.txt", "provider_id": "provider", "model_name": "model", "prompt_text": prompt_text, "response_text": response_text, "payload_json": {"trace_id": "trace-2"}, } ) self.assertIn("[REDACTED]", response.prompt_text or "") self.assertIn("[REDACTED]", response.response_text or "") self.assertNotIn(api_key_secret, response.prompt_text or "") self.assertNotIn(token_secret, response.prompt_text or "") self.assertNotIn(authorization_secret, response.response_text or "") self.assertNotIn(bearer_secret, response.response_text or "") if __name__ == "__main__": unittest.main()