"""Persistent single-user application settings service backed by host-mounted storage.""" import json import re from pathlib import Path from typing import Any from app.core.config import get_settings settings = get_settings() TASK_OCR_HANDWRITING = "ocr_handwriting" TASK_SUMMARY_GENERATION = "summary_generation" TASK_ROUTING_CLASSIFICATION = "routing_classification" HANDWRITING_STYLE_SETTINGS_KEY = "handwriting_style_clustering" PROCESSING_LOG_RETENTION_SETTINGS_KEY = "processing_log_retention" PREDEFINED_PATHS_SETTINGS_KEY = "predefined_paths" PREDEFINED_TAGS_SETTINGS_KEY = "predefined_tags" DEFAULT_HANDWRITING_STYLE_EMBED_MODEL = "ts/clip-vit-b-p32" DEFAULT_OCR_PROMPT = ( "You are an expert at reading messy handwritten notes, including hard-to-read writing.\n" "Task: transcribe the handwriting as exactly as possible.\n\n" "Rules:\n" "- Output ONLY the transcription in German, no commentary.\n" "- Preserve original line breaks where they clearly exist.\n" "- Do NOT translate or correct grammar or spelling.\n" "- If a word or character is unclear, wrap your best guess in [[? ... ?]].\n" "- If something is unreadable, write [[?unleserlich?]] in its place." ) DEFAULT_SUMMARY_PROMPT = ( "You summarize documents for indexing and routing.\n" "Return concise markdown with key entities, purpose, and document category hints.\n" "Do not invent facts and do not include any explanation outside the summary." ) DEFAULT_ROUTING_PROMPT = ( "You classify one document into an existing logical path and tags.\n" "Prefer existing paths and tags when possible.\n" "If the evidence is weak, keep chosen_path as null and use suggestions instead.\n" "Return JSON only with this exact shape:\n" "{\n" " \"chosen_path\": string | null,\n" " \"chosen_tags\": string[],\n" " \"suggested_new_paths\": string[],\n" " \"suggested_new_tags\": string[],\n" " \"confidence\": number\n" "}\n" "Confidence must be between 0 and 1." ) def _default_settings() -> dict[str, Any]: """Builds default settings including providers and model task bindings.""" return { "upload_defaults": { "logical_path": "Inbox", "tags": [], }, "display": { "cards_per_page": 12, "log_typing_animation_enabled": True, }, PROCESSING_LOG_RETENTION_SETTINGS_KEY: { "keep_document_sessions": 2, "keep_unbound_entries": 80, }, PREDEFINED_PATHS_SETTINGS_KEY: [], PREDEFINED_TAGS_SETTINGS_KEY: [], HANDWRITING_STYLE_SETTINGS_KEY: { "enabled": True, "embed_model": DEFAULT_HANDWRITING_STYLE_EMBED_MODEL, "neighbor_limit": 8, "match_min_similarity": 0.86, "bootstrap_match_min_similarity": 0.89, "bootstrap_sample_size": 3, "image_max_side": 1024, }, "providers": [ { "id": "openai-default", "label": "OpenAI Default", "provider_type": "openai_compatible", "base_url": settings.default_openai_base_url, "timeout_seconds": settings.default_openai_timeout_seconds, "api_key": settings.default_openai_api_key, } ], "tasks": { TASK_OCR_HANDWRITING: { "enabled": settings.default_openai_handwriting_enabled, "provider_id": "openai-default", "model": settings.default_openai_model, "prompt": DEFAULT_OCR_PROMPT, }, TASK_SUMMARY_GENERATION: { "enabled": True, "provider_id": "openai-default", "model": settings.default_summary_model, "prompt": DEFAULT_SUMMARY_PROMPT, "max_input_tokens": 8000, }, TASK_ROUTING_CLASSIFICATION: { "enabled": True, "provider_id": "openai-default", "model": settings.default_routing_model, "prompt": DEFAULT_ROUTING_PROMPT, "neighbor_count": 8, "neighbor_min_similarity": 0.84, "auto_apply_confidence_threshold": 0.78, "auto_apply_neighbor_similarity_threshold": 0.55, "neighbor_path_override_enabled": True, "neighbor_path_override_min_similarity": 0.86, "neighbor_path_override_min_gap": 0.04, "neighbor_path_override_max_confidence": 0.9, }, }, } def _settings_path() -> Path: """Returns the absolute path of the persisted settings file.""" return settings.storage_root / "settings.json" def _clamp_timeout(value: int) -> int: """Clamps timeout values to a safe and practical range.""" return max(5, min(180, value)) def _clamp_input_tokens(value: int) -> int: """Clamps per-request summary input token budget values to practical bounds.""" return max(512, min(64000, value)) def _clamp_neighbor_count(value: int) -> int: """Clamps nearest-neighbor lookup count for routing classification.""" return max(1, min(40, value)) def _clamp_cards_per_page(value: int) -> int: """Clamps dashboard cards-per-page display setting to practical bounds.""" return max(1, min(200, value)) def _clamp_processing_log_document_sessions(value: int) -> int: """Clamps the number of recent document log sessions kept during cleanup.""" return max(0, min(20, value)) def _clamp_processing_log_unbound_entries(value: int) -> int: """Clamps retained unbound processing log events kept during cleanup.""" return max(0, min(400, value)) def _clamp_predefined_entries_limit(value: int) -> int: """Clamps maximum count for predefined tag/path catalog entries.""" return max(1, min(2000, value)) def _clamp_handwriting_style_neighbor_limit(value: int) -> int: """Clamps handwriting-style nearest-neighbor count used for style matching.""" return max(1, min(32, value)) def _clamp_handwriting_style_sample_size(value: int) -> int: """Clamps handwriting-style bootstrap sample size used for stricter matching.""" return max(1, min(30, value)) def _clamp_handwriting_style_image_max_side(value: int) -> int: """Clamps handwriting-style image normalization max-side pixel size.""" return max(256, min(4096, value)) def _clamp_probability(value: float, fallback: float) -> float: """Clamps probability-like numbers to the range [0, 1].""" try: parsed = float(value) except (TypeError, ValueError): return fallback return max(0.0, min(1.0, parsed)) def _safe_int(value: Any, fallback: int) -> int: """Safely converts arbitrary values to integers with fallback handling.""" try: return int(value) except (TypeError, ValueError): return fallback def _normalize_provider_id(value: str | None, fallback: str) -> str: """Normalizes provider identifiers into stable lowercase slug values.""" candidate = (value or "").strip().lower() candidate = re.sub(r"[^a-z0-9_-]+", "-", candidate).strip("-") return candidate or fallback def _mask_api_key(value: str) -> str: """Masks a secret API key while retaining enough characters for identification.""" if not value: return "" if len(value) <= 6: return "*" * len(value) return f"{value[:4]}...{value[-2:]}" def _normalize_provider( payload: dict[str, Any], fallback_id: str, fallback_values: dict[str, Any], ) -> dict[str, Any]: """Normalizes one provider payload to a stable shape with bounds and defaults.""" defaults = _default_settings()["providers"][0] provider_id = _normalize_provider_id(str(payload.get("id", fallback_id)), fallback_id) provider_type = str(payload.get("provider_type", fallback_values.get("provider_type", defaults["provider_type"]))).strip() if provider_type != "openai_compatible": provider_type = "openai_compatible" api_key_value = payload.get("api_key", fallback_values.get("api_key", defaults["api_key"])) api_key = str(api_key_value).strip() if api_key_value is not None else "" return { "id": provider_id, "label": str(payload.get("label", fallback_values.get("label", provider_id))).strip() or provider_id, "provider_type": provider_type, "base_url": str(payload.get("base_url", fallback_values.get("base_url", defaults["base_url"]))).strip() or defaults["base_url"], "timeout_seconds": _clamp_timeout( _safe_int( payload.get("timeout_seconds", fallback_values.get("timeout_seconds", defaults["timeout_seconds"])), defaults["timeout_seconds"], ) ), "api_key": api_key, } def _normalize_ocr_task(payload: dict[str, Any], provider_ids: list[str]) -> dict[str, Any]: """Normalizes OCR task settings while enforcing valid provider references.""" defaults = _default_settings()["tasks"][TASK_OCR_HANDWRITING] provider_id = str(payload.get("provider_id", defaults["provider_id"])).strip() if provider_id not in provider_ids: provider_id = provider_ids[0] return { "enabled": bool(payload.get("enabled", defaults["enabled"])), "provider_id": provider_id, "model": str(payload.get("model", defaults["model"])).strip() or defaults["model"], "prompt": str(payload.get("prompt", defaults["prompt"])).strip() or defaults["prompt"], } def _normalize_summary_task(payload: dict[str, Any], provider_ids: list[str]) -> dict[str, Any]: """Normalizes summary task settings while enforcing valid provider references.""" defaults = _default_settings()["tasks"][TASK_SUMMARY_GENERATION] provider_id = str(payload.get("provider_id", defaults["provider_id"])).strip() if provider_id not in provider_ids: provider_id = provider_ids[0] raw_max_tokens = payload.get("max_input_tokens") if raw_max_tokens is None: legacy_chars = _safe_int(payload.get("max_source_chars", 0), 0) if legacy_chars > 0: raw_max_tokens = max(512, legacy_chars // 4) else: raw_max_tokens = defaults["max_input_tokens"] return { "enabled": bool(payload.get("enabled", defaults["enabled"])), "provider_id": provider_id, "model": str(payload.get("model", defaults["model"])).strip() or defaults["model"], "prompt": str(payload.get("prompt", defaults["prompt"])).strip() or defaults["prompt"], "max_input_tokens": _clamp_input_tokens( _safe_int(raw_max_tokens, defaults["max_input_tokens"]) ), } def _normalize_routing_task(payload: dict[str, Any], provider_ids: list[str]) -> dict[str, Any]: """Normalizes routing task settings while enforcing valid provider references.""" defaults = _default_settings()["tasks"][TASK_ROUTING_CLASSIFICATION] provider_id = str(payload.get("provider_id", defaults["provider_id"])).strip() if provider_id not in provider_ids: provider_id = provider_ids[0] return { "enabled": bool(payload.get("enabled", defaults["enabled"])), "provider_id": provider_id, "model": str(payload.get("model", defaults["model"])).strip() or defaults["model"], "prompt": str(payload.get("prompt", defaults["prompt"])).strip() or defaults["prompt"], "neighbor_count": _clamp_neighbor_count( _safe_int(payload.get("neighbor_count", defaults["neighbor_count"]), defaults["neighbor_count"]) ), "neighbor_min_similarity": _clamp_probability( payload.get("neighbor_min_similarity", defaults["neighbor_min_similarity"]), defaults["neighbor_min_similarity"], ), "auto_apply_confidence_threshold": _clamp_probability( payload.get("auto_apply_confidence_threshold", defaults["auto_apply_confidence_threshold"]), defaults["auto_apply_confidence_threshold"], ), "auto_apply_neighbor_similarity_threshold": _clamp_probability( payload.get( "auto_apply_neighbor_similarity_threshold", defaults["auto_apply_neighbor_similarity_threshold"], ), defaults["auto_apply_neighbor_similarity_threshold"], ), "neighbor_path_override_enabled": bool( payload.get("neighbor_path_override_enabled", defaults["neighbor_path_override_enabled"]) ), "neighbor_path_override_min_similarity": _clamp_probability( payload.get( "neighbor_path_override_min_similarity", defaults["neighbor_path_override_min_similarity"], ), defaults["neighbor_path_override_min_similarity"], ), "neighbor_path_override_min_gap": _clamp_probability( payload.get("neighbor_path_override_min_gap", defaults["neighbor_path_override_min_gap"]), defaults["neighbor_path_override_min_gap"], ), "neighbor_path_override_max_confidence": _clamp_probability( payload.get( "neighbor_path_override_max_confidence", defaults["neighbor_path_override_max_confidence"], ), defaults["neighbor_path_override_max_confidence"], ), } def _normalize_tasks(payload: dict[str, Any], provider_ids: list[str]) -> dict[str, Any]: """Normalizes task settings map for OCR, summarization, and routing tasks.""" if not isinstance(payload, dict): payload = {} return { TASK_OCR_HANDWRITING: _normalize_ocr_task(payload.get(TASK_OCR_HANDWRITING, {}), provider_ids), TASK_SUMMARY_GENERATION: _normalize_summary_task(payload.get(TASK_SUMMARY_GENERATION, {}), provider_ids), TASK_ROUTING_CLASSIFICATION: _normalize_routing_task(payload.get(TASK_ROUTING_CLASSIFICATION, {}), provider_ids), } def _normalize_upload_defaults(payload: dict[str, Any], defaults: dict[str, Any]) -> dict[str, Any]: """Normalizes upload default destination path and tags.""" if not isinstance(payload, dict): payload = {} default_path = str(defaults.get("logical_path", "Inbox")).strip() or "Inbox" raw_path = str(payload.get("logical_path", default_path)).strip() logical_path = raw_path or default_path raw_tags = payload.get("tags", defaults.get("tags", [])) tags: list[str] = [] seen_lowered: set[str] = set() if isinstance(raw_tags, list): for raw_tag in raw_tags: normalized = str(raw_tag).strip() if not normalized: continue lowered = normalized.lower() if lowered in seen_lowered: continue seen_lowered.add(lowered) tags.append(normalized) if len(tags) >= 50: break return { "logical_path": logical_path, "tags": tags, } def _normalize_display_settings(payload: dict[str, Any], defaults: dict[str, Any]) -> dict[str, Any]: """Normalizes display settings used by the document dashboard UI.""" if not isinstance(payload, dict): payload = {} default_cards_per_page = _safe_int(defaults.get("cards_per_page", 12), 12) cards_per_page = _clamp_cards_per_page( _safe_int(payload.get("cards_per_page", default_cards_per_page), default_cards_per_page) ) return { "cards_per_page": cards_per_page, "log_typing_animation_enabled": bool( payload.get("log_typing_animation_enabled", defaults.get("log_typing_animation_enabled", True)) ), } def _normalize_processing_log_retention(payload: dict[str, Any], defaults: dict[str, Any]) -> dict[str, int]: """Normalizes processing log retention settings used by API and worker cleanup defaults.""" if not isinstance(payload, dict): payload = {} default_keep_document_sessions = _clamp_processing_log_document_sessions( _safe_int(defaults.get("keep_document_sessions", 2), 2) ) default_keep_unbound_entries = _clamp_processing_log_unbound_entries( _safe_int(defaults.get("keep_unbound_entries", 80), 80) ) return { "keep_document_sessions": _clamp_processing_log_document_sessions( _safe_int(payload.get("keep_document_sessions", default_keep_document_sessions), default_keep_document_sessions) ), "keep_unbound_entries": _clamp_processing_log_unbound_entries( _safe_int(payload.get("keep_unbound_entries", default_keep_unbound_entries), default_keep_unbound_entries) ), } def _normalize_predefined_paths( payload: Any, existing_items: list[dict[str, Any]] | None = None, ) -> list[dict[str, Any]]: """Normalizes predefined path entries and enforces irreversible global-sharing flag.""" existing_map: dict[str, dict[str, Any]] = {} if isinstance(existing_items, list): for item in existing_items: if not isinstance(item, dict): continue value = str(item.get("value", "")).strip().strip("/") if not value: continue existing_map[value.lower()] = { "value": value, "global_shared": bool(item.get("global_shared", False)), } if not isinstance(payload, list): return list(existing_map.values()) normalized: list[dict[str, Any]] = [] seen: set[str] = set() limit = _clamp_predefined_entries_limit(len(payload)) for item in payload: if not isinstance(item, dict): continue value = str(item.get("value", "")).strip().strip("/") if not value: continue lowered = value.lower() if lowered in seen: continue seen.add(lowered) existing = existing_map.get(lowered) requested_global = bool(item.get("global_shared", False)) global_shared = bool(existing.get("global_shared", False) if existing else False) or requested_global normalized.append( { "value": value, "global_shared": global_shared, } ) if len(normalized) >= limit: break return normalized def _normalize_predefined_tags( payload: Any, existing_items: list[dict[str, Any]] | None = None, ) -> list[dict[str, Any]]: """Normalizes predefined tag entries and enforces irreversible global-sharing flag.""" existing_map: dict[str, dict[str, Any]] = {} if isinstance(existing_items, list): for item in existing_items: if not isinstance(item, dict): continue value = str(item.get("value", "")).strip() if not value: continue existing_map[value.lower()] = { "value": value, "global_shared": bool(item.get("global_shared", False)), } if not isinstance(payload, list): return list(existing_map.values()) normalized: list[dict[str, Any]] = [] seen: set[str] = set() limit = _clamp_predefined_entries_limit(len(payload)) for item in payload: if not isinstance(item, dict): continue value = str(item.get("value", "")).strip() if not value: continue lowered = value.lower() if lowered in seen: continue seen.add(lowered) existing = existing_map.get(lowered) requested_global = bool(item.get("global_shared", False)) global_shared = bool(existing.get("global_shared", False) if existing else False) or requested_global normalized.append( { "value": value, "global_shared": global_shared, } ) if len(normalized) >= limit: break return normalized def _normalize_handwriting_style_settings(payload: dict[str, Any], defaults: dict[str, Any]) -> dict[str, Any]: """Normalizes handwriting-style clustering settings exposed in the settings UI.""" if not isinstance(payload, dict): payload = {} default_enabled = bool(defaults.get("enabled", True)) default_embed_model = str(defaults.get("embed_model", DEFAULT_HANDWRITING_STYLE_EMBED_MODEL)).strip() default_neighbor_limit = _safe_int(defaults.get("neighbor_limit", 8), 8) default_match_min = _clamp_probability(defaults.get("match_min_similarity", 0.86), 0.86) default_bootstrap_match_min = _clamp_probability(defaults.get("bootstrap_match_min_similarity", 0.89), 0.89) default_bootstrap_sample_size = _safe_int(defaults.get("bootstrap_sample_size", 3), 3) default_image_max_side = _safe_int(defaults.get("image_max_side", 1024), 1024) return { "enabled": bool(payload.get("enabled", default_enabled)), "embed_model": str(payload.get("embed_model", default_embed_model)).strip() or default_embed_model, "neighbor_limit": _clamp_handwriting_style_neighbor_limit( _safe_int(payload.get("neighbor_limit", default_neighbor_limit), default_neighbor_limit) ), "match_min_similarity": _clamp_probability( payload.get("match_min_similarity", default_match_min), default_match_min, ), "bootstrap_match_min_similarity": _clamp_probability( payload.get("bootstrap_match_min_similarity", default_bootstrap_match_min), default_bootstrap_match_min, ), "bootstrap_sample_size": _clamp_handwriting_style_sample_size( _safe_int(payload.get("bootstrap_sample_size", default_bootstrap_sample_size), default_bootstrap_sample_size) ), "image_max_side": _clamp_handwriting_style_image_max_side( _safe_int(payload.get("image_max_side", default_image_max_side), default_image_max_side) ), } def _sanitize_settings(payload: dict[str, Any]) -> dict[str, Any]: """Sanitizes all persisted settings into a stable normalized structure.""" if not isinstance(payload, dict): payload = {} defaults = _default_settings() providers_payload = payload.get("providers") normalized_providers: list[dict[str, Any]] = [] seen_provider_ids: set[str] = set() if isinstance(providers_payload, list): for index, provider_payload in enumerate(providers_payload): if not isinstance(provider_payload, dict): continue fallback = defaults["providers"][0] candidate = _normalize_provider(provider_payload, fallback_id=f"provider-{index + 1}", fallback_values=fallback) if candidate["id"] in seen_provider_ids: continue seen_provider_ids.add(candidate["id"]) normalized_providers.append(candidate) if not normalized_providers: normalized_providers = [dict(defaults["providers"][0])] provider_ids = [provider["id"] for provider in normalized_providers] tasks_payload = payload.get("tasks", {}) normalized_tasks = _normalize_tasks(tasks_payload, provider_ids) upload_defaults = _normalize_upload_defaults(payload.get("upload_defaults", {}), defaults["upload_defaults"]) display_settings = _normalize_display_settings(payload.get("display", {}), defaults["display"]) processing_log_retention = _normalize_processing_log_retention( payload.get(PROCESSING_LOG_RETENTION_SETTINGS_KEY, {}), defaults[PROCESSING_LOG_RETENTION_SETTINGS_KEY], ) predefined_paths = _normalize_predefined_paths( payload.get(PREDEFINED_PATHS_SETTINGS_KEY, []), existing_items=payload.get(PREDEFINED_PATHS_SETTINGS_KEY, []), ) predefined_tags = _normalize_predefined_tags( payload.get(PREDEFINED_TAGS_SETTINGS_KEY, []), existing_items=payload.get(PREDEFINED_TAGS_SETTINGS_KEY, []), ) handwriting_style_settings = _normalize_handwriting_style_settings( payload.get(HANDWRITING_STYLE_SETTINGS_KEY, {}), defaults[HANDWRITING_STYLE_SETTINGS_KEY], ) return { "upload_defaults": upload_defaults, "display": display_settings, PROCESSING_LOG_RETENTION_SETTINGS_KEY: processing_log_retention, PREDEFINED_PATHS_SETTINGS_KEY: predefined_paths, PREDEFINED_TAGS_SETTINGS_KEY: predefined_tags, HANDWRITING_STYLE_SETTINGS_KEY: handwriting_style_settings, "providers": normalized_providers, "tasks": normalized_tasks, } def ensure_app_settings() -> None: """Creates a settings file with defaults when no persisted settings are present.""" path = _settings_path() path.parent.mkdir(parents=True, exist_ok=True) if path.exists(): return defaults = _sanitize_settings(_default_settings()) path.write_text(json.dumps(defaults, indent=2), encoding="utf-8") def _read_raw_settings() -> dict[str, Any]: """Reads persisted settings from disk and returns normalized values.""" ensure_app_settings() path = _settings_path() try: payload = json.loads(path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): payload = {} return _sanitize_settings(payload) def _write_settings(payload: dict[str, Any]) -> None: """Persists sanitized settings payload to host-mounted storage.""" path = _settings_path() path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, indent=2), encoding="utf-8") def read_app_settings() -> dict[str, Any]: """Reads settings and returns a sanitized view safe for API responses.""" payload = _read_raw_settings() providers_response: list[dict[str, Any]] = [] for provider in payload["providers"]: api_key = str(provider.get("api_key", "")) providers_response.append( { "id": provider["id"], "label": provider["label"], "provider_type": provider["provider_type"], "base_url": provider["base_url"], "timeout_seconds": int(provider["timeout_seconds"]), "api_key_set": bool(api_key), "api_key_masked": _mask_api_key(api_key), } ) return { "upload_defaults": payload.get("upload_defaults", {"logical_path": "Inbox", "tags": []}), "display": payload.get("display", {"cards_per_page": 12, "log_typing_animation_enabled": True}), PROCESSING_LOG_RETENTION_SETTINGS_KEY: payload.get( PROCESSING_LOG_RETENTION_SETTINGS_KEY, _default_settings()[PROCESSING_LOG_RETENTION_SETTINGS_KEY], ), PREDEFINED_PATHS_SETTINGS_KEY: payload.get(PREDEFINED_PATHS_SETTINGS_KEY, []), PREDEFINED_TAGS_SETTINGS_KEY: payload.get(PREDEFINED_TAGS_SETTINGS_KEY, []), HANDWRITING_STYLE_SETTINGS_KEY: payload.get(HANDWRITING_STYLE_SETTINGS_KEY, {}), "providers": providers_response, "tasks": payload["tasks"], } def reset_app_settings() -> dict[str, Any]: """Resets persisted application settings to sanitized repository defaults.""" defaults = _sanitize_settings(_default_settings()) _write_settings(defaults) return read_app_settings() def read_task_runtime_settings(task_name: str) -> dict[str, Any]: """Returns runtime task settings and resolved provider including secret values.""" payload = _read_raw_settings() tasks = payload["tasks"] if task_name not in tasks: raise KeyError(f"Unknown task settings key: {task_name}") task = dict(tasks[task_name]) provider_map = {provider["id"]: provider for provider in payload["providers"]} provider = provider_map.get(task.get("provider_id")) if provider is None: provider = payload["providers"][0] task["provider_id"] = provider["id"] return { "task": task, "provider": dict(provider), } def update_app_settings( providers: list[dict[str, Any]] | None = None, tasks: dict[str, dict[str, Any]] | None = None, upload_defaults: dict[str, Any] | None = None, display: dict[str, Any] | None = None, processing_log_retention: dict[str, Any] | None = None, handwriting_style: dict[str, Any] | None = None, predefined_paths: list[dict[str, Any]] | None = None, predefined_tags: list[dict[str, Any]] | None = None, ) -> dict[str, Any]: """Updates app settings blocks, persists them, and returns API-safe values.""" current_payload = _read_raw_settings() next_payload: dict[str, Any] = { "upload_defaults": dict(current_payload.get("upload_defaults", {"logical_path": "Inbox", "tags": []})), "display": dict(current_payload.get("display", {"cards_per_page": 12, "log_typing_animation_enabled": True})), PROCESSING_LOG_RETENTION_SETTINGS_KEY: dict( current_payload.get( PROCESSING_LOG_RETENTION_SETTINGS_KEY, _default_settings()[PROCESSING_LOG_RETENTION_SETTINGS_KEY], ) ), PREDEFINED_PATHS_SETTINGS_KEY: list(current_payload.get(PREDEFINED_PATHS_SETTINGS_KEY, [])), PREDEFINED_TAGS_SETTINGS_KEY: list(current_payload.get(PREDEFINED_TAGS_SETTINGS_KEY, [])), HANDWRITING_STYLE_SETTINGS_KEY: dict( current_payload.get(HANDWRITING_STYLE_SETTINGS_KEY, _default_settings()[HANDWRITING_STYLE_SETTINGS_KEY]) ), "providers": list(current_payload["providers"]), "tasks": dict(current_payload["tasks"]), } if providers is not None: existing_provider_map = {provider["id"]: provider for provider in current_payload["providers"]} next_providers: list[dict[str, Any]] = [] for index, provider_payload in enumerate(providers): if not isinstance(provider_payload, dict): continue provider_id = _normalize_provider_id( str(provider_payload.get("id", "")), fallback=f"provider-{index + 1}", ) existing_provider = existing_provider_map.get(provider_id, {}) merged_payload = dict(provider_payload) merged_payload["id"] = provider_id if bool(provider_payload.get("clear_api_key", False)): merged_payload["api_key"] = "" elif "api_key" in provider_payload and provider_payload.get("api_key") is not None: merged_payload["api_key"] = str(provider_payload.get("api_key")).strip() else: merged_payload["api_key"] = str(existing_provider.get("api_key", "")) normalized_provider = _normalize_provider( merged_payload, fallback_id=provider_id, fallback_values=existing_provider, ) next_providers.append(normalized_provider) if next_providers: next_payload["providers"] = next_providers if tasks is not None: merged_tasks = dict(current_payload["tasks"]) for task_name, task_update in tasks.items(): if task_name not in merged_tasks or not isinstance(task_update, dict): continue existing_task = dict(merged_tasks[task_name]) for key, value in task_update.items(): if value is None: continue existing_task[key] = value merged_tasks[task_name] = existing_task next_payload["tasks"] = merged_tasks if upload_defaults is not None and isinstance(upload_defaults, dict): next_upload_defaults = dict(next_payload.get("upload_defaults", {})) for key in ("logical_path", "tags"): if key in upload_defaults: next_upload_defaults[key] = upload_defaults[key] next_payload["upload_defaults"] = next_upload_defaults if display is not None and isinstance(display, dict): next_display = dict(next_payload.get("display", {})) if "cards_per_page" in display: next_display["cards_per_page"] = display["cards_per_page"] if "log_typing_animation_enabled" in display: next_display["log_typing_animation_enabled"] = bool(display["log_typing_animation_enabled"]) next_payload["display"] = next_display if processing_log_retention is not None and isinstance(processing_log_retention, dict): next_retention = dict(next_payload.get(PROCESSING_LOG_RETENTION_SETTINGS_KEY, {})) for key in ("keep_document_sessions", "keep_unbound_entries"): if key in processing_log_retention: next_retention[key] = processing_log_retention[key] next_payload[PROCESSING_LOG_RETENTION_SETTINGS_KEY] = next_retention if handwriting_style is not None and isinstance(handwriting_style, dict): next_handwriting_style = dict(next_payload.get(HANDWRITING_STYLE_SETTINGS_KEY, {})) for key in ( "enabled", "embed_model", "neighbor_limit", "match_min_similarity", "bootstrap_match_min_similarity", "bootstrap_sample_size", "image_max_side", ): if key in handwriting_style: next_handwriting_style[key] = handwriting_style[key] next_payload[HANDWRITING_STYLE_SETTINGS_KEY] = next_handwriting_style if predefined_paths is not None: next_payload[PREDEFINED_PATHS_SETTINGS_KEY] = _normalize_predefined_paths( predefined_paths, existing_items=next_payload.get(PREDEFINED_PATHS_SETTINGS_KEY, []), ) if predefined_tags is not None: next_payload[PREDEFINED_TAGS_SETTINGS_KEY] = _normalize_predefined_tags( predefined_tags, existing_items=next_payload.get(PREDEFINED_TAGS_SETTINGS_KEY, []), ) sanitized = _sanitize_settings(next_payload) _write_settings(sanitized) return read_app_settings() def read_handwriting_provider_settings() -> dict[str, Any]: """Returns OCR settings in legacy shape for the handwriting transcription service.""" runtime = read_task_runtime_settings(TASK_OCR_HANDWRITING) provider = runtime["provider"] task = runtime["task"] return { "provider": provider["provider_type"], "enabled": bool(task.get("enabled", True)), "openai_base_url": str(provider.get("base_url", settings.default_openai_base_url)), "openai_model": str(task.get("model", settings.default_openai_model)), "openai_timeout_seconds": int(provider.get("timeout_seconds", settings.default_openai_timeout_seconds)), "openai_api_key": str(provider.get("api_key", "")), "prompt": str(task.get("prompt", DEFAULT_OCR_PROMPT)), "provider_id": str(provider.get("id", "openai-default")), } def read_handwriting_style_settings() -> dict[str, Any]: """Returns handwriting-style clustering settings for Typesense style assignment logic.""" payload = _read_raw_settings() defaults = _default_settings()[HANDWRITING_STYLE_SETTINGS_KEY] return _normalize_handwriting_style_settings( payload.get(HANDWRITING_STYLE_SETTINGS_KEY, {}), defaults, ) def read_processing_log_retention_settings() -> dict[str, int]: """Returns normalized processing log retention defaults used by worker and trim APIs.""" payload = _read_raw_settings() defaults = _default_settings()[PROCESSING_LOG_RETENTION_SETTINGS_KEY] return _normalize_processing_log_retention( payload.get(PROCESSING_LOG_RETENTION_SETTINGS_KEY, {}), defaults, ) def read_predefined_paths_settings() -> list[dict[str, Any]]: """Returns normalized predefined logical path catalog entries.""" payload = _read_raw_settings() return _normalize_predefined_paths( payload.get(PREDEFINED_PATHS_SETTINGS_KEY, []), existing_items=payload.get(PREDEFINED_PATHS_SETTINGS_KEY, []), ) def read_predefined_tags_settings() -> list[dict[str, Any]]: """Returns normalized predefined tag catalog entries.""" payload = _read_raw_settings() return _normalize_predefined_tags( payload.get(PREDEFINED_TAGS_SETTINGS_KEY, []), existing_items=payload.get(PREDEFINED_TAGS_SETTINGS_KEY, []), ) def update_handwriting_settings( enabled: bool | None = None, openai_base_url: str | None = None, openai_model: str | None = None, openai_timeout_seconds: int | None = None, openai_api_key: str | None = None, clear_openai_api_key: bool = False, ) -> dict[str, Any]: """Updates OCR task and bound provider values using the legacy handwriting API contract.""" runtime = read_task_runtime_settings(TASK_OCR_HANDWRITING) provider = runtime["provider"] provider_update: dict[str, Any] = { "id": provider["id"], "label": provider["label"], "provider_type": provider["provider_type"], "base_url": openai_base_url if openai_base_url is not None else provider["base_url"], "timeout_seconds": openai_timeout_seconds if openai_timeout_seconds is not None else provider["timeout_seconds"], } if clear_openai_api_key: provider_update["clear_api_key"] = True elif openai_api_key is not None: provider_update["api_key"] = openai_api_key tasks_update: dict[str, dict[str, Any]] = {TASK_OCR_HANDWRITING: {}} if enabled is not None: tasks_update[TASK_OCR_HANDWRITING]["enabled"] = enabled if openai_model is not None: tasks_update[TASK_OCR_HANDWRITING]["model"] = openai_model return update_app_settings( providers=[provider_update], tasks=tasks_update, )