959 lines
37 KiB
Python
959 lines
37 KiB
Python
"""Persistent single-user application settings service backed by host-mounted storage."""
|
|
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from app.core.config import get_settings
|
|
|
|
|
|
settings = get_settings()
|
|
|
|
|
|
# --- Task identifiers used as keys in the persisted "tasks" settings map. ---
TASK_OCR_HANDWRITING = "ocr_handwriting"
TASK_SUMMARY_GENERATION = "summary_generation"
TASK_ROUTING_CLASSIFICATION = "routing_classification"

# --- Top-level settings-file keys for feature-specific blocks. ---
HANDWRITING_STYLE_SETTINGS_KEY = "handwriting_style_clustering"
PROCESSING_LOG_RETENTION_SETTINGS_KEY = "processing_log_retention"
PREDEFINED_PATHS_SETTINGS_KEY = "predefined_paths"
PREDEFINED_TAGS_SETTINGS_KEY = "predefined_tags"

# Default embedding model used for handwriting-style clustering.
DEFAULT_HANDWRITING_STYLE_EMBED_MODEL = "ts/clip-vit-b-p32"

# Default prompt for the handwriting OCR task.
# NOTE(review): the prompt instructs output "in German" and uses a German
# unreadable-marker ([[?unleserlich?]]) — presumably intentional for this
# deployment; confirm before localizing.
DEFAULT_OCR_PROMPT = (
    "You are an expert at reading messy handwritten notes, including hard-to-read writing.\n"
    "Task: transcribe the handwriting as exactly as possible.\n\n"
    "Rules:\n"
    "- Output ONLY the transcription in German, no commentary.\n"
    "- Preserve original line breaks where they clearly exist.\n"
    "- Do NOT translate or correct grammar or spelling.\n"
    "- If a word or character is unclear, wrap your best guess in [[? ... ?]].\n"
    "- If something is unreadable, write [[?unleserlich?]] in its place."
)

# Default prompt for the document summary task.
DEFAULT_SUMMARY_PROMPT = (
    "You summarize documents for indexing and routing.\n"
    "Return concise markdown with key entities, purpose, and document category hints.\n"
    "Do not invent facts and do not include any explanation outside the summary."
)

# Default prompt for the routing classification task; the JSON shape listed
# here is the contract consumed by the routing pipeline.
DEFAULT_ROUTING_PROMPT = (
    "You classify one document into an existing logical path and tags.\n"
    "Prefer existing paths and tags when possible.\n"
    "If the evidence is weak, keep chosen_path as null and use suggestions instead.\n"
    "Return JSON only with this exact shape:\n"
    "{\n"
    " \"chosen_path\": string | null,\n"
    " \"chosen_tags\": string[],\n"
    " \"suggested_new_paths\": string[],\n"
    " \"suggested_new_tags\": string[],\n"
    " \"confidence\": number\n"
    "}\n"
    "Confidence must be between 0 and 1."
)
|
|
|
|
|
|
def _default_settings() -> dict[str, Any]:
    """Build the complete default settings tree.

    Covers upload defaults, display options, processing-log retention,
    predefined path/tag catalogs, handwriting-style clustering, the single
    default OpenAI-compatible provider, and per-task model bindings.

    Provider and task defaults (base URL, API key, model names, enabled
    flags) are pulled from the environment-backed app config (``settings``),
    so the effective defaults vary per deployment.

    Returns:
        A fresh dict each call; callers may mutate it freely.
    """

    return {
        "upload_defaults": {
            "logical_path": "Inbox",
            "tags": [],
        },
        "display": {
            "cards_per_page": 12,
            "log_typing_animation_enabled": True,
        },
        PROCESSING_LOG_RETENTION_SETTINGS_KEY: {
            "keep_document_sessions": 2,
            "keep_unbound_entries": 80,
        },
        PREDEFINED_PATHS_SETTINGS_KEY: [],
        PREDEFINED_TAGS_SETTINGS_KEY: [],
        HANDWRITING_STYLE_SETTINGS_KEY: {
            "enabled": True,
            "embed_model": DEFAULT_HANDWRITING_STYLE_EMBED_MODEL,
            "neighbor_limit": 8,
            # Similarity thresholds are cosine-style values in [0, 1];
            # bootstrap matching is stricter than steady-state matching.
            "match_min_similarity": 0.86,
            "bootstrap_match_min_similarity": 0.89,
            "bootstrap_sample_size": 3,
            "image_max_side": 1024,
        },
        "providers": [
            {
                "id": "openai-default",
                "label": "OpenAI Default",
                "provider_type": "openai_compatible",
                "base_url": settings.default_openai_base_url,
                "timeout_seconds": settings.default_openai_timeout_seconds,
                "api_key": settings.default_openai_api_key,
            }
        ],
        "tasks": {
            TASK_OCR_HANDWRITING: {
                "enabled": settings.default_openai_handwriting_enabled,
                "provider_id": "openai-default",
                "model": settings.default_openai_model,
                "prompt": DEFAULT_OCR_PROMPT,
            },
            TASK_SUMMARY_GENERATION: {
                "enabled": True,
                "provider_id": "openai-default",
                "model": settings.default_summary_model,
                "prompt": DEFAULT_SUMMARY_PROMPT,
                # Per-request input budget for summarization, in tokens.
                "max_input_tokens": 8000,
            },
            TASK_ROUTING_CLASSIFICATION: {
                "enabled": True,
                "provider_id": "openai-default",
                "model": settings.default_routing_model,
                "prompt": DEFAULT_ROUTING_PROMPT,
                "neighbor_count": 8,
                "neighbor_min_similarity": 0.84,
                # Auto-apply requires both model confidence and neighbor
                # similarity to clear their respective thresholds.
                "auto_apply_confidence_threshold": 0.78,
                "auto_apply_neighbor_similarity_threshold": 0.55,
                "neighbor_path_override_enabled": True,
                "neighbor_path_override_min_similarity": 0.86,
                "neighbor_path_override_min_gap": 0.04,
                "neighbor_path_override_max_confidence": 0.9,
            },
        },
    }
