1130 lines
44 KiB
Python
1130 lines
44 KiB
Python
"""Document summarization, semantic-neighbor lookup, and LLM routing decisions."""
|
|
|
|
import json
|
|
import re
|
|
from dataclasses import dataclass
|
|
from functools import lru_cache
|
|
from typing import Any
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.document import Document, DocumentStatus
|
|
from app.services.app_settings import (
|
|
TASK_ROUTING_CLASSIFICATION,
|
|
TASK_SUMMARY_GENERATION,
|
|
read_predefined_paths_settings,
|
|
read_predefined_tags_settings,
|
|
read_task_runtime_settings,
|
|
)
|
|
from app.services.handwriting_style import apply_handwriting_style_path, resolve_handwriting_style_path_prefix
|
|
from app.services.model_runtime import ModelTaskDisabledError, ModelTaskError, complete_text_task
|
|
from app.services.processing_logs import log_processing_event
|
|
from app.services.typesense_index import SimilarDocument, query_similar_documents, upsert_document_index
|
|
|
|
try:
|
|
import tiktoken
|
|
except ImportError: # pragma: no cover
|
|
tiktoken = None
|
|
|
|
|
|
# Token bounds applied to configured summary-input limits: values outside
# [SUMMARY_MIN_INPUT_TOKENS, SUMMARY_MAX_INPUT_TOKENS] are clamped.
SUMMARY_MIN_INPUT_TOKENS = 512
SUMMARY_MAX_INPUT_TOKENS = 64000
# Smallest chunk size (in tokens) used when splitting extracted text for
# map-reduce summarization.
SUMMARY_MIN_TEXT_CHUNK_TOKENS = 256
# Bounds for the reduce phase: at most this many merge rounds, combining at
# most this many partial summaries per group in each round.
SUMMARY_REDUCTION_MAX_ROUNDS = 4
SUMMARY_REDUCTION_GROUP_SIZE = 8
|
|
|
|
|
|
@dataclass
class RoutingDecision:
    """Represents LLM-produced routing output enriched with neighbor similarity signals."""

    # Catalog path selected for the document, or None when nothing matched.
    chosen_path: str | None
    # Tags that matched the existing tag catalog.
    chosen_tags: list[str]
    # Path values proposed by the model/fallback that are not in the catalog yet.
    suggested_new_paths: list[str]
    # Tag values proposed by the model/fallback that are not in the catalog yet.
    suggested_new_tags: list[str]
    # Decision confidence in [0, 1] (model-reported or fallback-derived).
    confidence: float
    # Similarity of the nearest threshold-passing neighbor, in [0, 1].
    neighbor_similarity: float
|
|
|
|
|
|
def _clamp_probability(value: Any, fallback: float = 0.0) -> float:
|
|
"""Clamps a numeric value into [0, 1] for confidence-like fields."""
|
|
|
|
try:
|
|
parsed = float(value)
|
|
except (TypeError, ValueError):
|
|
return fallback
|
|
return max(0.0, min(1.0, parsed))
|
|
|
|
|
|
def _clamp_summary_input_tokens(value: Any, fallback: int = 8000) -> int:
    """Coerces *value* to an int bounded to the supported summary-input range.

    Non-numeric input yields *fallback*; numeric input is clamped into
    [SUMMARY_MIN_INPUT_TOKENS, SUMMARY_MAX_INPUT_TOKENS].
    """

    try:
        tokens = int(value)
    except (TypeError, ValueError):
        return fallback
    upper_bounded = min(SUMMARY_MAX_INPUT_TOKENS, tokens)
    return max(SUMMARY_MIN_INPUT_TOKENS, upper_bounded)
|
|
|
|
|
|
@lru_cache(maxsize=128)
def _resolve_encoding_name(model_name: str) -> str:
    """Resolves the tokenizer encoding name for a model via tiktoken.

    Returns "" when tiktoken is not installed; unknown or blank model names
    fall back to the generic "cl100k_base" encoding.
    """

    if tiktoken is None:
        return ""

    trimmed = model_name.strip()
    if not trimmed:
        return "cl100k_base"
    try:
        encoding = tiktoken.encoding_for_model(trimmed)
    except Exception:
        # tiktoken raises for models it does not know about.
        return "cl100k_base"
    return encoding.name
