Files
ledgerdock/backend/app/services/routing_pipeline.py
2026-02-21 09:44:18 -03:00

1130 lines
44 KiB
Python

"""Document summarization, semantic-neighbor lookup, and LLM routing decisions."""
import json
import math
import re
from dataclasses import dataclass
from functools import lru_cache
from typing import Any
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.orm import Session

from app.models.document import Document, DocumentStatus
from app.services.app_settings import (
    TASK_ROUTING_CLASSIFICATION,
    TASK_SUMMARY_GENERATION,
    read_predefined_paths_settings,
    read_predefined_tags_settings,
    read_task_runtime_settings,
)
from app.services.handwriting_style import apply_handwriting_style_path, resolve_handwriting_style_path_prefix
from app.services.model_runtime import ModelTaskDisabledError, ModelTaskError, complete_text_task
from app.services.processing_logs import log_processing_event
from app.services.typesense_index import SimilarDocument, query_similar_documents, upsert_document_index
# tiktoken is optional: when absent, token counting and chunking degrade to
# character-based heuristics (~4 characters per token).
try:
    import tiktoken
except ImportError:  # pragma: no cover
    tiktoken = None

# Bounds for summary-task token budgets and map-reduce chunking.
SUMMARY_MIN_INPUT_TOKENS = 512  # floor for the configured max input tokens
SUMMARY_MAX_INPUT_TOKENS = 64000  # ceiling for the configured max input tokens
SUMMARY_MIN_TEXT_CHUNK_TOKENS = 256  # smallest chunk budget worth summarizing separately
SUMMARY_REDUCTION_MAX_ROUNDS = 4  # cap on hierarchical merge rounds in the reduce step
SUMMARY_REDUCTION_GROUP_SIZE = 8  # number of partial summaries merged per reduction call
@dataclass
class RoutingDecision:
    """Represents LLM-produced routing output enriched with neighbor similarity signals."""

    # Path selected for the document, normalized against the allowed-path catalog
    # when one exists; None when no path could be chosen.
    chosen_path: str | None
    # Tags normalized against the allowed-tag catalog.
    chosen_tags: list[str]
    # Proposed paths that did not match the existing catalog.
    suggested_new_paths: list[str]
    # Proposed tags that did not match the existing catalog.
    suggested_new_tags: list[str]
    # Decision confidence clamped to [0, 1] (model-reported or fallback-derived).
    confidence: float
    # Similarity (1 - vector distance, clamped to [0, 1]) of the nearest neighbor
    # that met the configured similarity threshold.
    neighbor_similarity: float
def _clamp_probability(value: Any, fallback: float = 0.0) -> float:
"""Clamps a numeric value into [0, 1] for confidence-like fields."""
try:
parsed = float(value)
except (TypeError, ValueError):
return fallback
return max(0.0, min(1.0, parsed))
def _clamp_summary_input_tokens(value: Any, fallback: int = 8000) -> int:
    """Clamps summary input token limits to bounded runtime-safe values.

    Unparseable settings return the caller-provided fallback unchanged; any
    parsed integer is forced into the [min, max] summary token bounds.
    """
    try:
        requested = int(value)
    except (TypeError, ValueError):
        return fallback
    if requested < SUMMARY_MIN_INPUT_TOKENS:
        return SUMMARY_MIN_INPUT_TOKENS
    if requested > SUMMARY_MAX_INPUT_TOKENS:
        return SUMMARY_MAX_INPUT_TOKENS
    return requested
@lru_cache(maxsize=128)
def _resolve_encoding_name(model_name: str) -> str:
    """Resolves the tokenizer encoding name for a model using tiktoken with fallback.

    Returns an empty string when tiktoken is unavailable; otherwise the
    model-specific encoding name, defaulting to "cl100k_base" for unknown or
    blank model names.
    """
    if tiktoken is None:
        return ""
    trimmed = model_name.strip()
    if not trimmed:
        return "cl100k_base"
    try:
        return tiktoken.encoding_for_model(trimmed).name
    except Exception:
        # Unknown model identifiers fall back to the general-purpose encoding.
        return "cl100k_base"