|
|
|
|
|
|
def _settings_path() -> Path:
    """Return the absolute path of the persisted settings JSON file."""
    storage_root: Path = settings.storage_root
    return storage_root / "settings.json"
|
|
|
|
|
|
def _clamp_timeout(value: int) -> int:
|
|
"""Clamps timeout values to a safe and practical range."""
|
|
|
|
return max(5, min(180, value))
|
|
|
|
|
|
def _clamp_input_tokens(value: int) -> int:
|
|
"""Clamps per-request summary input token budget values to practical bounds."""
|
|
|
|
return max(512, min(64000, value))
|
|
|
|
|
|
def _clamp_neighbor_count(value: int) -> int:
|
|
"""Clamps nearest-neighbor lookup count for routing classification."""
|
|
|
|
return max(1, min(40, value))
|
|
|
|
|
|
def _clamp_cards_per_page(value: int) -> int:
|
|
"""Clamps dashboard cards-per-page display setting to practical bounds."""
|
|
|
|
return max(1, min(200, value))
|
|
|
|
|
|
def _clamp_processing_log_document_sessions(value: int) -> int:
|
|
"""Clamps the number of recent document log sessions kept during cleanup."""
|
|
|
|
return max(0, min(20, value))
|
|
|
|
|
|
def _clamp_processing_log_unbound_entries(value: int) -> int:
|
|
"""Clamps retained unbound processing log events kept during cleanup."""
|
|
|
|
return max(0, min(400, value))
|
|
|
|
|
|
def _clamp_predefined_entries_limit(value: int) -> int:
|
|
"""Clamps maximum count for predefined tag/path catalog entries."""
|
|
|
|
return max(1, min(2000, value))
|
|
|
|
|
|
def _clamp_handwriting_style_neighbor_limit(value: int) -> int:
|
|
"""Clamps handwriting-style nearest-neighbor count used for style matching."""
|
|
|
|
return max(1, min(32, value))
|
|
|
|
|
|
def _clamp_handwriting_style_sample_size(value: int) -> int:
|
|
"""Clamps handwriting-style bootstrap sample size used for stricter matching."""
|
|
|
|
return max(1, min(30, value))
|
|
|
|
|
|
def _clamp_handwriting_style_image_max_side(value: int) -> int:
|
|
"""Clamps handwriting-style image normalization max-side pixel size."""
|
|
|
|
return max(256, min(4096, value))
|
|
|
|
|
|
def _clamp_probability(value: float, fallback: float) -> float:
|
|
"""Clamps probability-like numbers to the range [0, 1]."""
|
|
|
|
try:
|
|
parsed = float(value)
|
|
except (TypeError, ValueError):
|
|
return fallback
|
|
return max(0.0, min(1.0, parsed))
|
|
|
|
|
|
def _safe_int(value: Any, fallback: int) -> int:
|
|
"""Safely converts arbitrary values to integers with fallback handling."""
|
|
|
|
try:
|
|
return int(value)
|
|
except (TypeError, ValueError):
|
|
return fallback
|
|
|
|
|
|
def _normalize_provider_id(value: str | None, fallback: str) -> str:
|
|
"""Normalizes provider identifiers into stable lowercase slug values."""
|
|
|
|
candidate = (value or "").strip().lower()
|
|
candidate = re.sub(r"[^a-z0-9_-]+", "-", candidate).strip("-")
|
|
return candidate or fallback
|
|
|
|
|
|
def _mask_api_key(value: str) -> str:
|
|
"""Masks a secret API key while retaining enough characters for identification."""
|
|
|
|
if not value:
|
|
return ""
|
|
if len(value) <= 6:
|
|
return "*" * len(value)
|
|
return f"{value[:4]}...{value[-2:]}"
|
|
|
|
|
|
def _normalize_provider(
    payload: dict[str, Any],
    fallback_id: str,
    fallback_values: dict[str, Any],
) -> dict[str, Any]:
    """Normalize one provider payload to a stable shape.

    Resolution order for each field: incoming ``payload`` value, then
    ``fallback_values`` (typically the previously stored provider), then the
    repository default provider.

    Args:
        payload: Raw provider dict from the API or settings file.
        fallback_id: Identifier used when the payload id normalizes to empty.
        fallback_values: Previously stored values to fall back to per field.

    Returns:
        Dict with keys ``id``, ``label``, ``provider_type``, ``base_url``,
        ``timeout_seconds`` (clamped 5..180), and ``api_key`` (stripped;
        may be empty).
    """

    defaults = _default_settings()["providers"][0]
    provider_id = _normalize_provider_id(str(payload.get("id", fallback_id)), fallback_id)
    # Only one provider type is supported today; anything else is coerced.
    provider_type = str(payload.get("provider_type", fallback_values.get("provider_type", defaults["provider_type"]))).strip()
    if provider_type != "openai_compatible":
        provider_type = "openai_compatible"

    api_key_value = payload.get("api_key", fallback_values.get("api_key", defaults["api_key"]))
    api_key = str(api_key_value).strip() if api_key_value is not None else ""

    return {
        "id": provider_id,
        # Empty labels fall back to the provider id itself.
        "label": str(payload.get("label", fallback_values.get("label", provider_id))).strip() or provider_id,
        "provider_type": provider_type,
        "base_url": str(payload.get("base_url", fallback_values.get("base_url", defaults["base_url"]))).strip()
        or defaults["base_url"],
        "timeout_seconds": _clamp_timeout(
            _safe_int(
                payload.get("timeout_seconds", fallback_values.get("timeout_seconds", defaults["timeout_seconds"])),
                defaults["timeout_seconds"],
            )
        ),
        "api_key": api_key,
    }