|
|
|
|
|
|
def _estimate_token_count(text: str, model_name: str) -> int:
    """Estimates how many tokens *text* consumes for *model_name*.

    Uses tiktoken when available; without it (or on any tokenizer error)
    falls back to the rough heuristic of one token per four characters.
    """

    stripped = text.strip()
    if not stripped:
        return 0

    heuristic_count = max(1, len(stripped) // 4)
    if tiktoken is None:
        return heuristic_count

    try:
        encoder = tiktoken.get_encoding(_resolve_encoding_name(model_name))
        return len(encoder.encode(stripped, disallowed_special=()))
    except Exception:
        return heuristic_count
|
|
|
|
|
|
def _split_text_by_tokens(text: str, model_name: str, max_tokens_per_chunk: int) -> list[str]:
    """Splits text into token-bounded chunks while preserving full source coverage."""

    normalized = text.strip()
    if not normalized:
        return []

    # Never chunk below the minimum useful size, regardless of caller input.
    chunk_tokens = max(SUMMARY_MIN_TEXT_CHUNK_TOKENS, int(max_tokens_per_chunk))
    if tiktoken is None:
        # No tokenizer available: approximate at 4 characters per token.
        approx_chars = max(1200, chunk_tokens * 4)
        return [normalized[index : index + approx_chars] for index in range(0, len(normalized), approx_chars)]

    try:
        encoding = tiktoken.get_encoding(_resolve_encoding_name(model_name))
        token_ids = encoding.encode(normalized, disallowed_special=())
        if not token_ids:
            return []
        chunks: list[str] = []
        for start in range(0, len(token_ids), chunk_tokens):
            # Decode each token window back to text; windows that decode to
            # whitespace only are dropped.
            part = encoding.decode(token_ids[start : start + chunk_tokens]).strip()
            if part:
                chunks.append(part)
        return chunks
    except Exception:
        # Tokenizer failure: same character-based approximation as above.
        approx_chars = max(1200, chunk_tokens * 4)
        return [normalized[index : index + approx_chars] for index in range(0, len(normalized), approx_chars)]
|
|
|
|
|
|
def _truncate_text_to_token_budget(text: str, model_name: str, max_tokens: int) -> str:
    """Returns the leading portion of *text* that fits within *max_tokens*.

    Delegates chunking to _split_text_by_tokens and keeps only the first
    chunk; empty input yields "".
    """

    stripped = text.strip()
    if not stripped:
        return ""

    pieces = _split_text_by_tokens(
        stripped,
        model_name=model_name,
        max_tokens_per_chunk=max_tokens,
    )
    if pieces:
        return pieces[0]
    # Defensive: splitting non-empty text should yield chunks, but keep the
    # original text rather than losing it.
    return stripped
|
|
|
|
|
|
def _dedupe_non_empty(values: list[str], limit: int) -> list[str]:
|
|
"""Deduplicates and normalizes short text lists while preserving order."""
|
|
|
|
cleaned: list[str] = []
|
|
seen: set[str] = set()
|
|
for value in values:
|
|
normalized = value.strip()
|
|
if not normalized:
|
|
continue
|
|
lowered = normalized.lower()
|
|
if lowered in seen:
|
|
continue
|
|
seen.add(lowered)
|
|
cleaned.append(normalized)
|
|
if len(cleaned) >= limit:
|
|
break
|
|
return cleaned
|
|
|
|
|
|
def _normalize_with_catalog(values: list[str], catalog: list[str], limit: int) -> tuple[list[str], list[str]]:
    """Splits *values* into catalog-matched canonical entries and novel entries.

    Matching is case-insensitive; matched values are replaced by the
    catalog's canonical casing. An empty catalog treats everything as
    matched and returns no novel values.
    """

    if not catalog:
        return _dedupe_non_empty(values, limit), []

    canonical_by_key = {
        entry.strip().lower(): entry.strip()
        for entry in catalog
        if entry.strip()
    }
    matched_values: list[str] = []
    novel_values: list[str] = []
    for raw in values:
        candidate = raw.strip()
        if not candidate:
            continue
        canonical = canonical_by_key.get(candidate.lower())
        if canonical:
            matched_values.append(canonical)
        else:
            novel_values.append(candidate)
    return _dedupe_non_empty(matched_values, limit), _dedupe_non_empty(novel_values, limit)
|
|
|
|
|
|
def _coerce_json(payload: str) -> dict[str, Any]:
|
|
"""Extracts and parses a JSON object from model output text."""
|
|
|
|
text = payload.strip()
|
|
if not text:
|
|
return {}
|
|
|
|
try:
|
|
value = json.loads(text)
|
|
if isinstance(value, dict):
|
|
return value
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
fenced = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, flags=re.DOTALL | re.IGNORECASE)
|
|
if fenced:
|
|
try:
|
|
value = json.loads(fenced.group(1))
|
|
if isinstance(value, dict):
|
|
return value
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
first_brace = text.find("{")
|
|
last_brace = text.rfind("}")
|
|
if first_brace >= 0 and last_brace > first_brace:
|
|
candidate = text[first_brace : last_brace + 1]
|
|
try:
|
|
value = json.loads(candidate)
|
|
if isinstance(value, dict):
|
|
return value
|
|
except json.JSONDecodeError:
|
|
return {}
|
|
|
|
return {}
|
|
|
|
|
|
def _render_neighbors(neighbors: list[SimilarDocument]) -> str:
    """Renders nearest-neighbor examples as numbered, prompt-ready text blocks.

    Returns the literal string "None" when there are no neighbors.
    """

    if not neighbors:
        return "None"

    blocks: list[str] = []
    for position, item in enumerate(neighbors, start=1):
        closeness = _clamp_probability(1.0 - item.vector_distance, fallback=0.0)
        tag_text = ", ".join(item.tags) if item.tags else "(none)"
        example_lines = [
            f"Example {position}:",
            f"- summary: {item.summary_text[:1200]}",
            f"- path: {item.logical_path}",
            f"- tags: {tag_text}",
            f"- similarity: {closeness:.6f}",
        ]
        blocks.append("\n".join(example_lines))
    return "\n\n".join(blocks)
|
|
|
|
|
|
def _list_allowed_paths(session: Session, current_document_id: UUID) -> list[str]:
    """Returns distinct existing logical paths to constrain route predictions."""

    # Logical paths of every other non-trashed document.
    rows = session.execute(
        select(Document.logical_path).where(
            Document.id != current_document_id,
            Document.status != DocumentStatus.TRASHED,
        )
    ).scalars().all()
    values = [row for row in rows if isinstance(row, str)]
    # Merge in admin-configured predefined paths from app settings.
    values.extend(
        str(item.get("value", "")).strip()
        for item in read_predefined_paths_settings()
        if str(item.get("value", "")).strip()
    )
    # Cap the catalog size to keep prompt payloads bounded.
    return _dedupe_non_empty(values, limit=400)
|
|
|
|
|
|
def _list_allowed_tags(session: Session, current_document_id: UUID) -> list[str]:
    """Returns distinct existing tags to constrain tag predictions."""

    # Tag lists of every other non-trashed document.
    rows = session.execute(
        select(Document.tags).where(
            Document.id != current_document_id,
            Document.status != DocumentStatus.TRASHED,
        )
    ).scalars().all()

    values: list[str] = []
    for tag_list in rows:
        # Tags are stored as lists per document; skip malformed rows.
        if isinstance(tag_list, list):
            values.extend([str(tag) for tag in tag_list])
    # Merge in admin-configured predefined tags from app settings.
    values.extend(
        str(item.get("value", "")).strip()
        for item in read_predefined_tags_settings()
        if str(item.get("value", "")).strip()
    )
    # Cap the catalog size to keep prompt payloads bounded.
    return _dedupe_non_empty(values, limit=500)