def _estimate_token_count(text: str, model_name: str) -> int:
    """Estimates token count for text using model-aware tokenization with safe fallback."""
    stripped = text.strip()
    if not stripped:
        return 0
    if tiktoken is not None:
        try:
            encoder = tiktoken.get_encoding(_resolve_encoding_name(model_name))
            return len(encoder.encode(stripped, disallowed_special=()))
        except Exception:
            pass
    # Heuristic when no tokenizer is usable: roughly four characters per token,
    # never reporting fewer than one token for non-empty text.
    return max(1, len(stripped) // 4)
def _split_text_by_tokens(text: str, model_name: str, max_tokens_per_chunk: int) -> list[str]:
    """Splits text into token-bounded chunks while preserving full source coverage.

    Uses model-aware tokenization when tiktoken is available; otherwise (or on
    tokenizer failure) degrades to an approximate character split of about
    four characters per token so no source text is ever dropped.

    Args:
        text: Source text; leading/trailing whitespace is ignored.
        model_name: Model identifier used to pick the tokenizer encoding.
        max_tokens_per_chunk: Desired per-chunk budget; floored at
            SUMMARY_MIN_TEXT_CHUNK_TOKENS.

    Returns:
        Ordered list of non-empty chunks covering the whole input.
    """
    normalized = text.strip()
    if not normalized:
        return []
    chunk_tokens = max(SUMMARY_MIN_TEXT_CHUNK_TOKENS, int(max_tokens_per_chunk))

    def _split_by_chars() -> list[str]:
        # Single shared fallback (was duplicated verbatim in two branches):
        # approximate the token budget with ~4 characters per token.
        approx_chars = max(1200, chunk_tokens * 4)
        return [normalized[index : index + approx_chars] for index in range(0, len(normalized), approx_chars)]

    if tiktoken is None:
        return _split_by_chars()
    try:
        encoding = tiktoken.get_encoding(_resolve_encoding_name(model_name))
        token_ids = encoding.encode(normalized, disallowed_special=())
        if not token_ids:
            return []
        chunks: list[str] = []
        for start in range(0, len(token_ids), chunk_tokens):
            part = encoding.decode(token_ids[start : start + chunk_tokens]).strip()
            if part:
                chunks.append(part)
        return chunks
    except Exception:
        # Tokenizer failures must not lose coverage; degrade to character split.
        return _split_by_chars()
def _truncate_text_to_token_budget(text: str, model_name: str, max_tokens: int) -> str:
    """Truncates text to fit within a token budget while retaining original ordering."""
    stripped = text.strip()
    if not stripped:
        return ""
    # The first chunk of a token-bounded split is exactly the leading slice
    # that fits the budget; an empty split result leaves the text untouched.
    pieces = _split_text_by_tokens(
        stripped,
        model_name=model_name,
        max_tokens_per_chunk=max_tokens,
    )
    if not pieces:
        return stripped
    return pieces[0]
def _dedupe_non_empty(values: list[str], limit: int) -> list[str]:
"""Deduplicates and normalizes short text lists while preserving order."""
cleaned: list[str] = []
seen: set[str] = set()
for value in values:
normalized = value.strip()
if not normalized:
continue
lowered = normalized.lower()
if lowered in seen:
continue
seen.add(lowered)
cleaned.append(normalized)
if len(cleaned) >= limit:
break
return cleaned
def _normalize_with_catalog(values: list[str], catalog: list[str], limit: int) -> tuple[list[str], list[str]]:
    """Maps values to case-insensitive catalog entries and returns matched and novel values."""
    if not catalog:
        # No catalog to match against: everything counts as matched.
        return _dedupe_non_empty(values, limit), []
    canonical_by_key = {
        entry.strip().lower(): entry.strip()
        for entry in catalog
        if entry.strip()
    }
    matched: list[str] = []
    novel: list[str] = []
    for raw in values:
        candidate = raw.strip()
        if not candidate:
            continue
        canonical = canonical_by_key.get(candidate.lower())
        if canonical:
            matched.append(canonical)
        else:
            novel.append(candidate)
    return _dedupe_non_empty(matched, limit), _dedupe_non_empty(novel, limit)
def _coerce_json(payload: str) -> dict[str, Any]:
"""Extracts and parses a JSON object from model output text."""
text = payload.strip()
if not text:
return {}
try:
value = json.loads(text)
if isinstance(value, dict):
return value
except json.JSONDecodeError:
pass
fenced = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, flags=re.DOTALL | re.IGNORECASE)
if fenced:
try:
value = json.loads(fenced.group(1))
if isinstance(value, dict):
return value
except json.JSONDecodeError:
pass
first_brace = text.find("{")
last_brace = text.rfind("}")
if first_brace >= 0 and last_brace > first_brace:
candidate = text[first_brace : last_brace + 1]
try:
value = json.loads(candidate)
if isinstance(value, dict):
return value
except json.JSONDecodeError:
return {}
return {}
def _render_neighbors(neighbors: list[SimilarDocument]) -> str:
    """Formats nearest-neighbor examples for classification task prompt input."""
    if not neighbors:
        return "None"
    blocks: list[str] = []
    for position, neighbor in enumerate(neighbors, start=1):
        # Similarity is derived from vector distance and clamped into [0, 1].
        similarity = _clamp_probability(1.0 - neighbor.vector_distance, fallback=0.0)
        tag_text = ", ".join(neighbor.tags) if neighbor.tags else "(none)"
        blocks.append(
            f"Example {position}:\n"
            f"- summary: {neighbor.summary_text[:1200]}\n"
            f"- path: {neighbor.logical_path}\n"
            f"- tags: {tag_text}\n"
            f"- similarity: {similarity:.6f}"
        )
    return "\n\n".join(blocks)
def _list_allowed_paths(session: Session, current_document_id: UUID) -> list[str]:
    """Returns distinct existing logical paths to constrain route predictions."""
    statement = select(Document.logical_path).where(
        Document.id != current_document_id,
        Document.status != DocumentStatus.TRASHED,
    )
    db_values = session.execute(statement).scalars().all()
    candidates = [entry for entry in db_values if isinstance(entry, str)]
    # Administrator-predefined paths are merged in after database values.
    for item in read_predefined_paths_settings():
        configured = str(item.get("value", "")).strip()
        if configured:
            candidates.append(configured)
    return _dedupe_non_empty(candidates, limit=400)
def _list_allowed_tags(session: Session, current_document_id: UUID) -> list[str]:
    """Returns distinct existing tags to constrain tag predictions."""
    statement = select(Document.tags).where(
        Document.id != current_document_id,
        Document.status != DocumentStatus.TRASHED,
    )
    tag_lists = session.execute(statement).scalars().all()
    candidates: list[str] = []
    for tag_list in tag_lists:
        # Defensive: only list-typed tag columns contribute values.
        if isinstance(tag_list, list):
            candidates.extend(str(tag) for tag in tag_list)
    # Administrator-predefined tags are merged in after database values.
    for item in read_predefined_tags_settings():
        configured = str(item.get("value", "")).strip()
        if configured:
            candidates.append(configured)
    return _dedupe_non_empty(candidates, limit=500)
