"""Persistence helpers for writing and querying processing pipeline log events."""
|
|
|
|
from typing import Any
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy import delete, func, select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.config import get_settings
|
|
from app.models.document import Document
|
|
from app.models.processing_log import ProcessingLogEntry
|
|
|
|
|
|
# Cached application settings; read at import time for payload/IO persistence flags.
settings = get_settings()

# Maximum persisted lengths for trimmed text columns (enforced via _trim).
MAX_STAGE_LENGTH = 64
MAX_EVENT_LENGTH = 256
MAX_LEVEL_LENGTH = 16
MAX_PROVIDER_LENGTH = 128
MAX_MODEL_LENGTH = 256
MAX_DOCUMENT_FILENAME_LENGTH = 512
# Prompt/response text may be very large; cap generously to keep rows bounded.
MAX_PROMPT_LENGTH = 200000
MAX_RESPONSE_LENGTH = 200000

# Retention defaults used by cleanup_processing_logs.
DEFAULT_KEEP_DOCUMENT_SESSIONS = 2
DEFAULT_KEEP_UNBOUND_ENTRIES = 80

# Key in Session.info toggling immediate commit of log entries for a session.
PROCESSING_LOG_AUTOCOMMIT_SESSION_KEY = "processing_log_autocommit"
|
|
|
|
|
|
def _trim(value: str | None, max_length: int) -> str | None:
|
|
"""Normalizes and truncates text values for safe log persistence."""
|
|
|
|
if value is None:
|
|
return None
|
|
normalized = value.strip()
|
|
if not normalized:
|
|
return None
|
|
if len(normalized) <= max_length:
|
|
return normalized
|
|
return normalized[: max_length - 3] + "..."
|
|
|
|
|
|
def _safe_payload(payload_json: dict[str, Any] | None) -> dict[str, Any]:
    """Normalizes payload persistence mode using metadata-only defaults for sensitive content."""
    if not isinstance(payload_json, dict):
        # Non-dict (including None) payloads are stored as an empty object.
        return {}
    # Raw payload text is persisted only when explicitly enabled in settings;
    # otherwise values are collapsed into non-sensitive metadata descriptors.
    if not settings.processing_log_store_payload_text:
        return _metadata_only_payload(payload_json)
    return payload_json
|
|
|
|
|
|
def _metadata_only_payload(payload_json: dict[str, Any]) -> dict[str, Any]:
|
|
"""Converts payload content into metadata descriptors without persisting raw text values."""
|
|
|
|
metadata: dict[str, Any] = {}
|
|
for index, (raw_key, raw_value) in enumerate(payload_json.items()):
|
|
if index >= 80:
|
|
break
|
|
key = str(raw_key)
|
|
metadata[key] = _metadata_only_payload_value(raw_value)
|
|
return metadata
|
|
|
|
|
|
def _metadata_only_payload_value(value: Any) -> Any:
|
|
"""Converts one payload value into non-sensitive metadata representation."""
|
|
|
|
if isinstance(value, dict):
|
|
return _metadata_only_payload(value)
|
|
if isinstance(value, (list, tuple)):
|
|
items = list(value)
|
|
return {
|
|
"item_count": len(items),
|
|
"items_preview": [_metadata_only_payload_value(item) for item in items[:20]],
|
|
}
|
|
if isinstance(value, str):
|
|
normalized = value.strip()
|
|
return {
|
|
"text_chars": len(normalized),
|
|
"text_omitted": bool(normalized),
|
|
}
|
|
if isinstance(value, bytes):
|
|
return {"binary_bytes": len(value)}
|
|
if isinstance(value, (int, float, bool)) or value is None:
|
|
return value
|
|
return {"value_type": type(value).__name__}
|
|
|
|
|
|
def set_processing_log_autocommit(session: Session, enabled: bool) -> None:
    """Toggles per-session immediate commit behavior for processing log events."""
    # The flag lives in session.info so it travels with the session object
    # and never leaks across unrelated sessions.
    flag = bool(enabled)
    session.info[PROCESSING_LOG_AUTOCOMMIT_SESSION_KEY] = flag
|
|
|
|
|
|
def is_processing_log_autocommit_enabled(session: Session) -> bool:
    """Returns whether processing logs are committed immediately for the current session."""
    # Missing key means autocommit was never enabled for this session.
    stored = session.info.get(PROCESSING_LOG_AUTOCOMMIT_SESSION_KEY, False)
    return bool(stored)
|
|
|
|
|
|
def log_processing_event(
    session: Session,
    stage: str,
    event: str,
    *,
    level: str = "info",
    document: Document | None = None,
    document_id: UUID | None = None,
    document_filename: str | None = None,
    provider_id: str | None = None,
    model_name: str | None = None,
    prompt_text: str | None = None,
    response_text: str | None = None,
    payload_json: dict[str, Any] | None = None,
) -> None:
    """Persists one processing log entry linked to an optional document context.

    A provided ``document`` takes precedence over the explicit ``document_id``
    and ``document_filename`` arguments. Prompt/response text is persisted only
    when enabled in settings, and the entry is committed immediately only when
    session autocommit was enabled via set_processing_log_autocommit.
    """
    if document is not None:
        resolved_document_id = document.id
        resolved_document_filename = document.original_filename
    else:
        resolved_document_id = document_id
        resolved_document_filename = document_filename

    # Hoist the settings flag so both prompt and response use the same decision.
    store_model_io = settings.processing_log_store_model_io_text

    entry = ProcessingLogEntry(
        level=_trim(level, MAX_LEVEL_LENGTH) or "info",
        stage=_trim(stage, MAX_STAGE_LENGTH) or "pipeline",
        event=_trim(event, MAX_EVENT_LENGTH) or "event",
        document_id=resolved_document_id,
        document_filename=_trim(resolved_document_filename, MAX_DOCUMENT_FILENAME_LENGTH),
        provider_id=_trim(provider_id, MAX_PROVIDER_LENGTH),
        model_name=_trim(model_name, MAX_MODEL_LENGTH),
        prompt_text=_trim(prompt_text, MAX_PROMPT_LENGTH) if store_model_io else None,
        response_text=_trim(response_text, MAX_RESPONSE_LENGTH) if store_model_io else None,
        payload_json=_safe_payload(payload_json),
    )
    session.add(entry)
    if is_processing_log_autocommit_enabled(session):
        session.commit()