|
|
|
|
|
|
def _fallback_summary(document: Document) -> str:
    """Creates a deterministic local summary when LLM summarization is unavailable.

    Lists document metadata and, when present, the first 4000 characters of
    the extracted text.
    """

    tag_text = ", ".join(document.tags) if document.tags else "(none)"
    parts = [
        f"Filename: {document.original_filename}",
        f"MIME Type: {document.mime_type}",
        f"Extension: {document.extension}",
        f"Logical Path: {document.logical_path}",
        f"Tags: {tag_text}",
    ]
    if document.extracted_text.strip():
        parts.append("Extracted Content:")
        parts.append(document.extracted_text[:4000])
    return "\n".join(parts).strip()
|
|
|
|
|
|
def _build_summary_user_payload(
    document: Document,
    extracted_text: str,
    source_scope: str | None = None,
) -> str:
    """Builds one summary task payload with metadata and the current source segment.

    *source_scope*, when given, labels which chunk or reduction round the
    segment belongs to.
    """

    tag_text = ", ".join(document.tags) if document.tags else "(none)"
    payload_lines = [
        "Document Metadata:",
        f"- filename: {document.original_filename}",
        f"- mime_type: {document.mime_type}",
        f"- extension: {document.extension}",
        f"- current_logical_path: {document.logical_path}",
        f"- current_tags: {tag_text}",
    ]
    if source_scope:
        payload_lines.append("")
        payload_lines.append(f"Source Scope: {source_scope}")
    payload_lines.append("")
    payload_lines.append("Extracted Text:")
    payload_lines.append(extracted_text if extracted_text.strip() else "(none)")
    return "\n".join(payload_lines)