def _fallback_summary(document: Document) -> str:
    """Creates a deterministic local summary when LLM summarization is unavailable."""
    tag_text = ", ".join(document.tags) if document.tags else "(none)"
    parts = [
        f"Filename: {document.original_filename}",
        f"MIME Type: {document.mime_type}",
        f"Extension: {document.extension}",
        f"Logical Path: {document.logical_path}",
        f"Tags: {tag_text}",
    ]
    if document.extracted_text.strip():
        # A bounded excerpt keeps some content signal without unbounded size.
        parts.append("Extracted Content:")
        parts.append(document.extracted_text[:4000])
    return "\n".join(parts).strip()
def _build_summary_user_payload(
    document: Document,
    extracted_text: str,
    source_scope: str | None = None,
) -> str:
    """Builds one summary task payload with metadata and the current source segment.

    The optional source_scope labels which chunk or reduction stage the
    extracted text belongs to in the map-reduce flow.
    """
    tag_text = ", ".join(document.tags) if document.tags else "(none)"
    sections = [
        "Document Metadata:",
        f"- filename: {document.original_filename}",
        f"- mime_type: {document.mime_type}",
        f"- extension: {document.extension}",
        f"- current_logical_path: {document.logical_path}",
        f"- current_tags: {tag_text}",
    ]
    if source_scope:
        sections.append("")
        sections.append(f"Source Scope: {source_scope}")
    sections.append("")
    sections.append("Extracted Text:")
    sections.append(extracted_text if extracted_text.strip() else "(none)")
    return "\n".join(sections)
