Harden settings read sanitization for invalid providers
This commit is contained in:
149
backend/tests/test_app_settings_provider_resilience.py
Normal file
149
backend/tests/test_app_settings_provider_resilience.py
Normal file
@@ -0,0 +1,149 @@
|
||||
"""Unit coverage for resilient provider sanitization in persisted app settings."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from types import ModuleType
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
BACKEND_ROOT = Path(__file__).resolve().parents[1]
|
||||
if str(BACKEND_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(BACKEND_ROOT))
|
||||
|
||||
if "pydantic_settings" not in sys.modules:
|
||||
pydantic_settings_stub = ModuleType("pydantic_settings")
|
||||
|
||||
class _BaseSettings:
|
||||
"""Minimal BaseSettings replacement for dependency-light unit test execution."""
|
||||
|
||||
def __init__(self, **kwargs: object) -> None:
|
||||
for key, value in kwargs.items():
|
||||
setattr(self, key, value)
|
||||
|
||||
def _settings_config_dict(**kwargs: object) -> dict[str, object]:
|
||||
"""Returns configuration values using dict semantics expected by settings module."""
|
||||
|
||||
return kwargs
|
||||
|
||||
pydantic_settings_stub.BaseSettings = _BaseSettings
|
||||
pydantic_settings_stub.SettingsConfigDict = _settings_config_dict
|
||||
sys.modules["pydantic_settings"] = pydantic_settings_stub
|
||||
|
||||
from app.services import app_settings
|
||||
|
||||
|
||||
def _sample_current_payload() -> dict[str, Any]:
|
||||
"""Builds a sanitized payload used as in-memory persistence fixture for update tests."""
|
||||
|
||||
return app_settings._sanitize_settings(app_settings._default_settings())
|
||||
|
||||
|
||||
class AppSettingsProviderResilienceTests(unittest.TestCase):
|
||||
"""Verifies read-path resilience for corrupt persisted providers without weakening writes."""
|
||||
|
||||
def test_sanitize_settings_skips_invalid_persisted_provider_entries(self) -> None:
|
||||
"""Invalid persisted providers are skipped and tasks rebind to remaining valid providers."""
|
||||
|
||||
payload = {
|
||||
"providers": [
|
||||
{
|
||||
"id": "insecure-provider",
|
||||
"label": "Insecure Provider",
|
||||
"provider_type": "openai_compatible",
|
||||
"base_url": "http://api.openai.com/v1",
|
||||
"timeout_seconds": 45,
|
||||
"api_key": "",
|
||||
},
|
||||
{
|
||||
"id": "secure-provider",
|
||||
"label": "Secure Provider",
|
||||
"provider_type": "openai_compatible",
|
||||
"base_url": "https://api.openai.com/v1",
|
||||
"timeout_seconds": 45,
|
||||
"api_key": "",
|
||||
},
|
||||
],
|
||||
"tasks": {
|
||||
app_settings.TASK_OCR_HANDWRITING: {"provider_id": "insecure-provider"},
|
||||
app_settings.TASK_SUMMARY_GENERATION: {"provider_id": "insecure-provider"},
|
||||
app_settings.TASK_ROUTING_CLASSIFICATION: {"provider_id": "insecure-provider"},
|
||||
},
|
||||
}
|
||||
|
||||
sanitized = app_settings._sanitize_settings(payload)
|
||||
self.assertEqual([provider["id"] for provider in sanitized["providers"]], ["secure-provider"])
|
||||
self.assertEqual(
|
||||
sanitized["tasks"][app_settings.TASK_OCR_HANDWRITING]["provider_id"],
|
||||
"secure-provider",
|
||||
)
|
||||
self.assertEqual(
|
||||
sanitized["tasks"][app_settings.TASK_SUMMARY_GENERATION]["provider_id"],
|
||||
"secure-provider",
|
||||
)
|
||||
self.assertEqual(
|
||||
sanitized["tasks"][app_settings.TASK_ROUTING_CLASSIFICATION]["provider_id"],
|
||||
"secure-provider",
|
||||
)
|
||||
|
||||
def test_sanitize_settings_uses_default_provider_when_all_persisted_entries_are_invalid(self) -> None:
|
||||
"""Default provider is restored when all persisted provider rows are invalid."""
|
||||
|
||||
payload = {
|
||||
"providers": [
|
||||
{
|
||||
"id": "insecure-provider",
|
||||
"label": "Insecure Provider",
|
||||
"provider_type": "openai_compatible",
|
||||
"base_url": "http://api.openai.com/v1",
|
||||
"timeout_seconds": 45,
|
||||
"api_key": "",
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
sanitized = app_settings._sanitize_settings(payload)
|
||||
defaults = app_settings._default_settings()
|
||||
default_provider_id = defaults["providers"][0]["id"]
|
||||
self.assertEqual(sanitized["providers"][0]["id"], default_provider_id)
|
||||
self.assertEqual(sanitized["providers"][0]["base_url"], defaults["providers"][0]["base_url"])
|
||||
self.assertEqual(
|
||||
sanitized["tasks"][app_settings.TASK_OCR_HANDWRITING]["provider_id"],
|
||||
default_provider_id,
|
||||
)
|
||||
self.assertEqual(
|
||||
sanitized["tasks"][app_settings.TASK_SUMMARY_GENERATION]["provider_id"],
|
||||
default_provider_id,
|
||||
)
|
||||
self.assertEqual(
|
||||
sanitized["tasks"][app_settings.TASK_ROUTING_CLASSIFICATION]["provider_id"],
|
||||
default_provider_id,
|
||||
)
|
||||
|
||||
def test_update_app_settings_keeps_provider_base_url_validation_strict(self) -> None:
|
||||
"""Provider write updates still reject invalid base URLs instead of silently sanitizing."""
|
||||
|
||||
current_payload = _sample_current_payload()
|
||||
current_provider = current_payload["providers"][0]
|
||||
provider_update = {
|
||||
"id": current_provider["id"],
|
||||
"label": current_provider["label"],
|
||||
"provider_type": current_provider["provider_type"],
|
||||
"base_url": "http://api.openai.com/v1",
|
||||
"timeout_seconds": current_provider["timeout_seconds"],
|
||||
}
|
||||
|
||||
with (
|
||||
patch.object(app_settings, "_read_raw_settings", return_value=current_payload),
|
||||
patch.object(app_settings, "_write_settings") as write_settings_mock,
|
||||
):
|
||||
with self.assertRaises(app_settings.AppSettingsValidationError):
|
||||
app_settings.update_app_settings(providers=[provider_update])
|
||||
write_settings_mock.assert_not_called()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user