|
|
|
|
|
|
def _summary_chunk_prompt(base_prompt: str) -> str:
|
|
"""Builds a chunk-focused prompt preserving the configured summary behavior."""
|
|
|
|
return "\n".join(
|
|
[
|
|
base_prompt.strip(),
|
|
"",
|
|
"You are processing one chunk from a larger document.",
|
|
"Return concise markdown with factual entities, intent, and routing hints from this chunk only.",
|
|
"Do not invent facts.",
|
|
]
|
|
).strip()
|
|
|
|
|
|
def _summary_reduction_prompt(base_prompt: str) -> str:
|
|
"""Builds a merge-focused prompt for reducing partial summaries into one result."""
|
|
|
|
return "\n".join(
|
|
[
|
|
base_prompt.strip(),
|
|
"",
|
|
"You are merging partial summaries that belong to one document.",
|
|
"Produce one concise markdown summary with core entities, purpose, and routing hints.",
|
|
"Deduplicate repeated facts and keep only grounded evidence.",
|
|
]
|
|
).strip()
|
|
|
|
|
|
def summarize_document(session: Session, document: Document) -> str:
    """Generates an indexing summary using task settings with resilient local fallback.

    Small documents are summarized in one model call; larger ones go through
    a map-reduce pipeline: per-chunk summaries, grouped reduction rounds,
    then a final merge call. Any model failure (or a disabled task) degrades
    to a deterministic metadata-based summary.
    """

    summary_runtime = read_task_runtime_settings(TASK_SUMMARY_GENERATION)
    summary_task = summary_runtime["task"]
    summary_provider = summary_runtime["provider"]
    # Task disabled in settings: skip the model entirely.
    if not bool(summary_task.get("enabled", True)):
        log_processing_event(
            session=session,
            stage="summary",
            event="Summary task disabled, using deterministic fallback",
            level="warning",
            document=document,
            provider_id=str(summary_provider.get("id", "")),
            model_name=str(summary_task.get("model", "")),
        )
        return _fallback_summary(document)

    model_name = str(summary_task.get("model", "")).strip()
    # "max_source_chars" is accepted as a legacy fallback key for the limit.
    max_input_tokens = _clamp_summary_input_tokens(
        summary_task.get("max_input_tokens", summary_task.get("max_source_chars", 8000)),
        fallback=8000,
    )
    # Reserve room for the metadata header (plus a 256-token margin) so each
    # per-chunk payload stays within the configured input budget.
    metadata_only_payload = _build_summary_user_payload(document=document, extracted_text="(none)")
    metadata_tokens = _estimate_token_count(metadata_only_payload, model_name)
    chunk_token_budget = max(
        SUMMARY_MIN_TEXT_CHUNK_TOKENS,
        max_input_tokens - metadata_tokens - 256,
    )
    extracted_text = document.extracted_text.strip()
    extracted_token_count = _estimate_token_count(extracted_text, model_name)
    source_chunks = (
        _split_text_by_tokens(extracted_text, model_name=model_name, max_tokens_per_chunk=chunk_token_budget)
        if extracted_text
        else []
    )
    chunk_count = max(1, len(source_chunks))

    try:
        log_processing_event(
            session=session,
            stage="summary",
            event="Summary request started",
            level="info",
            document=document,
            provider_id=str(summary_provider.get("id", "")),
            model_name=str(summary_task.get("model", "")),
            prompt_text=str(summary_task.get("prompt", "")),
            payload_json={
                "max_input_tokens": max_input_tokens,
                "chunk_token_budget": chunk_token_budget,
                "metadata_tokens_estimate": metadata_tokens,
                "extracted_tokens_estimate": extracted_token_count,
                "chunk_count": chunk_count,
                "chunked": chunk_count > 1,
            },
        )
        summary_prompt = str(summary_task.get("prompt", "")).strip()
        if len(source_chunks) <= 1:
            # Single-call path: the whole document fits one payload.
            payload_text = source_chunks[0] if source_chunks else "(none)"
            user_payload = _build_summary_user_payload(document=document, extracted_text=payload_text)
            log_processing_event(
                session=session,
                stage="summary",
                event="Summary model request started",
                level="info",
                document=document,
                provider_id=str(summary_provider.get("id", "")),
                model_name=str(summary_task.get("model", "")),
                prompt_text=summary_prompt,
                payload_json={"user_payload": user_payload},
            )
            summary = complete_text_task(TASK_SUMMARY_GENERATION, user_payload)
        else:
            # Map phase: summarize each chunk independently with a
            # chunk-focused prompt variant.
            chunk_prompt = _summary_chunk_prompt(summary_prompt)
            reduction_prompt = _summary_reduction_prompt(summary_prompt)
            partial_summaries: list[str] = []

            for index, chunk_text in enumerate(source_chunks, start=1):
                chunk_payload = _build_summary_user_payload(
                    document=document,
                    extracted_text=chunk_text,
                    source_scope=f"Source chunk {index}/{len(source_chunks)}",
                )
                log_processing_event(
                    session=session,
                    stage="summary",
                    event="Summary chunk request started",
                    level="info",
                    document=document,
                    provider_id=str(summary_provider.get("id", "")),
                    model_name=str(summary_task.get("model", "")),
                    prompt_text=chunk_prompt,
                    payload_json={
                        "chunk_index": index,
                        "chunk_total": len(source_chunks),
                        "chunk_tokens_estimate": _estimate_token_count(chunk_text, model_name),
                        "user_payload": chunk_payload,
                    },
                )
                chunk_summary = complete_text_task(
                    TASK_SUMMARY_GENERATION,
                    chunk_payload,
                    prompt_override=chunk_prompt,
                ).strip()
                # An empty model answer degrades to the chunk's own text.
                chunk_summary = chunk_summary or chunk_text[:2000]
                partial_summaries.append(f"Chunk {index}:\n{chunk_summary}")
                log_processing_event(
                    session=session,
                    stage="summary",
                    event="Summary chunk response received",
                    level="info",
                    document=document,
                    provider_id=str(summary_provider.get("id", "")),
                    model_name=str(summary_task.get("model", "")),
                    response_text=chunk_summary,
                )

            # Reduce phase: repeatedly merge partial summaries in groups of
            # SUMMARY_REDUCTION_GROUP_SIZE, bounded by
            # SUMMARY_REDUCTION_MAX_ROUNDS rounds.
            reduction_round = 0
            reduced_segments = partial_summaries
            while len(reduced_segments) > 1 and reduction_round < SUMMARY_REDUCTION_MAX_ROUNDS:
                reduction_round += 1
                next_segments: list[str] = []
                grouped_segments = [
                    reduced_segments[index : index + SUMMARY_REDUCTION_GROUP_SIZE]
                    for index in range(0, len(reduced_segments), SUMMARY_REDUCTION_GROUP_SIZE)
                ]
                for group_index, group_items in enumerate(grouped_segments, start=1):
                    merged_group = "\n\n".join(group_items)
                    # Keep each reduction input inside the chunk budget.
                    reduced_group_input = _truncate_text_to_token_budget(
                        merged_group,
                        model_name=model_name,
                        max_tokens=chunk_token_budget,
                    )
                    group_payload = _build_summary_user_payload(
                        document=document,
                        extracted_text=reduced_group_input,
                        source_scope=(
                            f"Summary reduction round {reduction_round}, "
                            f"group {group_index}/{len(grouped_segments)}"
                        ),
                    )
                    log_processing_event(
                        session=session,
                        stage="summary",
                        event="Summary reduction request started",
                        level="info",
                        document=document,
                        provider_id=str(summary_provider.get("id", "")),
                        model_name=str(summary_task.get("model", "")),
                        prompt_text=reduction_prompt,
                        payload_json={
                            "round": reduction_round,
                            "group_index": group_index,
                            "group_total": len(grouped_segments),
                            "user_payload": group_payload,
                        },
                    )
                    reduced = complete_text_task(
                        TASK_SUMMARY_GENERATION,
                        group_payload,
                        prompt_override=reduction_prompt,
                    ).strip()
                    # An empty merge answer degrades to the group input.
                    reduced = reduced or reduced_group_input[:2000]
                    next_segments.append(f"Group {group_index}:\n{reduced}")
                    log_processing_event(
                        session=session,
                        stage="summary",
                        event="Summary reduction response received",
                        level="info",
                        document=document,
                        provider_id=str(summary_provider.get("id", "")),
                        model_name=str(summary_task.get("model", "")),
                        response_text=reduced,
                    )
                reduced_segments = next_segments

            # Final merge: one last call over the concatenated (and
            # budget-truncated) remaining segments.
            final_source = "\n\n".join(reduced_segments).strip() if reduced_segments else "(none)"
            final_source = _truncate_text_to_token_budget(
                final_source,
                model_name=model_name,
                max_tokens=chunk_token_budget,
            ) or "(none)"
            final_payload = _build_summary_user_payload(
                document=document,
                extracted_text=final_source,
                source_scope="Merged chunk summaries",
            )
            log_processing_event(
                session=session,
                stage="summary",
                event="Summary final merge request started",
                level="info",
                document=document,
                provider_id=str(summary_provider.get("id", "")),
                model_name=str(summary_task.get("model", "")),
                prompt_text=reduction_prompt,
                payload_json={"user_payload": final_payload},
            )
            summary = complete_text_task(
                TASK_SUMMARY_GENERATION,
                final_payload,
                prompt_override=reduction_prompt,
            )
        # An entirely empty model answer still yields a usable summary.
        cleaned_summary = summary.strip() or _fallback_summary(document)
        log_processing_event(
            session=session,
            stage="summary",
            event="Summary response received",
            level="info",
            document=document,
            provider_id=str(summary_provider.get("id", "")),
            model_name=str(summary_task.get("model", "")),
            response_text=cleaned_summary,
        )
        return cleaned_summary
    except (ModelTaskError, ModelTaskDisabledError) as error:
        # Any model/runtime failure degrades to the deterministic summary.
        log_processing_event(
            session=session,
            stage="summary",
            event="Summary request failed, using deterministic fallback",
            level="error",
            document=document,
            provider_id=str(summary_provider.get("id", "")),
            model_name=str(summary_task.get("model", "")),
            response_text=str(error),
        )
        return _fallback_summary(document)