def _summary_chunk_prompt(base_prompt: str) -> str:
"""Builds a chunk-focused prompt preserving the configured summary behavior."""
return "\n".join(
[
base_prompt.strip(),
"",
"You are processing one chunk from a larger document.",
"Return concise markdown with factual entities, intent, and routing hints from this chunk only.",
"Do not invent facts.",
]
).strip()
def _summary_reduction_prompt(base_prompt: str) -> str:
"""Builds a merge-focused prompt for reducing partial summaries into one result."""
return "\n".join(
[
base_prompt.strip(),
"",
"You are merging partial summaries that belong to one document.",
"Produce one concise markdown summary with core entities, purpose, and routing hints.",
"Deduplicate repeated facts and keep only grounded evidence.",
]
).strip()
def summarize_document(session: Session, document: Document) -> str:
    """Generates an indexing summary using task settings with resilient local fallback.

    Small documents take a single model request; larger ones go through
    map-reduce: per-chunk summaries, grouped reduction rounds, then one final
    merge. Any model-task error degrades to the deterministic local summary.
    """
    summary_runtime = read_task_runtime_settings(TASK_SUMMARY_GENERATION)
    summary_task = summary_runtime["task"]
    summary_provider = summary_runtime["provider"]
    # A disabled task short-circuits straight to the deterministic summary.
    if not bool(summary_task.get("enabled", True)):
        log_processing_event(
            session=session,
            stage="summary",
            event="Summary task disabled, using deterministic fallback",
            level="warning",
            document=document,
            provider_id=str(summary_provider.get("id", "")),
            model_name=str(summary_task.get("model", "")),
        )
        return _fallback_summary(document)
    model_name = str(summary_task.get("model", "")).strip()
    # "max_source_chars" is read as a fallback settings key when
    # "max_input_tokens" is absent — presumably a legacy name; TODO confirm.
    max_input_tokens = _clamp_summary_input_tokens(
        summary_task.get("max_input_tokens", summary_task.get("max_source_chars", 8000)),
        fallback=8000,
    )
    # Per-chunk budget = configured input budget minus the metadata header,
    # minus a 256-token safety margin, floored at the minimum chunk size.
    metadata_only_payload = _build_summary_user_payload(document=document, extracted_text="(none)")
    metadata_tokens = _estimate_token_count(metadata_only_payload, model_name)
    chunk_token_budget = max(
        SUMMARY_MIN_TEXT_CHUNK_TOKENS,
        max_input_tokens - metadata_tokens - 256,
    )
    extracted_text = document.extracted_text.strip()
    extracted_token_count = _estimate_token_count(extracted_text, model_name)
    source_chunks = (
        _split_text_by_tokens(extracted_text, model_name=model_name, max_tokens_per_chunk=chunk_token_budget)
        if extracted_text
        else []
    )
    chunk_count = max(1, len(source_chunks))
    try:
        log_processing_event(
            session=session,
            stage="summary",
            event="Summary request started",
            level="info",
            document=document,
            provider_id=str(summary_provider.get("id", "")),
            model_name=str(summary_task.get("model", "")),
            prompt_text=str(summary_task.get("prompt", "")),
            payload_json={
                "max_input_tokens": max_input_tokens,
                "chunk_token_budget": chunk_token_budget,
                "metadata_tokens_estimate": metadata_tokens,
                "extracted_tokens_estimate": extracted_token_count,
                "chunk_count": chunk_count,
                "chunked": chunk_count > 1,
            },
        )
        summary_prompt = str(summary_task.get("prompt", "")).strip()
        if len(source_chunks) <= 1:
            # Single-request path: the entire source fits one payload.
            payload_text = source_chunks[0] if source_chunks else "(none)"
            user_payload = _build_summary_user_payload(document=document, extracted_text=payload_text)
            log_processing_event(
                session=session,
                stage="summary",
                event="Summary model request started",
                level="info",
                document=document,
                provider_id=str(summary_provider.get("id", "")),
                model_name=str(summary_task.get("model", "")),
                prompt_text=summary_prompt,
                payload_json={"user_payload": user_payload},
            )
            summary = complete_text_task(TASK_SUMMARY_GENERATION, user_payload)
        else:
            # Map step: summarize each chunk independently with a chunk-scoped prompt.
            chunk_prompt = _summary_chunk_prompt(summary_prompt)
            reduction_prompt = _summary_reduction_prompt(summary_prompt)
            partial_summaries: list[str] = []
            for index, chunk_text in enumerate(source_chunks, start=1):
                chunk_payload = _build_summary_user_payload(
                    document=document,
                    extracted_text=chunk_text,
                    source_scope=f"Source chunk {index}/{len(source_chunks)}",
                )
                log_processing_event(
                    session=session,
                    stage="summary",
                    event="Summary chunk request started",
                    level="info",
                    document=document,
                    provider_id=str(summary_provider.get("id", "")),
                    model_name=str(summary_task.get("model", "")),
                    prompt_text=chunk_prompt,
                    payload_json={
                        "chunk_index": index,
                        "chunk_total": len(source_chunks),
                        "chunk_tokens_estimate": _estimate_token_count(chunk_text, model_name),
                        "user_payload": chunk_payload,
                    },
                )
                chunk_summary = complete_text_task(
                    TASK_SUMMARY_GENERATION,
                    chunk_payload,
                    prompt_override=chunk_prompt,
                ).strip()
                # An empty model answer falls back to a bounded raw excerpt so
                # the chunk still contributes to the reduction input.
                chunk_summary = chunk_summary or chunk_text[:2000]
                partial_summaries.append(f"Chunk {index}:\n{chunk_summary}")
                log_processing_event(
                    session=session,
                    stage="summary",
                    event="Summary chunk response received",
                    level="info",
                    document=document,
                    provider_id=str(summary_provider.get("id", "")),
                    model_name=str(summary_task.get("model", "")),
                    response_text=chunk_summary,
                )
            # Reduce step: merge partial summaries in fixed-size groups until a
            # single segment remains or the round cap is reached.
            reduction_round = 0
            reduced_segments = partial_summaries
            while len(reduced_segments) > 1 and reduction_round < SUMMARY_REDUCTION_MAX_ROUNDS:
                reduction_round += 1
                next_segments: list[str] = []
                grouped_segments = [
                    reduced_segments[index : index + SUMMARY_REDUCTION_GROUP_SIZE]
                    for index in range(0, len(reduced_segments), SUMMARY_REDUCTION_GROUP_SIZE)
                ]
                for group_index, group_items in enumerate(grouped_segments, start=1):
                    merged_group = "\n\n".join(group_items)
                    # Keep each merge request within the per-request token budget.
                    reduced_group_input = _truncate_text_to_token_budget(
                        merged_group,
                        model_name=model_name,
                        max_tokens=chunk_token_budget,
                    )
                    group_payload = _build_summary_user_payload(
                        document=document,
                        extracted_text=reduced_group_input,
                        source_scope=(
                            f"Summary reduction round {reduction_round}, "
                            f"group {group_index}/{len(grouped_segments)}"
                        ),
                    )
                    log_processing_event(
                        session=session,
                        stage="summary",
                        event="Summary reduction request started",
                        level="info",
                        document=document,
                        provider_id=str(summary_provider.get("id", "")),
                        model_name=str(summary_task.get("model", "")),
                        prompt_text=reduction_prompt,
                        payload_json={
                            "round": reduction_round,
                            "group_index": group_index,
                            "group_total": len(grouped_segments),
                            "user_payload": group_payload,
                        },
                    )
                    reduced = complete_text_task(
                        TASK_SUMMARY_GENERATION,
                        group_payload,
                        prompt_override=reduction_prompt,
                    ).strip()
                    # Empty merge output degrades to a bounded slice of its input.
                    reduced = reduced or reduced_group_input[:2000]
                    next_segments.append(f"Group {group_index}:\n{reduced}")
                    log_processing_event(
                        session=session,
                        stage="summary",
                        event="Summary reduction response received",
                        level="info",
                        document=document,
                        provider_id=str(summary_provider.get("id", "")),
                        model_name=str(summary_task.get("model", "")),
                        response_text=reduced,
                    )
                reduced_segments = next_segments
            # Final merge over whatever segments remain after the reduce rounds.
            final_source = "\n\n".join(reduced_segments).strip() if reduced_segments else "(none)"
            final_source = _truncate_text_to_token_budget(
                final_source,
                model_name=model_name,
                max_tokens=chunk_token_budget,
            ) or "(none)"
            final_payload = _build_summary_user_payload(
                document=document,
                extracted_text=final_source,
                source_scope="Merged chunk summaries",
            )
            log_processing_event(
                session=session,
                stage="summary",
                event="Summary final merge request started",
                level="info",
                document=document,
                provider_id=str(summary_provider.get("id", "")),
                model_name=str(summary_task.get("model", "")),
                prompt_text=reduction_prompt,
                payload_json={"user_payload": final_payload},
            )
            summary = complete_text_task(
                TASK_SUMMARY_GENERATION,
                final_payload,
                prompt_override=reduction_prompt,
            )
        # An all-whitespace model answer still degrades to the local fallback.
        cleaned_summary = summary.strip() or _fallback_summary(document)
        log_processing_event(
            session=session,
            stage="summary",
            event="Summary response received",
            level="info",
            document=document,
            provider_id=str(summary_provider.get("id", "")),
            model_name=str(summary_task.get("model", "")),
            response_text=cleaned_summary,
        )
        return cleaned_summary
    except (ModelTaskError, ModelTaskDisabledError) as error:
        log_processing_event(
            session=session,
            stage="summary",
            event="Summary request failed, using deterministic fallback",
            level="error",
            document=document,
            provider_id=str(summary_provider.get("id", "")),
            model_name=str(summary_task.get("model", "")),
            response_text=str(error),
        )
        return _fallback_summary(document)
