# ledgerdock/backend/tests/test_security_controls.py
"""Unit coverage for API auth, SSRF validation, and processing-log redaction controls."""
from __future__ import annotations
from datetime import UTC, datetime
import io
import socket
import sys
import uuid
from pathlib import Path
from types import ModuleType, SimpleNamespace
import unittest
from unittest.mock import patch
import zipfile
BACKEND_ROOT = Path(__file__).resolve().parents[1]
if str(BACKEND_ROOT) not in sys.path:
    sys.path.insert(0, str(BACKEND_ROOT))
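
# The blocks below register lightweight stand-ins for third-party packages and
# heavy app services before the app.* imports further down, so the modules
# under test import without their full dependency tree. Each block is skipped
# when the module is already present in sys.modules.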
if "pydantic_settings" not in sys.modules:
pydantic_settings_stub = ModuleType("pydantic_settings")
class _BaseSettings:
"""Minimal BaseSettings replacement for dependency-light unit test execution."""
def __init__(self, **kwargs: object) -> None:
for key, value in kwargs.items():
setattr(self, key, value)
def _settings_config_dict(**kwargs: object) -> dict[str, object]:
"""Returns configuration values using dict semantics expected by settings module."""
return kwargs
pydantic_settings_stub.BaseSettings = _BaseSettings
pydantic_settings_stub.SettingsConfigDict = _settings_config_dict
sys.modules["pydantic_settings"] = pydantic_settings_stub
if "fastapi" not in sys.modules:
fastapi_stub = ModuleType("fastapi")
class _HTTPException(Exception):
"""Minimal HTTPException compatible with route dependency tests."""
def __init__(self, status_code: int, detail: str, headers: dict[str, str] | None = None) -> None:
super().__init__(detail)
self.status_code = status_code
self.detail = detail
self.headers = headers or {}
class _Status:
"""Minimal status namespace for auth unit tests."""
HTTP_401_UNAUTHORIZED = 401
HTTP_403_FORBIDDEN = 403
HTTP_503_SERVICE_UNAVAILABLE = 503
def _depends(dependency): # type: ignore[no-untyped-def]
"""Returns provided dependency unchanged for unit testing."""
return dependency
fastapi_stub.Depends = _depends
fastapi_stub.HTTPException = _HTTPException
fastapi_stub.status = _Status()
sys.modules["fastapi"] = fastapi_stub
if "fastapi.security" not in sys.modules:
fastapi_security_stub = ModuleType("fastapi.security")
class _HTTPAuthorizationCredentials:
"""Minimal bearer credential object used by auth dependency tests."""
def __init__(self, *, scheme: str, credentials: str) -> None:
self.scheme = scheme
self.credentials = credentials
class _HTTPBearer:
"""Minimal HTTPBearer stand-in for dependency construction."""
def __init__(self, auto_error: bool = True) -> None:
self.auto_error = auto_error
fastapi_security_stub.HTTPAuthorizationCredentials = _HTTPAuthorizationCredentials
fastapi_security_stub.HTTPBearer = _HTTPBearer
sys.modules["fastapi.security"] = fastapi_security_stub
if "magic" not in sys.modules:
magic_stub = ModuleType("magic")
def _from_buffer(_data: bytes, mime: bool = True) -> str:
"""Returns deterministic fallback MIME values for extractor import stubs."""
return "application/octet-stream" if mime else ""
magic_stub.from_buffer = _from_buffer
sys.modules["magic"] = magic_stub
if "docx" not in sys.modules:
docx_stub = ModuleType("docx")
class _DocxDocument:
"""Minimal docx document stub for extractor import compatibility."""
def __init__(self, *_args: object, **_kwargs: object) -> None:
self.paragraphs: list[SimpleNamespace] = []
docx_stub.Document = _DocxDocument
sys.modules["docx"] = docx_stub
if "openpyxl" not in sys.modules:
openpyxl_stub = ModuleType("openpyxl")
class _Workbook:
"""Minimal workbook stub for extractor import compatibility."""
worksheets: list[SimpleNamespace] = []
def _load_workbook(*_args: object, **_kwargs: object) -> _Workbook:
"""Returns deterministic workbook placeholder for extractor import stubs."""
return _Workbook()
openpyxl_stub.load_workbook = _load_workbook
sys.modules["openpyxl"] = openpyxl_stub
if "PIL" not in sys.modules:
pil_stub = ModuleType("PIL")
class _Image:
"""Minimal PIL.Image replacement for extractor and handwriting import stubs."""
class Resampling:
"""Minimal enum-like namespace used by handwriting image resize path."""
LANCZOS = 1
@staticmethod
def open(*_args: object, **_kwargs: object) -> "_Image":
"""Raises for unsupported image operations in dependency-light tests."""
raise RuntimeError("Image.open is not available in stub")
class _ImageOps:
"""Minimal PIL.ImageOps replacement for import compatibility."""
@staticmethod
def exif_transpose(image: object) -> object:
"""Returns original image object unchanged in dependency-light tests."""
return image
pil_stub.Image = _Image
pil_stub.ImageOps = _ImageOps
sys.modules["PIL"] = pil_stub
if "pypdf" not in sys.modules:
pypdf_stub = ModuleType("pypdf")
class _PdfReader:
"""Minimal PdfReader replacement for extractor import compatibility."""
def __init__(self, *_args: object, **_kwargs: object) -> None:
self.pages: list[SimpleNamespace] = []
pypdf_stub.PdfReader = _PdfReader
sys.modules["pypdf"] = pypdf_stub
if "pymupdf" not in sys.modules:
pymupdf_stub = ModuleType("pymupdf")
class _Matrix:
"""Minimal matrix placeholder for extractor import compatibility."""
def __init__(self, *_args: object, **_kwargs: object) -> None:
pass
def _open(*_args: object, **_kwargs: object) -> object:
"""Raises when preview rendering is invoked in dependency-light tests."""
raise RuntimeError("pymupdf is not available in stub")
pymupdf_stub.Matrix = _Matrix
pymupdf_stub.open = _open
sys.modules["pymupdf"] = pymupdf_stub
if "app.services.handwriting" not in sys.modules:
handwriting_stub = ModuleType("app.services.handwriting")
class _HandwritingError(Exception):
"""Minimal base error class for extractor import compatibility."""
class _HandwritingNotConfiguredError(_HandwritingError):
"""Minimal not-configured error class for extractor import compatibility."""
class _HandwritingTimeoutError(_HandwritingError):
"""Minimal timeout error class for extractor import compatibility."""
def _classify_image_text_bytes(*_args: object, **_kwargs: object) -> SimpleNamespace:
"""Returns deterministic image text classification fallback."""
return SimpleNamespace(label="unknown", confidence=0.0, provider="stub", model="stub")
def _transcribe_handwriting_bytes(*_args: object, **_kwargs: object) -> SimpleNamespace:
"""Returns deterministic handwriting transcription fallback."""
return SimpleNamespace(text="", uncertainties=[], provider="stub", model="stub")
handwriting_stub.IMAGE_TEXT_TYPE_NO_TEXT = "no_text"
handwriting_stub.IMAGE_TEXT_TYPE_UNKNOWN = "unknown"
handwriting_stub.IMAGE_TEXT_TYPE_HANDWRITING = "handwriting"
handwriting_stub.HandwritingTranscriptionError = _HandwritingError
handwriting_stub.HandwritingTranscriptionNotConfiguredError = _HandwritingNotConfiguredError
handwriting_stub.HandwritingTranscriptionTimeoutError = _HandwritingTimeoutError
handwriting_stub.classify_image_text_bytes = _classify_image_text_bytes
handwriting_stub.transcribe_handwriting_bytes = _transcribe_handwriting_bytes
sys.modules["app.services.handwriting"] = handwriting_stub
if "app.services.handwriting_style" not in sys.modules:
handwriting_style_stub = ModuleType("app.services.handwriting_style")
def _assign_handwriting_style(*_args: object, **_kwargs: object) -> SimpleNamespace:
"""Returns deterministic style assignment payload for worker import compatibility."""
return SimpleNamespace(
style_cluster_id="cluster-1",
matched_existing=False,
similarity=0.0,
vector_distance=0.0,
compared_neighbors=0,
match_min_similarity=0.0,
bootstrap_match_min_similarity=0.0,
)
def _delete_handwriting_style_document(*_args: object, **_kwargs: object) -> None:
"""No-op style document delete stub for worker import compatibility."""
return None
handwriting_style_stub.assign_handwriting_style = _assign_handwriting_style
handwriting_style_stub.delete_handwriting_style_document = _delete_handwriting_style_document
sys.modules["app.services.handwriting_style"] = handwriting_style_stub
if "app.services.routing_pipeline" not in sys.modules:
routing_pipeline_stub = ModuleType("app.services.routing_pipeline")
def _apply_routing_decision(*_args: object, **_kwargs: object) -> None:
"""No-op routing application stub for worker import compatibility."""
return None
def _classify_document_routing(*_args: object, **_kwargs: object) -> dict[str, object]:
"""Returns deterministic routing decision payload for worker import compatibility."""
return {"chosen_path": None, "chosen_tags": []}
def _summarize_document(*_args: object, **_kwargs: object) -> str:
"""Returns deterministic summary text for worker import compatibility."""
return "summary"
def _upsert_semantic_index(*_args: object, **_kwargs: object) -> None:
"""No-op semantic index update stub for worker import compatibility."""
return None
routing_pipeline_stub.apply_routing_decision = _apply_routing_decision
routing_pipeline_stub.classify_document_routing = _classify_document_routing
routing_pipeline_stub.summarize_document = _summarize_document
routing_pipeline_stub.upsert_semantic_index = _upsert_semantic_index
sys.modules["app.services.routing_pipeline"] = routing_pipeline_stub
from fastapi import HTTPException
from fastapi.security import HTTPAuthorizationCredentials
from app.api.auth import AuthRole, get_request_role, require_admin
from app.core import config as config_module
from app.models.processing_log import sanitize_processing_log_payload_value, sanitize_processing_log_text
from app.schemas.processing_logs import ProcessingLogEntryResponse
from app.services import extractor as extractor_module
from app.worker import tasks as worker_tasks_module


def _security_settings(
    *,
    allowlist: list[str] | None = None,
    allow_http: bool = False,
    allow_private_network: bool = False,
) -> SimpleNamespace:
    """Builds lightweight settings object for provider URL validation tests."""
    return SimpleNamespace(
        provider_base_url_allowlist=allowlist if allowlist is not None else ["api.openai.com"],
        provider_base_url_allow_http=allow_http,
        provider_base_url_allow_private_network=allow_private_network,
    )


class AuthDependencyTests(unittest.TestCase):
    """Verifies token authentication and admin authorization behavior."""

    def test_get_request_role_accepts_admin_token(self) -> None:
        """Admin token resolves admin role."""
        settings = SimpleNamespace(
            admin_api_token="admin-token",
            user_api_token="user-token",
            allow_development_anonymous_user_access=False,
            app_env="production",
        )
        credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials="admin-token")
        role = get_request_role(credentials=credentials, settings=settings)
        self.assertEqual(role, AuthRole.ADMIN)

    def test_get_request_role_rejects_missing_credentials(self) -> None:
        """Missing bearer credentials return 401."""
        settings = SimpleNamespace(
            admin_api_token="admin-token",
            user_api_token="user-token",
            allow_development_anonymous_user_access=False,
            app_env="production",
        )
        with self.assertRaises(HTTPException) as context:
            get_request_role(credentials=None, settings=settings)
        self.assertEqual(context.exception.status_code, 401)

    def test_get_request_role_allows_tokenless_user_access_in_development(self) -> None:
        """Development mode can allow tokenless user role for compatibility."""
        settings = SimpleNamespace(
            admin_api_token="admin-token",
            user_api_token="user-token",
            allow_development_anonymous_user_access=True,
            app_env="development",
        )
        role = get_request_role(credentials=None, settings=settings)
        self.assertEqual(role, AuthRole.USER)

    def test_require_admin_rejects_user_role(self) -> None:
        """User role cannot access admin-only endpoints."""
        with self.assertRaises(HTTPException) as context:
            require_admin(role=AuthRole.USER)
        self.assertEqual(context.exception.status_code, 403)
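
    # Hedged companion to the admin-token case above: this sketch assumes
    # get_request_role maps the configured user_api_token to AuthRole.USER the
    # same way it maps admin_api_token to AuthRole.ADMIN; that mapping is not
    # demonstrated elsewhere in this file.
    def test_get_request_role_accepts_user_token(self) -> None:
        """User token resolves user role (assumed symmetry with the admin path)."""
        settings = SimpleNamespace(
            admin_api_token="admin-token",
            user_api_token="user-token",
            allow_development_anonymous_user_access=False,
            app_env="production",
        )
        credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials="user-token")
        role = get_request_role(credentials=credentials, settings=settings)
        self.assertEqual(role, AuthRole.USER)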


class ProviderBaseUrlValidationTests(unittest.TestCase):
    """Verifies allowlist, scheme, and private-network SSRF protections."""

    def setUp(self) -> None:
        """Clears URL validation cache to keep tests independent."""
        config_module._normalize_and_validate_provider_base_url_cached.cache_clear()

    def test_validation_accepts_allowlisted_https_url(self) -> None:
        """Allowlisted HTTPS URLs are normalized with /v1 suffix."""
        with patch.object(config_module, "get_settings", return_value=_security_settings(allowlist=["api.openai.com"])):
            normalized = config_module.normalize_and_validate_provider_base_url("https://api.openai.com")
            self.assertEqual(normalized, "https://api.openai.com/v1")

    def test_validation_rejects_non_allowlisted_host(self) -> None:
        """Hosts outside configured allowlist are rejected."""
        with patch.object(config_module, "get_settings", return_value=_security_settings(allowlist=["api.openai.com"])):
            with self.assertRaises(ValueError):
                config_module.normalize_and_validate_provider_base_url("https://example.org/v1")

    def test_validation_rejects_private_ip_literal(self) -> None:
        """Private and loopback IP literals are blocked."""
        with patch.object(config_module, "get_settings", return_value=_security_settings(allowlist=[])):
            with self.assertRaises(ValueError):
                config_module.normalize_and_validate_provider_base_url("https://127.0.0.1/v1")

    def test_validation_rejects_private_ip_after_dns_resolution(self) -> None:
        """DNS rebind protection blocks public hostnames resolving to private addresses."""
        mocked_dns_response = [
            (socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", ("127.0.0.1", 443)),
        ]
        with (
            patch.object(config_module, "get_settings", return_value=_security_settings(allowlist=["api.openai.com"])),
            patch.object(config_module.socket, "getaddrinfo", return_value=mocked_dns_response),
        ):
            with self.assertRaises(ValueError):
                config_module.normalize_and_validate_provider_base_url(
                    "https://api.openai.com/v1",
                    resolve_dns=True,
                )

    def test_resolve_dns_validation_revalidates_each_call(self) -> None:
        """DNS-resolved validation is not cached and re-checks host resolution each call."""
        mocked_dns_response = [
            (socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", ("8.8.8.8", 443)),
        ]
        with (
            patch.object(config_module, "get_settings", return_value=_security_settings(allowlist=["api.openai.com"])),
            patch.object(config_module.socket, "getaddrinfo", return_value=mocked_dns_response) as getaddrinfo_mock,
        ):
            first = config_module.normalize_and_validate_provider_base_url(
                "https://api.openai.com/v1",
                resolve_dns=True,
            )
            second = config_module.normalize_and_validate_provider_base_url(
                "https://api.openai.com/v1",
                resolve_dns=True,
            )
            self.assertEqual(first, "https://api.openai.com/v1")
            self.assertEqual(second, "https://api.openai.com/v1")
            self.assertEqual(getaddrinfo_mock.call_count, 2)
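
    # Sketch for the scheme guard this class docstring names: it assumes the
    # validator rejects plain-HTTP base URLs while provider_base_url_allow_http
    # is False. The flag exists in _security_settings above, but the rejection
    # itself is an assumption not demonstrated elsewhere in this file.
    def test_validation_rejects_plain_http_when_http_disallowed(self) -> None:
        """Plain-HTTP URLs are rejected unless allow_http is enabled (assumed behavior)."""
        with patch.object(
            config_module,
            "get_settings",
            return_value=_security_settings(allowlist=["api.openai.com"], allow_http=False),
        ):
            with self.assertRaises(ValueError):
                config_module.normalize_and_validate_provider_base_url("http://api.openai.com/v1")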


class RedisQueueSecurityTests(unittest.TestCase):
    """Verifies Redis URL security policy behavior for compatibility and strict environments."""

    def test_auto_mode_allows_insecure_redis_url_in_development(self) -> None:
        """Development mode stays backward-compatible with local unauthenticated redis URLs."""
        normalized = config_module.validate_redis_url_security(
            "redis://redis:6379/0",
            app_env="development",
            security_mode="auto",
            tls_mode="auto",
        )
        self.assertEqual(normalized, "redis://redis:6379/0")

    def test_auto_mode_rejects_missing_auth_in_production(self) -> None:
        """Production auto mode fails closed when Redis URL omits authentication."""
        with self.assertRaises(ValueError):
            config_module.validate_redis_url_security(
                "rediss://redis:6379/0",
                app_env="production",
                security_mode="auto",
                tls_mode="auto",
            )

    def test_auto_mode_rejects_plaintext_redis_in_production(self) -> None:
        """Production auto mode requires TLS transport for Redis URLs."""
        with self.assertRaises(ValueError):
            config_module.validate_redis_url_security(
                "redis://:password@redis:6379/0",
                app_env="production",
                security_mode="auto",
                tls_mode="auto",
            )

    def test_strict_mode_enforces_auth_and_tls_outside_production(self) -> None:
        """Strict mode enforces production-grade Redis controls in all environments."""
        with self.assertRaises(ValueError):
            config_module.validate_redis_url_security(
                "redis://redis:6379/0",
                app_env="development",
                security_mode="strict",
                tls_mode="auto",
            )
        normalized = config_module.validate_redis_url_security(
            "rediss://:password@redis:6379/0",
            app_env="development",
            security_mode="strict",
            tls_mode="auto",
        )
        self.assertEqual(normalized, "rediss://:password@redis:6379/0")

    def test_compat_mode_allows_insecure_redis_in_production_for_safe_migration(self) -> None:
        """Compatibility mode keeps legacy production Redis URLs usable during migration windows."""
        normalized = config_module.validate_redis_url_security(
            "redis://redis:6379/0",
            app_env="production",
            security_mode="compat",
            tls_mode="allow_insecure",
        )
        self.assertEqual(normalized, "redis://redis:6379/0")


class PreviewMimeSafetyTests(unittest.TestCase):
    """Verifies inline preview MIME safety checks for uploaded document responses."""

    def test_preview_blocks_script_capable_html_and_svg_types(self) -> None:
        """HTML and SVG MIME types are rejected for inline preview responses."""
        self.assertFalse(config_module.is_inline_preview_mime_type_safe("text/html"))
        self.assertFalse(config_module.is_inline_preview_mime_type_safe("image/svg+xml"))

    def test_preview_allows_pdf_and_safe_image_types(self) -> None:
        """PDF and raster image MIME types stay eligible for inline preview responses."""
        self.assertTrue(config_module.is_inline_preview_mime_type_safe("application/pdf"))
        self.assertTrue(config_module.is_inline_preview_mime_type_safe("image/png"))


def _build_zip_bytes(entries: dict[str, bytes]) -> bytes:
    """Builds in-memory ZIP bytes for archive extraction guardrail tests."""
    output = io.BytesIO()
    with zipfile.ZipFile(output, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
        for filename, payload in entries.items():
            archive.writestr(filename, payload)
    return output.getvalue()


class ArchiveExtractionGuardrailTests(unittest.TestCase):
    """Verifies depth-aware archive extraction and per-call member cap enforcement."""

    def test_extract_archive_members_rejects_depth_at_configured_limit(self) -> None:
        """Archive member extraction is disabled at or beyond configured depth ceiling."""
        archive_bytes = _build_zip_bytes({"sample.txt": b"sample"})
        patched_settings = SimpleNamespace(
            max_zip_depth=2,
            max_zip_members=250,
            max_zip_member_uncompressed_bytes=25 * 1024 * 1024,
            max_zip_total_uncompressed_bytes=150 * 1024 * 1024,
            max_zip_compression_ratio=120.0,
        )
        with patch.object(extractor_module, "settings", patched_settings):
            members = extractor_module.extract_archive_members(archive_bytes, depth=2)
        self.assertEqual(members, [])

    def test_extract_archive_members_respects_member_cap_argument(self) -> None:
        """Archive extraction truncates results when caller-provided member cap is lower than archive size."""
        archive_bytes = _build_zip_bytes(
            {
                "one.txt": b"1",
                "two.txt": b"2",
                "three.txt": b"3",
            }
        )
        patched_settings = SimpleNamespace(
            max_zip_depth=3,
            max_zip_members=250,
            max_zip_member_uncompressed_bytes=25 * 1024 * 1024,
            max_zip_total_uncompressed_bytes=150 * 1024 * 1024,
            max_zip_compression_ratio=120.0,
        )
        with patch.object(extractor_module, "settings", patched_settings):
            members = extractor_module.extract_archive_members(archive_bytes, depth=0, max_members=1)
        self.assertEqual(len(members), 1)
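
    # Sketch for the compression-ratio guard implied by max_zip_compression_ratio
    # in the patched settings above. It assumes members whose uncompressed-to-
    # compressed ratio exceeds the limit are dropped from the result rather than
    # raising; that outcome is an assumption, mirroring the depth guard above.
    def test_extract_archive_members_assumed_to_drop_high_ratio_members(self) -> None:
        """Zip-bomb-style highly compressible members are assumed to be skipped."""
        archive_bytes = _build_zip_bytes({"zeros.bin": b"\x00" * 1_000_000})
        patched_settings = SimpleNamespace(
            max_zip_depth=3,
            max_zip_members=250,
            max_zip_member_uncompressed_bytes=25 * 1024 * 1024,
            max_zip_total_uncompressed_bytes=150 * 1024 * 1024,
            max_zip_compression_ratio=2.0,
        )
        with patch.object(extractor_module, "settings", patched_settings):
            members = extractor_module.extract_archive_members(archive_bytes, depth=0)
        self.assertEqual(members, [])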


class ArchiveLineagePropagationTests(unittest.TestCase):
    """Verifies archive lineage metadata propagation helpers used by worker descendant queueing."""

    def test_create_archive_member_document_persists_lineage_metadata(self) -> None:
        """Child archive documents include root id and incremented depth metadata."""
        parent_id = uuid.uuid4()
        parent = SimpleNamespace(
            id=parent_id,
            source_relative_path="uploads/root.zip",
            logical_path="Inbox",
            tags=["finance"],
        )
        with (
            patch.object(worker_tasks_module, "store_bytes", return_value="stored/path/child.zip"),
            patch.object(worker_tasks_module, "compute_sha256", return_value="deadbeef"),
        ):
            child = worker_tasks_module._create_archive_member_document(
                parent=parent,
                member_name="nested/child.zip",
                member_data=b"zip-bytes",
                mime_type="application/zip",
                archive_root_document_id=parent_id,
                archive_depth=1,
            )
        self.assertEqual(child.parent_document_id, parent_id)
        self.assertEqual(child.metadata_json.get(worker_tasks_module.ARCHIVE_ROOT_ID_METADATA_KEY), str(parent_id))
        self.assertEqual(child.metadata_json.get(worker_tasks_module.ARCHIVE_DEPTH_METADATA_KEY), 1)
        self.assertTrue(child.is_archive_member)

    def test_resolve_archive_lineage_prefers_existing_metadata(self) -> None:
        """Existing archive lineage metadata is reused without traversing parent relationships."""
        root_id = uuid.uuid4()
        document = SimpleNamespace(
            id=uuid.uuid4(),
            metadata_json={
                worker_tasks_module.ARCHIVE_ROOT_ID_METADATA_KEY: str(root_id),
                worker_tasks_module.ARCHIVE_DEPTH_METADATA_KEY: 3,
            },
            is_archive_member=True,
            parent_document_id=uuid.uuid4(),
        )

        class _SessionShouldNotBeUsed:
            """Fails test if lineage helper performs unnecessary parent traversals."""

            def execute(self, _statement: object) -> object:
                raise AssertionError("session.execute should not be called when metadata is present")

        resolved_root, resolved_depth = worker_tasks_module._resolve_archive_lineage(
            session=_SessionShouldNotBeUsed(),
            document=document,
        )
        self.assertEqual(resolved_root, root_id)
        self.assertEqual(resolved_depth, 3)

    def test_resolve_archive_lineage_walks_parent_chain_when_metadata_missing(self) -> None:
        """Lineage fallback traverses parent references to recover root id and depth."""
        root_id = uuid.uuid4()
        parent_id = uuid.uuid4()
        root_document = SimpleNamespace(id=root_id, parent_document_id=None)
        parent_document = SimpleNamespace(id=parent_id, parent_document_id=root_id)
        document = SimpleNamespace(
            id=uuid.uuid4(),
            metadata_json={},
            is_archive_member=True,
            parent_document_id=parent_id,
        )

        class _ScalarResult:
            """Wraps scalar ORM results for deterministic worker helper tests."""

            def __init__(self, value: object) -> None:
                self._value = value

            def scalar_one_or_none(self) -> object:
                return self._value

        class _SequenceSession:
            """Returns predetermined parent rows in traversal order."""

            def __init__(self, values: list[object]) -> None:
                self._values = values

            def execute(self, _statement: object) -> _ScalarResult:
                next_value = self._values.pop(0) if self._values else None
                return _ScalarResult(next_value)

        resolved_root, resolved_depth = worker_tasks_module._resolve_archive_lineage(
            session=_SequenceSession([parent_document, root_document]),
            document=document,
        )
        self.assertEqual(resolved_root, root_id)
        self.assertEqual(resolved_depth, 2)


class ProcessingLogRedactionTests(unittest.TestCase):
    """Verifies sensitive processing-log values are redacted for persistence and responses."""

    def test_payload_redacts_sensitive_keys(self) -> None:
        """Sensitive payload keys are replaced with redaction marker."""
        sanitized = sanitize_processing_log_payload_value(
            {
                "api_key": "secret-value",
                "nested": {
                    "authorization": "Bearer sample-token",
                },
            }
        )
        self.assertEqual(sanitized["api_key"], "[REDACTED]")
        self.assertEqual(sanitized["nested"]["authorization"], "[REDACTED]")

    def test_text_redaction_removes_bearer_and_jwt_values(self) -> None:
        """Bearer and JWT token substrings are fully removed from log text."""
        bearer_token = "super-secret-token-123"
        jwt_token = (
            "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
            "eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4ifQ."
            "signaturevalue123456789"
        )
        sanitized = sanitize_processing_log_text(
            f"Authorization: Bearer {bearer_token}\nraw_jwt={jwt_token}"
        )
        self.assertIsNotNone(sanitized)
        sanitized_text = sanitized or ""
        self.assertIn("[REDACTED]", sanitized_text)
        self.assertNotIn(bearer_token, sanitized_text)
        self.assertNotIn(jwt_token, sanitized_text)

    def test_text_redaction_removes_json_formatted_secret_values(self) -> None:
        """JSON-formatted quoted secrets are fully removed from redacted log text."""
        api_key_secret = "json-api-key-secret"
        token_secret = "json-token-secret"
        authorization_secret = "json-auth-secret"
        bearer_secret = "json-bearer-secret"
        json_text = (
            "{"
            f'"api_key":"{api_key_secret}",'
            f'"token":"{token_secret}",'
            f'"authorization":"Bearer {authorization_secret}",'
            f'"bearer":"{bearer_secret}"'
            "}"
        )
        sanitized = sanitize_processing_log_text(json_text)
        self.assertIsNotNone(sanitized)
        sanitized_text = sanitized or ""
        self.assertIn("[REDACTED]", sanitized_text)
        self.assertNotIn(api_key_secret, sanitized_text)
        self.assertNotIn(token_secret, sanitized_text)
        self.assertNotIn(authorization_secret, sanitized_text)
        self.assertNotIn(bearer_secret, sanitized_text)

    def test_response_schema_applies_redaction_to_existing_entries(self) -> None:
        """API schema validators redact sensitive fields from legacy stored rows."""
        bearer_token = "abc123token"
        jwt_token = (
            "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
            "eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4ifQ."
            "signaturevalue123456789"
        )
        response = ProcessingLogEntryResponse.model_validate(
            {
                "id": 1,
                "created_at": datetime.now(UTC),
                "level": "info",
                "stage": "summary",
                "event": "response",
                "document_id": None,
                "document_filename": "sample.txt",
                "provider_id": "provider",
                "model_name": "model",
                "prompt_text": f"Authorization: Bearer {bearer_token}",
                "response_text": f"token={jwt_token}",
                "payload_json": {"password": "secret", "trace_id": "trace-1"},
            }
        )
        self.assertEqual(response.payload_json["password"], "[REDACTED]")
        self.assertIn("[REDACTED]", response.prompt_text or "")
        self.assertIn("[REDACTED]", response.response_text or "")
        self.assertNotIn(bearer_token, response.prompt_text or "")
        self.assertNotIn(jwt_token, response.response_text or "")

    def test_response_schema_redacts_json_formatted_secret_values(self) -> None:
        """Response schema redacts quoted JSON secret forms from legacy text fields."""
        api_key_secret = "legacy-json-api-key"
        token_secret = "legacy-json-token"
        authorization_secret = "legacy-json-auth"
        bearer_secret = "legacy-json-bearer"
        prompt_text = (
            "{"
            f'"api_key":"{api_key_secret}",'
            f'"token":"{token_secret}"'
            "}"
        )
        response_text = (
            "{"
            f'"authorization":"Bearer {authorization_secret}",'
            f'"bearer":"{bearer_secret}"'
            "}"
        )
        response = ProcessingLogEntryResponse.model_validate(
            {
                "id": 2,
                "created_at": datetime.now(UTC),
                "level": "info",
                "stage": "summary",
                "event": "response",
                "document_id": None,
                "document_filename": "sample-json.txt",
                "provider_id": "provider",
                "model_name": "model",
                "prompt_text": prompt_text,
                "response_text": response_text,
                "payload_json": {"trace_id": "trace-2"},
            }
        )
        self.assertIn("[REDACTED]", response.prompt_text or "")
        self.assertIn("[REDACTED]", response.response_text or "")
        self.assertNotIn(api_key_secret, response.prompt_text or "")
        self.assertNotIn(token_secret, response.prompt_text or "")
        self.assertNotIn(authorization_secret, response.response_text or "")
        self.assertNotIn(bearer_secret, response.response_text or "")
if __name__ == "__main__":
unittest.main()