|
|
|
|
|
|
def _build_routing_user_payload(
    document: Document,
    summary_text: str,
    allowed_paths: list[str],
    allowed_tags: list[str],
    neighbors: list[SimilarDocument],
) -> str:
    """Builds the structured user message for LLM-based routing classification.

    Combines target-document metadata with the allowed path/tag catalogs
    and rendered nearest-neighbor examples.
    """

    path_catalog = ", ".join(allowed_paths) if allowed_paths else "(none)"
    tag_catalog = ", ".join(allowed_tags) if allowed_tags else "(none)"
    sections = [
        "Target Document:",
        f"- filename: {document.original_filename}",
        f"- mime_type: {document.mime_type}",
        f"- extension: {document.extension}",
        f"- summary: {summary_text}",
        "",
        "Allowed Paths:",
        path_catalog,
        "",
        "Allowed Tags:",
        tag_catalog,
        "",
        "Nearest Labeled Examples:",
        _render_neighbors(neighbors),
    ]
    return "\n".join(sections)
|
|
|
|
|
|
def _build_neighbor_log_context(neighbors: list[SimilarDocument]) -> list[dict[str, Any]]:
    """Summarizes neighbors as compact JSON-friendly dicts for observability logs."""

    entries: list[dict[str, Any]] = []
    for item in neighbors:
        closeness = _clamp_probability(1.0 - item.vector_distance, fallback=0.0)
        entries.append(
            {
                "document_id": item.document_id,
                "summary": item.summary_text[:1200],
                "path": item.logical_path,
                "tags": item.tags[:10],
                "similarity": round(closeness, 4),
                "distance": round(item.vector_distance, 6),
            }
        )
    return entries
|
|
|
|
|
|
def _fallback_routing_from_neighbors(
    neighbors: list[SimilarDocument],
    allowed_paths: list[str],
    allowed_tags: list[str],
    neighbor_similarity: float,
) -> RoutingDecision:
    """Builds a deterministic fallback routing decision from semantic nearest neighbors."""

    # No neighbors means no signal at all: return an empty decision.
    if not neighbors:
        return RoutingDecision(
            chosen_path=None,
            chosen_tags=[],
            suggested_new_paths=[],
            suggested_new_tags=[],
            confidence=0.0,
            neighbor_similarity=neighbor_similarity,
        )

    # Take the closest neighbor's path (neighbors[0]; assumes the list is
    # ordered by ascending distance — confirm against query_similar_documents)
    # and pool candidate tags from the top five neighbors.
    best_neighbor = neighbors[0]
    candidate_path = best_neighbor.logical_path.strip() if best_neighbor.logical_path.strip() else None

    candidate_tags: list[str] = []
    for neighbor in neighbors[:5]:
        candidate_tags.extend(neighbor.tags)

    chosen_path = candidate_path
    suggested_new_paths: list[str] = []
    if chosen_path and allowed_paths:
        # Canonicalize against the path catalog; a non-catalog path becomes
        # a new-path suggestion rather than the chosen path.
        matched_paths, novel_paths = _normalize_with_catalog([chosen_path], allowed_paths, limit=1)
        chosen_path = matched_paths[0] if matched_paths else None
        suggested_new_paths = novel_paths

    matched_tags, novel_tags = _normalize_with_catalog(candidate_tags, allowed_tags, limit=10)
    # Confidence tracks neighbor similarity but is capped at 0.72, below the
    # default auto-apply confidence threshold (0.78) used downstream.
    confidence = max(0.0, min(0.72, neighbor_similarity))
    return RoutingDecision(
        chosen_path=chosen_path,
        chosen_tags=matched_tags,
        suggested_new_paths=suggested_new_paths,
        suggested_new_tags=novel_tags,
        confidence=confidence,
        neighbor_similarity=neighbor_similarity,
    )