def _build_routing_user_payload(
    document: Document,
    summary_text: str,
    allowed_paths: list[str],
    allowed_tags: list[str],
    neighbors: list[SimilarDocument],
) -> str:
    """Builds structured user payload for LLM-based routing classification."""
    path_catalog = ", ".join(allowed_paths) if allowed_paths else "(none)"
    tag_catalog = ", ".join(allowed_tags) if allowed_tags else "(none)"
    sections = [
        "Target Document:",
        f"- filename: {document.original_filename}",
        f"- mime_type: {document.mime_type}",
        f"- extension: {document.extension}",
        f"- summary: {summary_text}",
        "",
        "Allowed Paths:",
        path_catalog,
        "",
        "Allowed Tags:",
        tag_catalog,
        "",
        "Nearest Labeled Examples:",
        _render_neighbors(neighbors),
    ]
    return "\n".join(sections)
def _build_neighbor_log_context(neighbors: list[SimilarDocument]) -> list[dict[str, Any]]:
    """Builds concise neighbor context used for routing request observability logs."""
    context: list[dict[str, Any]] = []
    for neighbor in neighbors:
        similarity = _clamp_probability(1.0 - neighbor.vector_distance, fallback=0.0)
        context.append(
            {
                "document_id": neighbor.document_id,
                # Bounded excerpt and tag slice keep log entries compact.
                "summary": neighbor.summary_text[:1200],
                "path": neighbor.logical_path,
                "tags": neighbor.tags[:10],
                "similarity": round(similarity, 4),
                "distance": round(neighbor.vector_distance, 6),
            }
        )
    return context
def _fallback_routing_from_neighbors(
    neighbors: list[SimilarDocument],
    allowed_paths: list[str],
    allowed_tags: list[str],
    neighbor_similarity: float,
) -> RoutingDecision:
    """Builds a deterministic fallback routing decision from semantic nearest neighbors."""
    if not neighbors:
        # No evidence at all: return an empty zero-confidence decision.
        return RoutingDecision(
            chosen_path=None,
            chosen_tags=[],
            suggested_new_paths=[],
            suggested_new_tags=[],
            confidence=0.0,
            neighbor_similarity=neighbor_similarity,
        )
    top_neighbor = neighbors[0]
    chosen_path = top_neighbor.logical_path.strip() or None
    # Pool tags from the five closest neighbors as tag candidates.
    tag_pool: list[str] = []
    for neighbor in neighbors[:5]:
        tag_pool.extend(neighbor.tags)
    suggested_new_paths: list[str] = []
    if chosen_path and allowed_paths:
        matched_paths, novel_paths = _normalize_with_catalog([chosen_path], allowed_paths, limit=1)
        chosen_path = matched_paths[0] if matched_paths else None
        suggested_new_paths = novel_paths
    matched_tags, novel_tags = _normalize_with_catalog(tag_pool, allowed_tags, limit=10)
    # Fallback confidence follows neighbor similarity but is capped at 0.72.
    confidence = max(0.0, min(0.72, neighbor_similarity))
    return RoutingDecision(
        chosen_path=chosen_path,
        chosen_tags=matched_tags,
        suggested_new_paths=suggested_new_paths,
        suggested_new_tags=novel_tags,
        confidence=confidence,
        neighbor_similarity=neighbor_similarity,
    )