|
|
|
|
|
|
def count_processing_logs(session: Session, document_id: UUID | None = None) -> int:
    """Counts persisted processing logs, optionally restricted to one document."""
    count_query = select(func.count()).select_from(ProcessingLogEntry)
    if document_id is not None:
        # Narrow the count to entries bound to the requested document.
        count_query = count_query.where(ProcessingLogEntry.document_id == document_id)
    total = session.execute(count_query).scalar_one()
    return int(total)
|
|
|
|
|
|
def list_processing_logs(
    session: Session,
    *,
    limit: int,
    offset: int,
    document_id: UUID | None = None,
) -> list[ProcessingLogEntry]:
    """Lists processing logs ordered by newest-first with optional document filter."""
    query = select(ProcessingLogEntry)
    if document_id is not None:
        query = query.where(ProcessingLogEntry.document_id == document_id)
    # Newest first; id breaks ties for entries sharing the same created_at.
    ordered = query.order_by(
        ProcessingLogEntry.created_at.desc(),
        ProcessingLogEntry.id.desc(),
    )
    paged = ordered.offset(offset).limit(limit)
    return session.execute(paged).scalars().all()
|
|
|
|
|
|
def _recent_document_ids(session: Session, keep_count: int) -> list[UUID]:
    """Returns document ids of the most recently active log sessions, newest first."""
    rows = session.execute(
        select(
            ProcessingLogEntry.document_id,
            func.max(ProcessingLogEntry.created_at).label("last_seen"),
        )
        .where(ProcessingLogEntry.document_id.is_not(None))
        .group_by(ProcessingLogEntry.document_id)
        .order_by(func.max(ProcessingLogEntry.created_at).desc())
        .limit(keep_count)
    ).all()
    return [row[0] for row in rows if row[0] is not None]


def _recent_unbound_ids(session: Session, keep_count: int) -> list[Any]:
    """Returns ids of the newest log entries that are not bound to any document."""
    rows = session.execute(
        select(ProcessingLogEntry.id)
        .where(ProcessingLogEntry.document_id.is_(None))
        .order_by(ProcessingLogEntry.created_at.desc(), ProcessingLogEntry.id.desc())
        .limit(keep_count)
    ).all()
    return [row[0] for row in rows]


def cleanup_processing_logs(
    session: Session,
    *,
    keep_document_sessions: int = DEFAULT_KEEP_DOCUMENT_SESSIONS,
    keep_unbound_entries: int = DEFAULT_KEEP_UNBOUND_ENTRIES,
) -> dict[str, int]:
    """Deletes old log entries while keeping recent document sessions and unbound events.

    Args:
        session: Active database session; the caller owns transaction commit.
        keep_document_sessions: Number of most-recently-active documents whose
            entries are retained (negative values are treated as 0).
        keep_unbound_entries: Number of newest document-less entries retained
            (negative values are treated as 0).

    Returns:
        Mapping with ``deleted_document_entries`` and ``deleted_unbound_entries``
        deletion counts.
    """
    normalized_keep_sessions = max(0, keep_document_sessions)
    normalized_keep_unbound = max(0, keep_unbound_entries)

    # Document-bound entries: delete everything outside the kept documents.
    # With an empty keep list the unfiltered statement deletes all bound rows,
    # which matches keeping zero sessions.
    keep_document_ids = _recent_document_ids(session, normalized_keep_sessions)
    document_delete = delete(ProcessingLogEntry).where(
        ProcessingLogEntry.document_id.is_not(None)
    )
    if keep_document_ids:
        document_delete = document_delete.where(
            ProcessingLogEntry.document_id.notin_(keep_document_ids)
        )
    deleted_document_entries = int(session.execute(document_delete).rowcount or 0)

    # Unbound entries: same pattern keyed by entry id instead of document id.
    keep_unbound_ids = _recent_unbound_ids(session, normalized_keep_unbound)
    unbound_delete = delete(ProcessingLogEntry).where(
        ProcessingLogEntry.document_id.is_(None)
    )
    if keep_unbound_ids:
        unbound_delete = unbound_delete.where(
            ProcessingLogEntry.id.notin_(keep_unbound_ids)
        )
    deleted_unbound_entries = int(session.execute(unbound_delete).rowcount or 0)

    return {
        "deleted_document_entries": deleted_document_entries,
        "deleted_unbound_entries": deleted_unbound_entries,
    }
|
|
|
|
|
|
def clear_processing_logs(session: Session) -> dict[str, int]:
    """Deletes all persisted processing log entries and returns deletion count."""
    result = session.execute(delete(ProcessingLogEntry))
    # rowcount may be None on some drivers; report 0 in that case.
    removed = result.rowcount or 0
    return {"deleted_entries": int(removed)}
|