|
|
|
|
|
|
def classify_document_routing(session: Session, document: Document, summary_text: str) -> RoutingDecision:
    """Classifies path and tags by combining semantic neighbors with LLM structured output.

    Pipeline: query Typesense for semantic neighbors of the summary, ask the
    routing LLM for a structured decision constrained by the existing
    path/tag catalogs, apply neighbor-based path overrides, and finally merge
    in a deterministic neighbor fallback when the LLM fails or produces no
    usable signal.
    """

    routing_runtime = read_task_runtime_settings(TASK_ROUTING_CLASSIFICATION)
    routing_task = routing_runtime["task"]
    routing_provider = routing_runtime["provider"]
    # Task disabled: return an empty decision without querying anything.
    if not bool(routing_task.get("enabled", True)):
        log_processing_event(
            session=session,
            stage="routing",
            event="Routing task disabled",
            level="warning",
            document=document,
            provider_id=str(routing_provider.get("id", "")),
            model_name=str(routing_task.get("model", "")),
        )
        return RoutingDecision(
            chosen_path=None,
            chosen_tags=[],
            suggested_new_paths=[],
            suggested_new_tags=[],
            confidence=0.0,
            neighbor_similarity=0.0,
        )

    # Neighbor-query and override thresholds (probability-like, clamped to [0, 1]).
    neighbor_count = int(routing_task.get("neighbor_count", 8))
    neighbor_min_similarity = _clamp_probability(
        routing_task.get("neighbor_min_similarity", 0.84),
        fallback=0.84,
    )
    neighbor_path_override_enabled = bool(routing_task.get("neighbor_path_override_enabled", True))
    neighbor_path_override_min_similarity = _clamp_probability(
        routing_task.get("neighbor_path_override_min_similarity", 0.86),
        fallback=0.86,
    )
    neighbor_path_override_min_gap = _clamp_probability(
        routing_task.get("neighbor_path_override_min_gap", 0.04),
        fallback=0.04,
    )
    neighbor_path_override_max_confidence = _clamp_probability(
        routing_task.get("neighbor_path_override_max_confidence", 0.9),
        fallback=0.9,
    )
    log_processing_event(
        session=session,
        stage="routing",
        event="Typesense neighbor query started",
        level="info",
        document=document,
        payload_json={
            "neighbor_count": neighbor_count,
            "neighbor_min_similarity": neighbor_min_similarity,
            "neighbor_mode": "semantic_text",
            "summary_text_for_semantic_neighbors": summary_text,
        },
    )
    all_neighbors = query_similar_documents(
        summary_text=summary_text,
        limit=neighbor_count,
        exclude_document_id=str(document.id),
    )
    # Keep only neighbors whose similarity (1 - vector distance, clamped)
    # meets the configured threshold; only these feed the prompt.
    neighbors = [
        neighbor
        for neighbor in all_neighbors
        if _clamp_probability(1.0 - neighbor.vector_distance, fallback=0.0) >= neighbor_min_similarity
    ]
    neighbors_for_prompt = neighbors

    # Distance 2.0 is an out-of-range sentinel so empty lists clamp to 0 similarity.
    nearest_any_distance = min([neighbor.vector_distance for neighbor in all_neighbors], default=2.0)
    nearest_strong_distance = min([neighbor.vector_distance for neighbor in neighbors], default=2.0)
    neighbor_similarity_any = _clamp_probability(1.0 - nearest_any_distance, fallback=0.0)
    neighbor_similarity = _clamp_probability(1.0 - nearest_strong_distance, fallback=0.0)
    log_processing_event(
        session=session,
        stage="routing",
        event="Typesense neighbor query completed",
        level="info",
        document=document,
        payload_json={
            "neighbors_found": len(all_neighbors),
            "neighbors_matching_threshold": len(neighbors),
            "neighbor_candidates": _build_neighbor_log_context(all_neighbors),
        },
    )

    allowed_paths = _list_allowed_paths(session, document.id)
    allowed_tags = _list_allowed_tags(session, document.id)

    user_payload = _build_routing_user_payload(
        document=document,
        summary_text=summary_text,
        allowed_paths=allowed_paths,
        allowed_tags=allowed_tags,
        neighbors=neighbors_for_prompt,
    )

    llm_failed = False
    try:
        log_processing_event(
            session=session,
            stage="routing",
            event="Routing request started",
            level="info",
            document=document,
            provider_id=str(routing_provider.get("id", "")),
            model_name=str(routing_task.get("model", "")),
            prompt_text=str(routing_task.get("prompt", "")),
            payload_json={
                "neighbor_count": neighbor_count,
                "neighbor_min_similarity": neighbor_min_similarity,
                "available_paths": len(allowed_paths),
                "available_tags": len(allowed_tags),
                "neighbors_found": len(all_neighbors),
                "neighbors_used": len(neighbors_for_prompt),
                "nearest_similarity_any": neighbor_similarity_any,
                "nearest_similarity_strong": neighbor_similarity,
                "allowed_paths": allowed_paths,
                "allowed_tags": allowed_tags,
                "neighbor_examples": _build_neighbor_log_context(neighbors_for_prompt),
                "user_payload": user_payload,
            },
        )
        raw_output = complete_text_task(TASK_ROUTING_CLASSIFICATION, user_payload)
        log_processing_event(
            session=session,
            stage="routing",
            event="Routing response received",
            level="info",
            document=document,
            provider_id=str(routing_provider.get("id", "")),
            model_name=str(routing_task.get("model", "")),
            response_text=raw_output,
        )
    except (ModelTaskError, ModelTaskDisabledError) as error:
        # Model failure is non-fatal: parse "{}" and rely on neighbor fallback.
        raw_output = "{}"
        llm_failed = True
        log_processing_event(
            session=session,
            stage="routing",
            event="Routing model request failed, continuing with fallback",
            level="error",
            document=document,
            provider_id=str(routing_provider.get("id", "")),
            model_name=str(routing_task.get("model", "")),
            response_text=str(error),
        )

    parsed = _coerce_json(raw_output)
    # Normalize chosen_path: non-strings and blank strings become None.
    chosen_path = parsed.get("chosen_path")
    if not isinstance(chosen_path, str):
        chosen_path = None
    elif chosen_path.strip():
        chosen_path = chosen_path.strip()
    else:
        chosen_path = None

    raw_tags = parsed.get("chosen_tags", [])
    chosen_tags = [str(tag) for tag in raw_tags] if isinstance(raw_tags, list) else []
    chosen_tags = _dedupe_non_empty(chosen_tags, limit=30)

    raw_suggested_paths = parsed.get("suggested_new_paths", [])
    suggested_new_paths = [str(path) for path in raw_suggested_paths] if isinstance(raw_suggested_paths, list) else []
    suggested_new_paths = _dedupe_non_empty(suggested_new_paths, limit=10)

    raw_suggested_tags = parsed.get("suggested_new_tags", [])
    suggested_new_tags = [str(tag) for tag in raw_suggested_tags] if isinstance(raw_suggested_tags, list) else []
    suggested_new_tags = _dedupe_non_empty(suggested_new_tags, limit=20)

    confidence = _clamp_probability(parsed.get("confidence", 0.0), fallback=0.0)

    # Canonicalize the chosen path against the catalog; a non-catalog path is
    # demoted to a new-path suggestion.
    if chosen_path and allowed_paths:
        matched_paths, novel_paths = _normalize_with_catalog([chosen_path], allowed_paths, limit=1)
        if matched_paths:
            chosen_path = matched_paths[0]
        else:
            chosen_path = None
            suggested_new_paths = _dedupe_non_empty([*suggested_new_paths, *novel_paths], limit=10)

    # Same canonicalization for tags: matched stay chosen, novel become suggestions.
    matched_tags, novel_tags = _normalize_with_catalog(chosen_tags, allowed_tags, limit=30)
    chosen_tags = matched_tags
    suggested_new_tags = _dedupe_non_empty([*suggested_new_tags, *novel_tags], limit=20)

    if neighbors:
        # neighbors[0] is treated as the closest match (assumes ascending
        # distance order from query_similar_documents — confirm).
        best_neighbor = neighbors[0]
        best_neighbor_path = best_neighbor.logical_path.strip()
        matched_best_paths, _ = _normalize_with_catalog([best_neighbor_path], allowed_paths, limit=1)
        best_neighbor_path = matched_best_paths[0] if matched_best_paths else best_neighbor_path

        best_neighbor_similarity = _clamp_probability(1.0 - best_neighbor.vector_distance, fallback=0.0)
        second_neighbor_similarity = _clamp_probability(1.0 - neighbors[1].vector_distance, fallback=0.0) if len(neighbors) > 1 else 0.0
        similarity_gap = best_neighbor_similarity - second_neighbor_similarity
        override_similarity_min = max(neighbor_min_similarity, neighbor_path_override_min_similarity)

        # Override the LLM's path when a dominant neighbor (high similarity,
        # clear gap to the runner-up) disagrees and LLM confidence is not
        # already above the override cap.
        should_override_path = (
            neighbor_path_override_enabled
            and chosen_path is not None
            and best_neighbor_path.strip() != ""
            and chosen_path.strip().lower() != best_neighbor_path.strip().lower()
            and confidence < neighbor_path_override_max_confidence
            and best_neighbor_similarity >= override_similarity_min
            and similarity_gap >= neighbor_path_override_min_gap
        )
        if should_override_path:
            overridden_path = chosen_path
            # Keep the LLM's original path as a suggestion rather than losing it.
            suggested_new_paths = _dedupe_non_empty([*suggested_new_paths, chosen_path], limit=10)
            chosen_path = best_neighbor_path
            confidence = max(confidence, min(0.86, best_neighbor_similarity))
            log_processing_event(
                session=session,
                stage="routing",
                event="Chosen path overridden by dominant nearest neighbor",
                level="warning",
                document=document,
                provider_id=str(routing_provider.get("id", "")),
                model_name=str(routing_task.get("model", "")),
                payload_json={
                    "overridden_path": overridden_path,
                    "best_neighbor_path": best_neighbor_path,
                    "best_neighbor_similarity": best_neighbor_similarity,
                    "second_neighbor_similarity": second_neighbor_similarity,
                    "similarity_gap": similarity_gap,
                    "llm_confidence": _clamp_probability(parsed.get("confidence", 0.0), fallback=0.0),
                },
            )

        # When no path was chosen at all, a dominant neighbor's path is
        # promoted to the front of the suggestions instead.
        has_best_neighbor_suggested_path = any(
            path.strip().lower() == best_neighbor_path.strip().lower()
            for path in suggested_new_paths
        )
        first_suggested_path = suggested_new_paths[0].strip().lower() if suggested_new_paths else ""
        should_prioritize_neighbor_suggestion = (
            neighbor_path_override_enabled
            and chosen_path is None
            and best_neighbor_path.strip() != ""
            and confidence < neighbor_path_override_max_confidence
            and best_neighbor_similarity >= override_similarity_min
            and similarity_gap >= neighbor_path_override_min_gap
            and (
                not has_best_neighbor_suggested_path
                or first_suggested_path != best_neighbor_path.strip().lower()
            )
        )
        if should_prioritize_neighbor_suggestion:
            suggested_new_paths = _dedupe_non_empty([best_neighbor_path, *suggested_new_paths], limit=10)
            log_processing_event(
                session=session,
                stage="routing",
                event="Suggested path prioritized from dominant nearest neighbor",
                level="warning",
                document=document,
                provider_id=str(routing_provider.get("id", "")),
                model_name=str(routing_task.get("model", "")),
                payload_json={
                    "best_neighbor_path": best_neighbor_path,
                    "best_neighbor_similarity": best_neighbor_similarity,
                    "second_neighbor_similarity": second_neighbor_similarity,
                    "similarity_gap": similarity_gap,
                    "llm_confidence": _clamp_probability(parsed.get("confidence", 0.0), fallback=0.0),
                    "prioritized_suggested_new_paths": suggested_new_paths,
                },
            )

    # Observability: neighbors existed but none passed the similarity bar.
    if len(neighbors) == 0 and len(all_neighbors) > 0:
        log_processing_event(
            session=session,
            stage="routing",
            event="No neighbors met minimum similarity; routing proceeded without neighbor examples",
            level="warning",
            document=document,
            provider_id=str(routing_provider.get("id", "")),
            model_name=str(routing_task.get("model", "")),
            payload_json={
                "neighbor_min_similarity": neighbor_min_similarity,
                "neighbors_found": len(all_neighbors),
            },
        )

    llm_has_path_signal = chosen_path is not None or len(suggested_new_paths) > 0
    llm_has_tag_signal = len(chosen_tags) > 0 or len(suggested_new_tags) > 0
    llm_has_any_signal = llm_has_path_signal or llm_has_tag_signal

    # Merge in the deterministic neighbor fallback wherever the LLM left gaps
    # (failed call, no path signal, or no tag signal).
    if llm_failed or not llm_has_any_signal or not llm_has_path_signal or not llm_has_tag_signal:
        fallback = _fallback_routing_from_neighbors(
            neighbors=neighbors,
            allowed_paths=allowed_paths,
            allowed_tags=allowed_tags,
            neighbor_similarity=neighbor_similarity,
        )
        if fallback.chosen_path or fallback.chosen_tags or fallback.suggested_new_paths or fallback.suggested_new_tags:
            if not llm_has_path_signal and fallback.chosen_path:
                chosen_path = fallback.chosen_path

            if not llm_has_path_signal and fallback.suggested_new_paths:
                suggested_new_paths = _dedupe_non_empty(
                    [*suggested_new_paths, *fallback.suggested_new_paths],
                    limit=10,
                )

            if not llm_has_tag_signal:
                # When the LLM produced nothing at all, fallback tags are
                # adopted directly; otherwise they are only suggested.
                if llm_failed or not llm_has_any_signal:
                    chosen_tags = _dedupe_non_empty([*chosen_tags, *fallback.chosen_tags], limit=30)
                else:
                    suggested_new_tags = _dedupe_non_empty(
                        [*suggested_new_tags, *fallback.chosen_tags],
                        limit=20,
                    )
                suggested_new_tags = _dedupe_non_empty(
                    [*suggested_new_tags, *fallback.suggested_new_tags],
                    limit=20,
                )

            confidence = max(confidence, fallback.confidence)

    decision = RoutingDecision(
        chosen_path=chosen_path,
        chosen_tags=chosen_tags,
        suggested_new_paths=suggested_new_paths,
        suggested_new_tags=suggested_new_tags,
        confidence=confidence,
        neighbor_similarity=neighbor_similarity,
    )
    log_processing_event(
        session=session,
        stage="routing",
        event="Routing decision prepared",
        level="info",
        document=document,
        provider_id=str(routing_provider.get("id", "")),
        model_name=str(routing_task.get("model", "")),
        payload_json={
            "chosen_path": decision.chosen_path,
            "chosen_tags": decision.chosen_tags,
            "suggested_new_paths": decision.suggested_new_paths,
            "suggested_new_tags": decision.suggested_new_tags,
            "confidence": decision.confidence,
            "neighbor_similarity": decision.neighbor_similarity,
            "nearest_similarity_any": neighbor_similarity_any,
        },
    )
    return decision