def classify_document_routing(session: Session, document: Document, summary_text: str) -> RoutingDecision:
    """Classifies path and tags by combining semantic neighbors with LLM structured output.

    Flow: query Typesense for similar documents, prompt the routing model with
    the summary plus allowed path/tag catalogs and neighbor examples, parse
    its JSON answer defensively, apply nearest-neighbor overrides, and backfill
    from neighbors when the model failed or left a signal gap.
    """
    routing_runtime = read_task_runtime_settings(TASK_ROUTING_CLASSIFICATION)
    routing_task = routing_runtime["task"]
    routing_provider = routing_runtime["provider"]
    # A disabled routing task yields an empty, zero-confidence decision.
    if not bool(routing_task.get("enabled", True)):
        log_processing_event(
            session=session,
            stage="routing",
            event="Routing task disabled",
            level="warning",
            document=document,
            provider_id=str(routing_provider.get("id", "")),
            model_name=str(routing_task.get("model", "")),
        )
        return RoutingDecision(
            chosen_path=None,
            chosen_tags=[],
            suggested_new_paths=[],
            suggested_new_tags=[],
            confidence=0.0,
            neighbor_similarity=0.0,
        )
    # Tunables for neighbor retrieval and the nearest-neighbor path override.
    neighbor_count = int(routing_task.get("neighbor_count", 8))
    neighbor_min_similarity = _clamp_probability(
        routing_task.get("neighbor_min_similarity", 0.84),
        fallback=0.84,
    )
    neighbor_path_override_enabled = bool(routing_task.get("neighbor_path_override_enabled", True))
    neighbor_path_override_min_similarity = _clamp_probability(
        routing_task.get("neighbor_path_override_min_similarity", 0.86),
        fallback=0.86,
    )
    neighbor_path_override_min_gap = _clamp_probability(
        routing_task.get("neighbor_path_override_min_gap", 0.04),
        fallback=0.04,
    )
    neighbor_path_override_max_confidence = _clamp_probability(
        routing_task.get("neighbor_path_override_max_confidence", 0.9),
        fallback=0.9,
    )
    log_processing_event(
        session=session,
        stage="routing",
        event="Typesense neighbor query started",
        level="info",
        document=document,
        payload_json={
            "neighbor_count": neighbor_count,
            "neighbor_min_similarity": neighbor_min_similarity,
            "neighbor_mode": "semantic_text",
            "summary_text_for_semantic_neighbors": summary_text,
        },
    )
    all_neighbors = query_similar_documents(
        summary_text=summary_text,
        limit=neighbor_count,
        exclude_document_id=str(document.id),
    )
    # Similarity is (1 - vector_distance) clamped to [0, 1]; only neighbors at
    # or above the configured minimum are shown to the model as examples.
    neighbors = [
        neighbor
        for neighbor in all_neighbors
        if _clamp_probability(1.0 - neighbor.vector_distance, fallback=0.0) >= neighbor_min_similarity
    ]
    neighbors_for_prompt = neighbors
    # Default distance 2.0 maps to similarity 0.0 after clamping (empty lists).
    nearest_any_distance = min([neighbor.vector_distance for neighbor in all_neighbors], default=2.0)
    nearest_strong_distance = min([neighbor.vector_distance for neighbor in neighbors], default=2.0)
    neighbor_similarity_any = _clamp_probability(1.0 - nearest_any_distance, fallback=0.0)
    neighbor_similarity = _clamp_probability(1.0 - nearest_strong_distance, fallback=0.0)
    log_processing_event(
        session=session,
        stage="routing",
        event="Typesense neighbor query completed",
        level="info",
        document=document,
        payload_json={
            "neighbors_found": len(all_neighbors),
            "neighbors_matching_threshold": len(neighbors),
            "neighbor_candidates": _build_neighbor_log_context(all_neighbors),
        },
    )
    allowed_paths = _list_allowed_paths(session, document.id)
    allowed_tags = _list_allowed_tags(session, document.id)
    user_payload = _build_routing_user_payload(
        document=document,
        summary_text=summary_text,
        allowed_paths=allowed_paths,
        allowed_tags=allowed_tags,
        neighbors=neighbors_for_prompt,
    )
    llm_failed = False
    try:
        log_processing_event(
            session=session,
            stage="routing",
            event="Routing request started",
            level="info",
            document=document,
            provider_id=str(routing_provider.get("id", "")),
            model_name=str(routing_task.get("model", "")),
            prompt_text=str(routing_task.get("prompt", "")),
            payload_json={
                "neighbor_count": neighbor_count,
                "neighbor_min_similarity": neighbor_min_similarity,
                "available_paths": len(allowed_paths),
                "available_tags": len(allowed_tags),
                "neighbors_found": len(all_neighbors),
                "neighbors_used": len(neighbors_for_prompt),
                "nearest_similarity_any": neighbor_similarity_any,
                "nearest_similarity_strong": neighbor_similarity,
                "allowed_paths": allowed_paths,
                "allowed_tags": allowed_tags,
                "neighbor_examples": _build_neighbor_log_context(neighbors_for_prompt),
                "user_payload": user_payload,
            },
        )
        raw_output = complete_text_task(TASK_ROUTING_CLASSIFICATION, user_payload)
        log_processing_event(
            session=session,
            stage="routing",
            event="Routing response received",
            level="info",
            document=document,
            provider_id=str(routing_provider.get("id", "")),
            model_name=str(routing_task.get("model", "")),
            response_text=raw_output,
        )
    except (ModelTaskError, ModelTaskDisabledError) as error:
        # Model failure is survivable: parse "{}" and rely on neighbor fallback.
        raw_output = "{}"
        llm_failed = True
        log_processing_event(
            session=session,
            stage="routing",
            event="Routing model request failed, continuing with fallback",
            level="error",
            document=document,
            provider_id=str(routing_provider.get("id", "")),
            model_name=str(routing_task.get("model", "")),
            response_text=str(error),
        )
    # Defensive parsing: every model-produced field is type-checked and normalized.
    parsed = _coerce_json(raw_output)
    chosen_path = parsed.get("chosen_path")
    if not isinstance(chosen_path, str):
        chosen_path = None
    elif chosen_path.strip():
        chosen_path = chosen_path.strip()
    else:
        chosen_path = None
    raw_tags = parsed.get("chosen_tags", [])
    chosen_tags = [str(tag) for tag in raw_tags] if isinstance(raw_tags, list) else []
    chosen_tags = _dedupe_non_empty(chosen_tags, limit=30)
    raw_suggested_paths = parsed.get("suggested_new_paths", [])
    suggested_new_paths = [str(path) for path in raw_suggested_paths] if isinstance(raw_suggested_paths, list) else []
    suggested_new_paths = _dedupe_non_empty(suggested_new_paths, limit=10)
    raw_suggested_tags = parsed.get("suggested_new_tags", [])
    suggested_new_tags = [str(tag) for tag in raw_suggested_tags] if isinstance(raw_suggested_tags, list) else []
    suggested_new_tags = _dedupe_non_empty(suggested_new_tags, limit=20)
    confidence = _clamp_probability(parsed.get("confidence", 0.0), fallback=0.0)
    # Snap the chosen path onto the catalog; an unknown path becomes a suggestion.
    if chosen_path and allowed_paths:
        matched_paths, novel_paths = _normalize_with_catalog([chosen_path], allowed_paths, limit=1)
        if matched_paths:
            chosen_path = matched_paths[0]
        else:
            chosen_path = None
            suggested_new_paths = _dedupe_non_empty([*suggested_new_paths, *novel_paths], limit=10)
    matched_tags, novel_tags = _normalize_with_catalog(chosen_tags, allowed_tags, limit=30)
    chosen_tags = matched_tags
    suggested_new_tags = _dedupe_non_empty([*suggested_new_tags, *novel_tags], limit=20)
    if neighbors:
        # Nearest-neighbor override: when one neighbor is clearly dominant
        # (high similarity, clear gap to the runner-up) and the model is not
        # highly confident, prefer the neighbor's path over the model's.
        best_neighbor = neighbors[0]
        best_neighbor_path = best_neighbor.logical_path.strip()
        matched_best_paths, _ = _normalize_with_catalog([best_neighbor_path], allowed_paths, limit=1)
        best_neighbor_path = matched_best_paths[0] if matched_best_paths else best_neighbor_path
        best_neighbor_similarity = _clamp_probability(1.0 - best_neighbor.vector_distance, fallback=0.0)
        second_neighbor_similarity = _clamp_probability(1.0 - neighbors[1].vector_distance, fallback=0.0) if len(neighbors) > 1 else 0.0
        similarity_gap = best_neighbor_similarity - second_neighbor_similarity
        override_similarity_min = max(neighbor_min_similarity, neighbor_path_override_min_similarity)
        should_override_path = (
            neighbor_path_override_enabled
            and
            chosen_path is not None
            and best_neighbor_path.strip() != ""
            and chosen_path.strip().lower() != best_neighbor_path.strip().lower()
            and confidence < neighbor_path_override_max_confidence
            and best_neighbor_similarity >= override_similarity_min
            and similarity_gap >= neighbor_path_override_min_gap
        )
        if should_override_path:
            # Keep the model's path as a suggestion; adopt the neighbor's path
            # and raise confidence toward the neighbor similarity (capped 0.86).
            overridden_path = chosen_path
            suggested_new_paths = _dedupe_non_empty([*suggested_new_paths, chosen_path], limit=10)
            chosen_path = best_neighbor_path
            confidence = max(confidence, min(0.86, best_neighbor_similarity))
            log_processing_event(
                session=session,
                stage="routing",
                event="Chosen path overridden by dominant nearest neighbor",
                level="warning",
                document=document,
                provider_id=str(routing_provider.get("id", "")),
                model_name=str(routing_task.get("model", "")),
                payload_json={
                    "overridden_path": overridden_path,
                    "best_neighbor_path": best_neighbor_path,
                    "best_neighbor_similarity": best_neighbor_similarity,
                    "second_neighbor_similarity": second_neighbor_similarity,
                    "similarity_gap": similarity_gap,
                    "llm_confidence": _clamp_probability(parsed.get("confidence", 0.0), fallback=0.0),
                },
            )
        # If the model produced no path, float the dominant neighbor's path to
        # the front of the suggestions (unless it is already first).
        has_best_neighbor_suggested_path = any(
            path.strip().lower() == best_neighbor_path.strip().lower()
            for path in suggested_new_paths
        )
        first_suggested_path = suggested_new_paths[0].strip().lower() if suggested_new_paths else ""
        should_prioritize_neighbor_suggestion = (
            neighbor_path_override_enabled
            and
            chosen_path is None
            and best_neighbor_path.strip() != ""
            and confidence < neighbor_path_override_max_confidence
            and best_neighbor_similarity >= override_similarity_min
            and similarity_gap >= neighbor_path_override_min_gap
            and (
                not has_best_neighbor_suggested_path
                or first_suggested_path != best_neighbor_path.strip().lower()
            )
        )
        if should_prioritize_neighbor_suggestion:
            suggested_new_paths = _dedupe_non_empty([best_neighbor_path, *suggested_new_paths], limit=10)
            log_processing_event(
                session=session,
                stage="routing",
                event="Suggested path prioritized from dominant nearest neighbor",
                level="warning",
                document=document,
                provider_id=str(routing_provider.get("id", "")),
                model_name=str(routing_task.get("model", "")),
                payload_json={
                    "best_neighbor_path": best_neighbor_path,
                    "best_neighbor_similarity": best_neighbor_similarity,
                    "second_neighbor_similarity": second_neighbor_similarity,
                    "similarity_gap": similarity_gap,
                    "llm_confidence": _clamp_probability(parsed.get("confidence", 0.0), fallback=0.0),
                    "prioritized_suggested_new_paths": suggested_new_paths,
                },
            )
    if len(neighbors) == 0 and len(all_neighbors) > 0:
        log_processing_event(
            session=session,
            stage="routing",
            event="No neighbors met minimum similarity; routing proceeded without neighbor examples",
            level="warning",
            document=document,
            provider_id=str(routing_provider.get("id", "")),
            model_name=str(routing_task.get("model", "")),
            payload_json={
                "neighbor_min_similarity": neighbor_min_similarity,
                "neighbors_found": len(all_neighbors),
            },
        )
    # Backfill from neighbors when the model failed or left a signal gap.
    # Note: "not llm_has_any_signal" is implied by the last two disjuncts but
    # is kept explicit for readability of the failure condition.
    llm_has_path_signal = chosen_path is not None or len(suggested_new_paths) > 0
    llm_has_tag_signal = len(chosen_tags) > 0 or len(suggested_new_tags) > 0
    llm_has_any_signal = llm_has_path_signal or llm_has_tag_signal
    if llm_failed or not llm_has_any_signal or not llm_has_path_signal or not llm_has_tag_signal:
        fallback = _fallback_routing_from_neighbors(
            neighbors=neighbors,
            allowed_paths=allowed_paths,
            allowed_tags=allowed_tags,
            neighbor_similarity=neighbor_similarity,
        )
        if fallback.chosen_path or fallback.chosen_tags or fallback.suggested_new_paths or fallback.suggested_new_tags:
            if not llm_has_path_signal and fallback.chosen_path:
                chosen_path = fallback.chosen_path
            if not llm_has_path_signal and fallback.suggested_new_paths:
                suggested_new_paths = _dedupe_non_empty(
                    [*suggested_new_paths, *fallback.suggested_new_paths],
                    limit=10,
                )
            if not llm_has_tag_signal:
                # Fallback tags are adopted as chosen tags only when the model
                # gave nothing at all; otherwise they stay as suggestions.
                if llm_failed or not llm_has_any_signal:
                    chosen_tags = _dedupe_non_empty([*chosen_tags, *fallback.chosen_tags], limit=30)
                else:
                    suggested_new_tags = _dedupe_non_empty(
                        [*suggested_new_tags, *fallback.chosen_tags],
                        limit=20,
                    )
            suggested_new_tags = _dedupe_non_empty(
                [*suggested_new_tags, *fallback.suggested_new_tags],
                limit=20,
            )
            confidence = max(confidence, fallback.confidence)
    decision = RoutingDecision(
        chosen_path=chosen_path,
        chosen_tags=chosen_tags,
        suggested_new_paths=suggested_new_paths,
        suggested_new_tags=suggested_new_tags,
        confidence=confidence,
        neighbor_similarity=neighbor_similarity,
    )
    log_processing_event(
        session=session,
        stage="routing",
        event="Routing decision prepared",
        level="info",
        document=document,
        provider_id=str(routing_provider.get("id", "")),
        model_name=str(routing_task.get("model", "")),
        payload_json={
            "chosen_path": decision.chosen_path,
            "chosen_tags": decision.chosen_tags,
            "suggested_new_paths": decision.suggested_new_paths,
            "suggested_new_tags": decision.suggested_new_tags,
            "confidence": decision.confidence,
            "neighbor_similarity": decision.neighbor_similarity,
            "nearest_similarity_any": neighbor_similarity_any,
        },
    )
    return decision