|
|
|
|
|
|
def _normalize_ocr_task(payload: dict[str, Any], provider_ids: list[str]) -> dict[str, Any]:
    """Normalize OCR task settings, enforcing a valid provider reference.

    Args:
        payload: Raw OCR task dict (possibly partial).
        provider_ids: Valid provider ids; must be non-empty — an unknown
            binding falls back to ``provider_ids[0]``.

    Returns:
        Dict with ``enabled``, ``provider_id``, ``model``, and ``prompt``;
        blank model/prompt strings fall back to repository defaults.
    """

    defaults = _default_settings()["tasks"][TASK_OCR_HANDWRITING]
    provider_id = str(payload.get("provider_id", defaults["provider_id"])).strip()
    if provider_id not in provider_ids:
        provider_id = provider_ids[0]

    return {
        "enabled": bool(payload.get("enabled", defaults["enabled"])),
        "provider_id": provider_id,
        "model": str(payload.get("model", defaults["model"])).strip() or defaults["model"],
        "prompt": str(payload.get("prompt", defaults["prompt"])).strip() or defaults["prompt"],
    }
|
|
|
|
|
|
def _normalize_summary_task(payload: dict[str, Any], provider_ids: list[str]) -> dict[str, Any]:
    """Normalize summary task settings, enforcing a valid provider reference.

    Also migrates the legacy ``max_source_chars`` field (a character budget)
    to ``max_input_tokens`` when the new field is absent, using a ~4
    characters-per-token heuristic.

    Args:
        payload: Raw summary task dict (possibly partial or legacy-shaped).
        provider_ids: Valid provider ids; must be non-empty — an unknown
            binding falls back to ``provider_ids[0]``.

    Returns:
        Dict with ``enabled``, ``provider_id``, ``model``, ``prompt``, and
        ``max_input_tokens`` (clamped 512..64000).
    """

    defaults = _default_settings()["tasks"][TASK_SUMMARY_GENERATION]
    provider_id = str(payload.get("provider_id", defaults["provider_id"])).strip()
    if provider_id not in provider_ids:
        provider_id = provider_ids[0]

    raw_max_tokens = payload.get("max_input_tokens")
    if raw_max_tokens is None:
        # Legacy migration: convert a stored character budget to tokens.
        legacy_chars = _safe_int(payload.get("max_source_chars", 0), 0)
        if legacy_chars > 0:
            raw_max_tokens = max(512, legacy_chars // 4)
        else:
            raw_max_tokens = defaults["max_input_tokens"]

    return {
        "enabled": bool(payload.get("enabled", defaults["enabled"])),
        "provider_id": provider_id,
        "model": str(payload.get("model", defaults["model"])).strip() or defaults["model"],
        "prompt": str(payload.get("prompt", defaults["prompt"])).strip() or defaults["prompt"],
        "max_input_tokens": _clamp_input_tokens(
            _safe_int(raw_max_tokens, defaults["max_input_tokens"])
        ),
    }
|
|
|
|
|
|
def _normalize_routing_task(payload: dict[str, Any], provider_ids: list[str]) -> dict[str, Any]:
    """Normalize routing task settings, enforcing a valid provider reference.

    All similarity/confidence thresholds are clamped into [0, 1] and the
    neighbor count into 1..40; missing or malformed fields fall back to the
    repository defaults.

    Args:
        payload: Raw routing task dict (possibly partial).
        provider_ids: Valid provider ids; must be non-empty — an unknown
            binding falls back to ``provider_ids[0]``.

    Returns:
        Dict with model binding, prompt, neighbor lookup parameters,
        auto-apply thresholds, and neighbor-path-override parameters.
    """

    defaults = _default_settings()["tasks"][TASK_ROUTING_CLASSIFICATION]
    provider_id = str(payload.get("provider_id", defaults["provider_id"])).strip()
    if provider_id not in provider_ids:
        provider_id = provider_ids[0]

    return {
        "enabled": bool(payload.get("enabled", defaults["enabled"])),
        "provider_id": provider_id,
        "model": str(payload.get("model", defaults["model"])).strip() or defaults["model"],
        "prompt": str(payload.get("prompt", defaults["prompt"])).strip() or defaults["prompt"],
        "neighbor_count": _clamp_neighbor_count(
            _safe_int(payload.get("neighbor_count", defaults["neighbor_count"]), defaults["neighbor_count"])
        ),
        "neighbor_min_similarity": _clamp_probability(
            payload.get("neighbor_min_similarity", defaults["neighbor_min_similarity"]),
            defaults["neighbor_min_similarity"],
        ),
        "auto_apply_confidence_threshold": _clamp_probability(
            payload.get("auto_apply_confidence_threshold", defaults["auto_apply_confidence_threshold"]),
            defaults["auto_apply_confidence_threshold"],
        ),
        "auto_apply_neighbor_similarity_threshold": _clamp_probability(
            payload.get(
                "auto_apply_neighbor_similarity_threshold",
                defaults["auto_apply_neighbor_similarity_threshold"],
            ),
            defaults["auto_apply_neighbor_similarity_threshold"],
        ),
        "neighbor_path_override_enabled": bool(
            payload.get("neighbor_path_override_enabled", defaults["neighbor_path_override_enabled"])
        ),
        "neighbor_path_override_min_similarity": _clamp_probability(
            payload.get(
                "neighbor_path_override_min_similarity",
                defaults["neighbor_path_override_min_similarity"],
            ),
            defaults["neighbor_path_override_min_similarity"],
        ),
        "neighbor_path_override_min_gap": _clamp_probability(
            payload.get("neighbor_path_override_min_gap", defaults["neighbor_path_override_min_gap"]),
            defaults["neighbor_path_override_min_gap"],
        ),
        "neighbor_path_override_max_confidence": _clamp_probability(
            payload.get(
                "neighbor_path_override_max_confidence",
                defaults["neighbor_path_override_max_confidence"],
            ),
            defaults["neighbor_path_override_max_confidence"],
        ),
    }
|
|
|
|
|
|
def _normalize_tasks(payload: dict[str, Any], provider_ids: list[str]) -> dict[str, Any]:
    """Normalize the task settings map for OCR, summarization, and routing."""
    source = payload if isinstance(payload, dict) else {}
    normalizers = {
        TASK_OCR_HANDWRITING: _normalize_ocr_task,
        TASK_SUMMARY_GENERATION: _normalize_summary_task,
        TASK_ROUTING_CLASSIFICATION: _normalize_routing_task,
    }
    return {
        task_name: normalize(source.get(task_name, {}), provider_ids)
        for task_name, normalize in normalizers.items()
    }
|
|
|
|
|
|
def _normalize_upload_defaults(payload: dict[str, Any], defaults: dict[str, Any]) -> dict[str, Any]:
|
|
"""Normalizes upload default destination path and tags."""
|
|
|
|
if not isinstance(payload, dict):
|
|
payload = {}
|
|
|
|
default_path = str(defaults.get("logical_path", "Inbox")).strip() or "Inbox"
|
|
raw_path = str(payload.get("logical_path", default_path)).strip()
|
|
logical_path = raw_path or default_path
|
|
|
|
raw_tags = payload.get("tags", defaults.get("tags", []))
|
|
tags: list[str] = []
|
|
seen_lowered: set[str] = set()
|
|
if isinstance(raw_tags, list):
|
|
for raw_tag in raw_tags:
|
|
normalized = str(raw_tag).strip()
|
|
if not normalized:
|
|
continue
|
|
lowered = normalized.lower()
|
|
if lowered in seen_lowered:
|
|
continue
|
|
seen_lowered.add(lowered)
|
|
tags.append(normalized)
|
|
if len(tags) >= 50:
|
|
break
|
|
|
|
return {
|
|
"logical_path": logical_path,
|
|
"tags": tags,
|
|
}
|
|
|
|
|
|
def _normalize_display_settings(payload: dict[str, Any], defaults: dict[str, Any]) -> dict[str, Any]:
    """Normalize dashboard display settings (page size, log animation toggle)."""
    source = payload if isinstance(payload, dict) else {}

    fallback_cards = _safe_int(defaults.get("cards_per_page", 12), 12)
    animation_fallback = defaults.get("log_typing_animation_enabled", True)

    return {
        "cards_per_page": _clamp_cards_per_page(
            _safe_int(source.get("cards_per_page", fallback_cards), fallback_cards)
        ),
        "log_typing_animation_enabled": bool(
            source.get("log_typing_animation_enabled", animation_fallback)
        ),
    }
|
|
|
|
|
|
def _normalize_processing_log_retention(payload: dict[str, Any], defaults: dict[str, Any]) -> dict[str, int]:
    """Normalize processing-log retention counters used by cleanup defaults."""
    source = payload if isinstance(payload, dict) else {}

    def _resolve(key: str, hard_default: int, clamp) -> int:
        # Clamp the stored default first, then the incoming value on top of it.
        fallback = clamp(_safe_int(defaults.get(key, hard_default), hard_default))
        return clamp(_safe_int(source.get(key, fallback), fallback))

    return {
        "keep_document_sessions": _resolve(
            "keep_document_sessions", 2, _clamp_processing_log_document_sessions
        ),
        "keep_unbound_entries": _resolve(
            "keep_unbound_entries", 80, _clamp_processing_log_unbound_entries
        ),
    }
|
|
|
|
|
|
def _normalize_predefined_paths(
    payload: Any,
    existing_items: list[dict[str, Any]] | None = None,
) -> list[dict[str, Any]]:
    """Normalize predefined logical-path entries.

    Values are stripped of whitespace and surrounding slashes and
    deduplicated case-insensitively. The ``global_shared`` flag is one-way
    sticky: once an existing entry is globally shared it stays shared even
    if the incoming payload says otherwise.
    """
    previous: dict[str, dict[str, Any]] = {}
    for candidate in existing_items if isinstance(existing_items, list) else []:
        if not isinstance(candidate, dict):
            continue
        cleaned = str(candidate.get("value", "")).strip().strip("/")
        if cleaned:
            previous[cleaned.lower()] = {
                "value": cleaned,
                "global_shared": bool(candidate.get("global_shared", False)),
            }

    if not isinstance(payload, list):
        # No usable payload: keep the deduplicated existing catalog.
        return list(previous.values())

    result: list[dict[str, Any]] = []
    seen_keys: set[str] = set()
    max_entries = _clamp_predefined_entries_limit(len(payload))
    for candidate in payload:
        if not isinstance(candidate, dict):
            continue
        cleaned = str(candidate.get("value", "")).strip().strip("/")
        if not cleaned:
            continue
        key = cleaned.lower()
        if key in seen_keys:
            continue
        seen_keys.add(key)
        prior = previous.get(key)
        previously_shared = bool(prior.get("global_shared", False)) if prior else False
        result.append(
            {
                "value": cleaned,
                "global_shared": previously_shared or bool(candidate.get("global_shared", False)),
            }
        )
        if len(result) >= max_entries:
            break
    return result
|
|
|
|
|
|
def _normalize_predefined_tags(
    payload: Any,
    existing_items: list[dict[str, Any]] | None = None,
) -> list[dict[str, Any]]:
    """Normalize predefined tag entries.

    Like the path catalog, values are stripped and deduplicated
    case-insensitively (but slashes are kept), and ``global_shared`` is
    one-way sticky: an entry that was shared stays shared.
    """
    known: dict[str, dict[str, Any]] = {}
    for entry in existing_items if isinstance(existing_items, list) else []:
        if not isinstance(entry, dict):
            continue
        text = str(entry.get("value", "")).strip()
        if text:
            known[text.lower()] = {
                "value": text,
                "global_shared": bool(entry.get("global_shared", False)),
            }

    if not isinstance(payload, list):
        # No usable payload: keep the deduplicated existing catalog.
        return list(known.values())

    output: list[dict[str, Any]] = []
    taken: set[str] = set()
    cap = _clamp_predefined_entries_limit(len(payload))
    for entry in payload:
        if not isinstance(entry, dict):
            continue
        text = str(entry.get("value", "")).strip()
        if not text:
            continue
        folded = text.lower()
        if folded in taken:
            continue
        taken.add(folded)
        stored = known.get(folded)
        sticky_shared = bool(stored.get("global_shared", False)) if stored else False
        output.append(
            {
                "value": text,
                "global_shared": sticky_shared or bool(entry.get("global_shared", False)),
            }
        )
        if len(output) >= cap:
            break
    return output
|
|
|
|
|
|
def _normalize_handwriting_style_settings(payload: dict[str, Any], defaults: dict[str, Any]) -> dict[str, Any]:
    """Normalize handwriting-style clustering settings from the settings UI.

    Args:
        payload: Raw settings dict; anything non-dict is treated as empty.
        defaults: Repository default block used for per-field fallbacks.

    Returns:
        Dict with ``enabled``, ``embed_model``, ``neighbor_limit`` (1..32),
        ``match_min_similarity`` and ``bootstrap_match_min_similarity``
        (clamped [0, 1]), ``bootstrap_sample_size`` (1..30), and
        ``image_max_side`` (256..4096 pixels).
    """

    if not isinstance(payload, dict):
        payload = {}

    # Resolve per-field fallbacks from the provided defaults block first.
    default_enabled = bool(defaults.get("enabled", True))
    default_embed_model = str(defaults.get("embed_model", DEFAULT_HANDWRITING_STYLE_EMBED_MODEL)).strip()
    default_neighbor_limit = _safe_int(defaults.get("neighbor_limit", 8), 8)
    default_match_min = _clamp_probability(defaults.get("match_min_similarity", 0.86), 0.86)
    default_bootstrap_match_min = _clamp_probability(defaults.get("bootstrap_match_min_similarity", 0.89), 0.89)
    default_bootstrap_sample_size = _safe_int(defaults.get("bootstrap_sample_size", 3), 3)
    default_image_max_side = _safe_int(defaults.get("image_max_side", 1024), 1024)

    return {
        "enabled": bool(payload.get("enabled", default_enabled)),
        "embed_model": str(payload.get("embed_model", default_embed_model)).strip() or default_embed_model,
        "neighbor_limit": _clamp_handwriting_style_neighbor_limit(
            _safe_int(payload.get("neighbor_limit", default_neighbor_limit), default_neighbor_limit)
        ),
        "match_min_similarity": _clamp_probability(
            payload.get("match_min_similarity", default_match_min),
            default_match_min,
        ),
        "bootstrap_match_min_similarity": _clamp_probability(
            payload.get("bootstrap_match_min_similarity", default_bootstrap_match_min),
            default_bootstrap_match_min,
        ),
        "bootstrap_sample_size": _clamp_handwriting_style_sample_size(
            _safe_int(payload.get("bootstrap_sample_size", default_bootstrap_sample_size), default_bootstrap_sample_size)
        ),
        "image_max_side": _clamp_handwriting_style_image_max_side(
            _safe_int(payload.get("image_max_side", default_image_max_side), default_image_max_side)
        ),
    }
|
|
|
|
|
|
def _sanitize_settings(payload: dict[str, Any]) -> dict[str, Any]:
    """Sanitize an arbitrary settings payload into the canonical structure.

    Every block is normalized independently; unknown/malformed input degrades
    to repository defaults rather than raising. Provider entries are
    deduplicated by normalized id, and at least one provider (the default)
    is always present so task bindings can resolve.

    Args:
        payload: Raw settings dict (from disk or an update); non-dicts are
            treated as empty.

    Returns:
        Fully normalized settings dict with all expected top-level keys.
    """

    if not isinstance(payload, dict):
        payload = {}

    defaults = _default_settings()

    providers_payload = payload.get("providers")
    normalized_providers: list[dict[str, Any]] = []
    seen_provider_ids: set[str] = set()

    if isinstance(providers_payload, list):
        for index, provider_payload in enumerate(providers_payload):
            if not isinstance(provider_payload, dict):
                continue
            fallback = defaults["providers"][0]
            candidate = _normalize_provider(provider_payload, fallback_id=f"provider-{index + 1}", fallback_values=fallback)
            # First occurrence of a duplicated id wins.
            if candidate["id"] in seen_provider_ids:
                continue
            seen_provider_ids.add(candidate["id"])
            normalized_providers.append(candidate)

    if not normalized_providers:
        # Guarantee at least one provider so provider_ids[0] fallbacks work.
        normalized_providers = [dict(defaults["providers"][0])]

    provider_ids = [provider["id"] for provider in normalized_providers]
    tasks_payload = payload.get("tasks", {})
    normalized_tasks = _normalize_tasks(tasks_payload, provider_ids)
    upload_defaults = _normalize_upload_defaults(payload.get("upload_defaults", {}), defaults["upload_defaults"])
    display_settings = _normalize_display_settings(payload.get("display", {}), defaults["display"])
    processing_log_retention = _normalize_processing_log_retention(
        payload.get(PROCESSING_LOG_RETENTION_SETTINGS_KEY, {}),
        defaults[PROCESSING_LOG_RETENTION_SETTINGS_KEY],
    )
    # The same list is passed as payload and existing_items here, so the
    # sticky global_shared enforcement is a no-op at this layer; the real
    # old-vs-new comparison happens in update_app_settings.
    predefined_paths = _normalize_predefined_paths(
        payload.get(PREDEFINED_PATHS_SETTINGS_KEY, []),
        existing_items=payload.get(PREDEFINED_PATHS_SETTINGS_KEY, []),
    )
    predefined_tags = _normalize_predefined_tags(
        payload.get(PREDEFINED_TAGS_SETTINGS_KEY, []),
        existing_items=payload.get(PREDEFINED_TAGS_SETTINGS_KEY, []),
    )
    handwriting_style_settings = _normalize_handwriting_style_settings(
        payload.get(HANDWRITING_STYLE_SETTINGS_KEY, {}),
        defaults[HANDWRITING_STYLE_SETTINGS_KEY],
    )

    return {
        "upload_defaults": upload_defaults,
        "display": display_settings,
        PROCESSING_LOG_RETENTION_SETTINGS_KEY: processing_log_retention,
        PREDEFINED_PATHS_SETTINGS_KEY: predefined_paths,
        PREDEFINED_TAGS_SETTINGS_KEY: predefined_tags,
        HANDWRITING_STYLE_SETTINGS_KEY: handwriting_style_settings,
        "providers": normalized_providers,
        "tasks": normalized_tasks,
    }
|
|
|
|
|
|
def ensure_app_settings() -> None:
    """Write a defaults file on first run; leave existing settings untouched."""
    path = _settings_path()
    path.parent.mkdir(parents=True, exist_ok=True)
    if not path.exists():
        defaults = _sanitize_settings(_default_settings())
        path.write_text(json.dumps(defaults, indent=2), encoding="utf-8")
|
|
|
|
|
|
def _read_raw_settings() -> dict[str, Any]:
    """Load persisted settings from disk, sanitized and fully defaulted."""
    ensure_app_settings()
    try:
        raw = json.loads(_settings_path().read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        # A missing or corrupt file degrades to defaults via sanitation.
        raw = {}
    return _sanitize_settings(raw)
|
|
|
|
|
|
def _write_settings(payload: dict[str, Any]) -> None:
    """Persist a sanitized settings payload to host-mounted storage.

    Writes to a temporary sibling file first and atomically replaces the
    target, so a crash or full disk mid-write cannot leave a truncated or
    corrupt ``settings.json`` behind (the previous implementation wrote the
    target in place).

    Args:
        payload: Already-sanitized settings structure to serialize as JSON.
    """

    path = _settings_path()
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    # Path.replace is atomic on POSIX and replaces an existing file on Windows.
    tmp_path.replace(path)
|
|
|
|
|
|
def read_app_settings() -> dict[str, Any]:
    """Read settings and return a sanitized view safe for API responses.

    Provider secrets are never returned verbatim: each provider entry is
    rebuilt without ``api_key`` and instead carries ``api_key_set`` (whether
    a key is configured) and ``api_key_masked`` (a short masked preview).

    Returns:
        Dict mirroring the persisted structure with masked provider entries.
    """

    payload = _read_raw_settings()
    providers_response: list[dict[str, Any]] = []
    for provider in payload["providers"]:
        api_key = str(provider.get("api_key", ""))
        providers_response.append(
            {
                "id": provider["id"],
                "label": provider["label"],
                "provider_type": provider["provider_type"],
                "base_url": provider["base_url"],
                "timeout_seconds": int(provider["timeout_seconds"]),
                "api_key_set": bool(api_key),
                "api_key_masked": _mask_api_key(api_key),
            }
        )

    # _read_raw_settings sanitizes, so these .get fallbacks are defensive only.
    return {
        "upload_defaults": payload.get("upload_defaults", {"logical_path": "Inbox", "tags": []}),
        "display": payload.get("display", {"cards_per_page": 12, "log_typing_animation_enabled": True}),
        PROCESSING_LOG_RETENTION_SETTINGS_KEY: payload.get(
            PROCESSING_LOG_RETENTION_SETTINGS_KEY,
            _default_settings()[PROCESSING_LOG_RETENTION_SETTINGS_KEY],
        ),
        PREDEFINED_PATHS_SETTINGS_KEY: payload.get(PREDEFINED_PATHS_SETTINGS_KEY, []),
        PREDEFINED_TAGS_SETTINGS_KEY: payload.get(PREDEFINED_TAGS_SETTINGS_KEY, []),
        HANDWRITING_STYLE_SETTINGS_KEY: payload.get(HANDWRITING_STYLE_SETTINGS_KEY, {}),
        "providers": providers_response,
        "tasks": payload["tasks"],
    }
|
|
|
|
|
|
def reset_app_settings() -> dict[str, Any]:
    """Overwrite persisted settings with sanitized defaults; return API view."""
    _write_settings(_sanitize_settings(_default_settings()))
    return read_app_settings()
|
|
|
|
|
|
def read_task_runtime_settings(task_name: str) -> dict[str, Any]:
    """Return one task's settings plus its resolved provider (secrets included).

    Raises:
        KeyError: If ``task_name`` is not a known task settings key.
    """
    payload = _read_raw_settings()
    tasks = payload["tasks"]
    if task_name not in tasks:
        raise KeyError(f"Unknown task settings key: {task_name}")

    task = dict(tasks[task_name])
    providers = payload["providers"]
    provider = next(
        (entry for entry in providers if entry["id"] == task.get("provider_id")),
        None,
    )
    if provider is None:
        # Stale binding: fall back to the first configured provider.
        provider = providers[0]
        task["provider_id"] = provider["id"]

    return {"task": task, "provider": dict(provider)}
|
|
|
|
|
|
def update_app_settings(
    providers: list[dict[str, Any]] | None = None,
    tasks: dict[str, dict[str, Any]] | None = None,
    upload_defaults: dict[str, Any] | None = None,
    display: dict[str, Any] | None = None,
    processing_log_retention: dict[str, Any] | None = None,
    handwriting_style: dict[str, Any] | None = None,
    predefined_paths: list[dict[str, Any]] | None = None,
    predefined_tags: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Merge partial updates into the persisted settings and return the API view.

    Each argument is an optional partial block; ``None`` means "leave this
    block unchanged". Per-block merge semantics:

    * ``providers``: replaces the provider list (only if at least one valid
      entry survives normalization). Each entry may carry ``clear_api_key``
      to wipe the stored secret; omitting ``api_key`` keeps the stored one.
    * ``tasks``: per-task shallow merge; ``None`` field values are skipped,
      unknown task names are ignored.
    * ``upload_defaults`` / ``display`` / ``processing_log_retention`` /
      ``handwriting_style``: whitelisted-key shallow merge.
    * ``predefined_paths`` / ``predefined_tags``: full replacement, with the
      one-way ``global_shared`` flag enforced against the current catalog.

    The merged payload is re-sanitized before being written to disk.

    Returns:
        The masked, API-safe settings view (see ``read_app_settings``).
    """

    current_payload = _read_raw_settings()
    # Start from deep-ish copies of the current blocks so partial updates
    # never mutate the dicts returned by _read_raw_settings.
    next_payload: dict[str, Any] = {
        "upload_defaults": dict(current_payload.get("upload_defaults", {"logical_path": "Inbox", "tags": []})),
        "display": dict(current_payload.get("display", {"cards_per_page": 12, "log_typing_animation_enabled": True})),
        PROCESSING_LOG_RETENTION_SETTINGS_KEY: dict(
            current_payload.get(
                PROCESSING_LOG_RETENTION_SETTINGS_KEY,
                _default_settings()[PROCESSING_LOG_RETENTION_SETTINGS_KEY],
            )
        ),
        PREDEFINED_PATHS_SETTINGS_KEY: list(current_payload.get(PREDEFINED_PATHS_SETTINGS_KEY, [])),
        PREDEFINED_TAGS_SETTINGS_KEY: list(current_payload.get(PREDEFINED_TAGS_SETTINGS_KEY, [])),
        HANDWRITING_STYLE_SETTINGS_KEY: dict(
            current_payload.get(HANDWRITING_STYLE_SETTINGS_KEY, _default_settings()[HANDWRITING_STYLE_SETTINGS_KEY])
        ),
        "providers": list(current_payload["providers"]),
        "tasks": dict(current_payload["tasks"]),
    }

    if providers is not None:
        existing_provider_map = {provider["id"]: provider for provider in current_payload["providers"]}
        next_providers: list[dict[str, Any]] = []
        for index, provider_payload in enumerate(providers):
            if not isinstance(provider_payload, dict):
                continue

            provider_id = _normalize_provider_id(
                str(provider_payload.get("id", "")),
                fallback=f"provider-{index + 1}",
            )
            existing_provider = existing_provider_map.get(provider_id, {})
            merged_payload = dict(provider_payload)
            merged_payload["id"] = provider_id

            # Secret handling: explicit clear wins, then an explicit new key,
            # otherwise the stored key is preserved.
            if bool(provider_payload.get("clear_api_key", False)):
                merged_payload["api_key"] = ""
            elif "api_key" in provider_payload and provider_payload.get("api_key") is not None:
                merged_payload["api_key"] = str(provider_payload.get("api_key")).strip()
            else:
                merged_payload["api_key"] = str(existing_provider.get("api_key", ""))

            normalized_provider = _normalize_provider(
                merged_payload,
                fallback_id=provider_id,
                fallback_values=existing_provider,
            )
            next_providers.append(normalized_provider)

        # An update that normalizes to an empty list keeps the current providers.
        if next_providers:
            next_payload["providers"] = next_providers

    if tasks is not None:
        merged_tasks = dict(current_payload["tasks"])
        for task_name, task_update in tasks.items():
            if task_name not in merged_tasks or not isinstance(task_update, dict):
                continue
            existing_task = dict(merged_tasks[task_name])
            for key, value in task_update.items():
                # None means "not provided" for task fields, never "set to null".
                if value is None:
                    continue
                existing_task[key] = value
            merged_tasks[task_name] = existing_task
        next_payload["tasks"] = merged_tasks

    if upload_defaults is not None and isinstance(upload_defaults, dict):
        next_upload_defaults = dict(next_payload.get("upload_defaults", {}))
        for key in ("logical_path", "tags"):
            if key in upload_defaults:
                next_upload_defaults[key] = upload_defaults[key]
        next_payload["upload_defaults"] = next_upload_defaults

    if display is not None and isinstance(display, dict):
        next_display = dict(next_payload.get("display", {}))
        if "cards_per_page" in display:
            next_display["cards_per_page"] = display["cards_per_page"]
        if "log_typing_animation_enabled" in display:
            next_display["log_typing_animation_enabled"] = bool(display["log_typing_animation_enabled"])
        next_payload["display"] = next_display

    if processing_log_retention is not None and isinstance(processing_log_retention, dict):
        next_retention = dict(next_payload.get(PROCESSING_LOG_RETENTION_SETTINGS_KEY, {}))
        for key in ("keep_document_sessions", "keep_unbound_entries"):
            if key in processing_log_retention:
                next_retention[key] = processing_log_retention[key]
        next_payload[PROCESSING_LOG_RETENTION_SETTINGS_KEY] = next_retention

    if handwriting_style is not None and isinstance(handwriting_style, dict):
        next_handwriting_style = dict(next_payload.get(HANDWRITING_STYLE_SETTINGS_KEY, {}))
        for key in (
            "enabled",
            "embed_model",
            "neighbor_limit",
            "match_min_similarity",
            "bootstrap_match_min_similarity",
            "bootstrap_sample_size",
            "image_max_side",
        ):
            if key in handwriting_style:
                next_handwriting_style[key] = handwriting_style[key]
        next_payload[HANDWRITING_STYLE_SETTINGS_KEY] = next_handwriting_style

    if predefined_paths is not None:
        # existing_items comes from the stored catalog, making global_shared
        # irreversible across updates.
        next_payload[PREDEFINED_PATHS_SETTINGS_KEY] = _normalize_predefined_paths(
            predefined_paths,
            existing_items=next_payload.get(PREDEFINED_PATHS_SETTINGS_KEY, []),
        )

    if predefined_tags is not None:
        next_payload[PREDEFINED_TAGS_SETTINGS_KEY] = _normalize_predefined_tags(
            predefined_tags,
            existing_items=next_payload.get(PREDEFINED_TAGS_SETTINGS_KEY, []),
        )

    sanitized = _sanitize_settings(next_payload)
    _write_settings(sanitized)
    return read_app_settings()
|
|
|
|
|
|
def read_handwriting_provider_settings() -> dict[str, Any]:
    """Return OCR settings in the legacy flat shape used by transcription code."""
    runtime = read_task_runtime_settings(TASK_OCR_HANDWRITING)
    provider = runtime["provider"]
    task = runtime["task"]

    # Legacy flat mapping: provider/task fields interleaved under openai_* keys.
    return {
        "provider": provider["provider_type"],
        "enabled": bool(task.get("enabled", True)),
        "openai_base_url": str(provider.get("base_url", settings.default_openai_base_url)),
        "openai_model": str(task.get("model", settings.default_openai_model)),
        "openai_timeout_seconds": int(provider.get("timeout_seconds", settings.default_openai_timeout_seconds)),
        "openai_api_key": str(provider.get("api_key", "")),
        "prompt": str(task.get("prompt", DEFAULT_OCR_PROMPT)),
        "provider_id": str(provider.get("id", "openai-default")),
    }
|
|
|
|
|
|
def read_handwriting_style_settings() -> dict[str, Any]:
    """Return handwriting-style clustering settings for style assignment logic."""
    stored = _read_raw_settings().get(HANDWRITING_STYLE_SETTINGS_KEY, {})
    return _normalize_handwriting_style_settings(
        stored,
        _default_settings()[HANDWRITING_STYLE_SETTINGS_KEY],
    )
|
|
|
|
|
|
def read_processing_log_retention_settings() -> dict[str, int]:
    """Return normalized log-retention defaults for worker and trim APIs."""
    stored = _read_raw_settings().get(PROCESSING_LOG_RETENTION_SETTINGS_KEY, {})
    return _normalize_processing_log_retention(
        stored,
        _default_settings()[PROCESSING_LOG_RETENTION_SETTINGS_KEY],
    )
|
|
|
|
|
|
def read_predefined_paths_settings() -> list[dict[str, Any]]:
    """Return the normalized predefined logical-path catalog."""
    stored = _read_raw_settings().get(PREDEFINED_PATHS_SETTINGS_KEY, [])
    return _normalize_predefined_paths(stored, existing_items=stored)
|
|
|
|
|
|
def read_predefined_tags_settings() -> list[dict[str, Any]]:
    """Return the normalized predefined tag catalog."""
    stored = _read_raw_settings().get(PREDEFINED_TAGS_SETTINGS_KEY, [])
    return _normalize_predefined_tags(stored, existing_items=stored)
|
|
|
|
|
|
def update_handwriting_settings(
    enabled: bool | None = None,
    openai_base_url: str | None = None,
    openai_model: str | None = None,
    openai_timeout_seconds: int | None = None,
    openai_api_key: str | None = None,
    clear_openai_api_key: bool = False,
) -> dict[str, Any]:
    """Update the OCR task and its bound provider via the legacy contract.

    Translates the flat legacy handwriting-settings arguments into the
    modern provider/task update shape and delegates to
    ``update_app_settings``. ``None`` arguments leave the stored value
    unchanged; ``clear_openai_api_key`` takes precedence over
    ``openai_api_key``.

    Returns:
        The masked, API-safe settings view (see ``read_app_settings``).
    """

    runtime = read_task_runtime_settings(TASK_OCR_HANDWRITING)
    provider = runtime["provider"]

    # Rebuild the currently bound provider, overriding only supplied fields.
    provider_update: dict[str, Any] = {
        "id": provider["id"],
        "label": provider["label"],
        "provider_type": provider["provider_type"],
        "base_url": openai_base_url if openai_base_url is not None else provider["base_url"],
        "timeout_seconds": openai_timeout_seconds if openai_timeout_seconds is not None else provider["timeout_seconds"],
    }
    if clear_openai_api_key:
        provider_update["clear_api_key"] = True
    elif openai_api_key is not None:
        provider_update["api_key"] = openai_api_key

    tasks_update: dict[str, dict[str, Any]] = {TASK_OCR_HANDWRITING: {}}
    if enabled is not None:
        tasks_update[TASK_OCR_HANDWRITING]["enabled"] = enabled
    if openai_model is not None:
        tasks_update[TASK_OCR_HANDWRITING]["model"] = openai_model

    return update_app_settings(
        providers=[provider_update],
        tasks=tasks_update,
    )