|
|
|
|
|
|
def apply_routing_decision(document: Document, decision: RoutingDecision, session: Session | None = None) -> None:
    """Applies auto-routing when thresholds are met and stores suggestion metadata otherwise.

    When a routed path exists and the decision confidence clears the configured
    threshold, the path (and any routed tags) are written directly onto the
    document; otherwise the decision is kept as review suggestions. In both
    cases a full audit record is stored under ``metadata_json["routing"]``.
    """

    task_settings = read_task_runtime_settings(TASK_ROUTING_CLASSIFICATION)["task"]
    min_confidence = _clamp_probability(task_settings.get("auto_apply_confidence_threshold", 0.78), 0.78)
    min_neighbor_similarity = _clamp_probability(
        task_settings.get("auto_apply_neighbor_similarity_threshold", 0.55),
        0.55,
    )

    # Handwriting documents may be routed under a style-cluster path prefix.
    style_cluster_id = None
    if document.image_text_type == "handwriting" and document.handwriting_style_id:
        style_cluster_id = document.handwriting_style_id
    style_path_prefix = style_cluster_id
    if style_cluster_id and session is not None:
        style_path_prefix = resolve_handwriting_style_path_prefix(
            session=session,
            style_cluster_id=style_cluster_id,
            exclude_document_id=str(document.id),
        )

    chosen_path = apply_handwriting_style_path(style_cluster_id=style_path_prefix, path_value=decision.chosen_path)
    styled_candidates = (
        apply_handwriting_style_path(style_cluster_id=style_path_prefix, path_value=raw_path)
        for raw_path in decision.suggested_new_paths
    )
    suggested_new_paths = _dedupe_non_empty(
        [candidate for candidate in styled_candidates if candidate],
        limit=10,
    )

    has_chosen_path = bool(chosen_path)
    confidence_passed = decision.confidence >= min_confidence
    blocked_reasons: list[str] = []
    if not has_chosen_path:
        blocked_reasons.append("missing_chosen_path")
    if not confidence_passed:
        blocked_reasons.append("confidence_below_threshold")

    auto_applied = has_chosen_path and confidence_passed
    auto_applied_path = False
    auto_applied_tags = False
    if auto_applied:
        document.logical_path = chosen_path
        auto_applied_path = True

        routed_tags = _dedupe_non_empty(
            [*decision.chosen_tags, *decision.suggested_new_tags],
            limit=50,
        )
        if routed_tags:
            document.tags = routed_tags
            auto_applied_tags = True

    # Reassign a copied dict so the attribute change is a fresh object
    # (in-place mutation of the existing dict may not be tracked by the ORM).
    metadata_json = dict(document.metadata_json)
    metadata_json["routing"] = {
        "auto_applied": auto_applied,
        "auto_applied_path": auto_applied_path,
        "auto_applied_tags": auto_applied_tags,
        "confidence": decision.confidence,
        "neighbor_similarity": decision.neighbor_similarity,
        "auto_apply_confidence_threshold": min_confidence,
        "auto_apply_neighbor_similarity_threshold": min_neighbor_similarity,
        "auto_apply_checks": {
            "has_chosen_path": has_chosen_path,
            "confidence_passed": confidence_passed,
            "neighbor_similarity_observed": decision.neighbor_similarity,
            "neighbor_similarity_gate_enabled": False,
        },
        "auto_apply_blocked_reasons": blocked_reasons,
        "chosen_path": chosen_path,
        "base_chosen_path": decision.chosen_path,
        "chosen_tags": decision.chosen_tags,
        "suggested_new_paths": suggested_new_paths,
        "base_suggested_new_paths": decision.suggested_new_paths,
        "suggested_new_tags": decision.suggested_new_tags,
        "handwriting_style_id": style_cluster_id,
        "handwriting_style_path_prefix": style_path_prefix,
    }
    document.metadata_json = metadata_json

    fallback_path = suggested_new_paths[0] if suggested_new_paths else None
    fallback_tags = _dedupe_non_empty(decision.suggested_new_tags, limit=50)
    if auto_applied:
        # Anything already applied needs no suggestion; keep only leftovers.
        document.suggested_path = fallback_path
        document.suggested_tags = [] if auto_applied_tags else fallback_tags
    else:
        document.suggested_path = chosen_path or fallback_path
        document.suggested_tags = _dedupe_non_empty(
            [*decision.chosen_tags, *fallback_tags],
            limit=50,
        )


def upsert_semantic_index(document: Document, summary_text: str) -> None:
    """Upserts semantic search document representation in Typesense.

    Thin delegate to :func:`upsert_document_index`; ``summary_text`` is the
    generated summary used as the document's semantic representation.
    """

    upsert_document_index(document=document, summary_text=summary_text)