def apply_routing_decision(document: Document, decision: RoutingDecision, session: Session | None = None) -> None:
    """Applies auto-routing when thresholds are met and stores suggestion metadata otherwise.

    Mutates the document in place: sets logical path/tags when the auto-apply
    checks pass, and always records suggested path/tags plus a full "routing"
    audit record in document metadata.
    """
    routing_task = read_task_runtime_settings(TASK_ROUTING_CLASSIFICATION)["task"]
    confidence_threshold = _clamp_probability(routing_task.get("auto_apply_confidence_threshold", 0.78), 0.78)
    # Recorded for observability only — the neighbor-similarity gate is not
    # enforced here (see "neighbor_similarity_gate_enabled": False below).
    neighbor_threshold = _clamp_probability(
        routing_task.get("auto_apply_neighbor_similarity_threshold", 0.55),
        0.55,
    )
    # Handwriting documents get a per-style path prefix so same-writer files
    # are grouped together.
    style_cluster_id = (
        document.handwriting_style_id
        if document.image_text_type == "handwriting" and document.handwriting_style_id
        else None
    )
    style_path_prefix = style_cluster_id
    if style_cluster_id and session is not None:
        style_path_prefix = resolve_handwriting_style_path_prefix(
            session=session,
            style_cluster_id=style_cluster_id,
            exclude_document_id=str(document.id),
        )
    chosen_path = apply_handwriting_style_path(style_cluster_id=style_path_prefix, path_value=decision.chosen_path)
    # Style-prefix every suggested path too, dropping any that come back empty.
    suggested_new_paths = _dedupe_non_empty(
        [
            styled_path
            for styled_path in [
                apply_handwriting_style_path(style_cluster_id=style_path_prefix, path_value=path)
                for path in decision.suggested_new_paths
            ]
            if styled_path
        ],
        limit=10,
    )
    has_chosen_path = bool(chosen_path)
    confidence_passed = decision.confidence >= confidence_threshold
    auto_apply_blocked_reasons: list[str] = []
    if not has_chosen_path:
        auto_apply_blocked_reasons.append("missing_chosen_path")
    if not confidence_passed:
        auto_apply_blocked_reasons.append("confidence_below_threshold")
    auto_applied_path = False
    auto_applied_tags = False
    auto_applied = False
    if has_chosen_path and confidence_passed:
        document.logical_path = chosen_path
        auto_applied_path = True
        # Tags ride along with an auto-applied path, including novel suggestions.
        routed_tags_for_apply = _dedupe_non_empty(
            [*decision.chosen_tags, *decision.suggested_new_tags],
            limit=50,
        )
        if routed_tags_for_apply:
            document.tags = routed_tags_for_apply
            auto_applied_tags = True
        auto_applied = True
    # Persist the full decision context for audit/debugging in metadata.
    metadata_json = dict(document.metadata_json)
    metadata_json["routing"] = {
        "auto_applied": auto_applied,
        "auto_applied_path": auto_applied_path,
        "auto_applied_tags": auto_applied_tags,
        "confidence": decision.confidence,
        "neighbor_similarity": decision.neighbor_similarity,
        "auto_apply_confidence_threshold": confidence_threshold,
        "auto_apply_neighbor_similarity_threshold": neighbor_threshold,
        "auto_apply_checks": {
            "has_chosen_path": has_chosen_path,
            "confidence_passed": confidence_passed,
            "neighbor_similarity_observed": decision.neighbor_similarity,
            "neighbor_similarity_gate_enabled": False,
        },
        "auto_apply_blocked_reasons": auto_apply_blocked_reasons,
        "chosen_path": chosen_path,
        "base_chosen_path": decision.chosen_path,
        "chosen_tags": decision.chosen_tags,
        "suggested_new_paths": suggested_new_paths,
        "base_suggested_new_paths": decision.suggested_new_paths,
        "suggested_new_tags": decision.suggested_new_tags,
        "handwriting_style_id": style_cluster_id,
        "handwriting_style_path_prefix": style_path_prefix,
    }
    document.metadata_json = metadata_json
    unresolved_path = suggested_new_paths[0] if suggested_new_paths else None
    unresolved_tags = _dedupe_non_empty(decision.suggested_new_tags, limit=50)
    if not auto_applied:
        # Nothing applied: surface the best available path and tags as suggestions.
        document.suggested_path = chosen_path or unresolved_path
        document.suggested_tags = _dedupe_non_empty(
            [*decision.chosen_tags, *unresolved_tags],
            limit=50,
        )
    else:
        # Applied: leave only genuinely unresolved suggestions behind.
        document.suggested_path = unresolved_path
        document.suggested_tags = [] if auto_applied_tags else unresolved_tags
def upsert_semantic_index(document: Document, summary_text: str) -> None:
    """Writes the document's semantic search representation into Typesense."""
    # Thin delegation kept as the pipeline's public indexing entry point.
    upsert_document_index(summary_text=summary_text, document=document)