"""Admin-only API endpoints for processing pipeline event logs.""" from uuid import UUID from fastapi import APIRouter, Depends, Query from sqlalchemy.orm import Session from app.core.config import get_settings from app.db.base import get_session from app.schemas.processing_logs import ProcessingLogEntryResponse, ProcessingLogListResponse from app.services.app_settings import read_processing_log_retention_settings from app.services.processing_logs import ( cleanup_processing_logs, clear_processing_logs, count_processing_logs, list_processing_logs, ) router = APIRouter() settings = get_settings() @router.get("", response_model=ProcessingLogListResponse) def get_processing_logs( offset: int = Query(default=0, ge=0), limit: int = Query(default=120, ge=1, le=settings.processing_log_max_unbound_entries), document_id: UUID | None = Query(default=None), session: Session = Depends(get_session), ) -> ProcessingLogListResponse: """Returns paginated processing logs ordered from newest to oldest.""" items = list_processing_logs( session=session, limit=limit, offset=offset, document_id=document_id, ) total = count_processing_logs(session=session, document_id=document_id) return ProcessingLogListResponse( total=total, items=[ProcessingLogEntryResponse.model_validate(item) for item in items], ) @router.post("/trim") def trim_processing_logs( keep_document_sessions: int | None = Query(default=None, ge=0, le=settings.processing_log_max_document_sessions), keep_unbound_entries: int | None = Query(default=None, ge=0, le=settings.processing_log_max_unbound_entries), session: Session = Depends(get_session), ) -> dict[str, int]: """Deletes old processing logs using query values or persisted retention defaults.""" retention_defaults = read_processing_log_retention_settings() resolved_keep_document_sessions = ( keep_document_sessions if keep_document_sessions is not None else int(retention_defaults.get("keep_document_sessions", 2)) ) resolved_keep_unbound_entries = ( keep_unbound_entries if keep_unbound_entries is not None else int(retention_defaults.get("keep_unbound_entries", 80)) ) capped_keep_document_sessions = min( settings.processing_log_max_document_sessions, max(0, int(resolved_keep_document_sessions)), ) capped_keep_unbound_entries = min( settings.processing_log_max_unbound_entries, max(0, int(resolved_keep_unbound_entries)), ) result = cleanup_processing_logs( session=session, keep_document_sessions=capped_keep_document_sessions, keep_unbound_entries=capped_keep_unbound_entries, ) session.commit() return result @router.post("/clear") def clear_all_processing_logs(session: Session = Depends(get_session)) -> dict[str, int]: """Deletes all processing logs to reset the diagnostics timeline.""" result = clear_processing_logs(session=session) session.commit() return result