# NOTE(review): the following lines are file-browser chrome ("Files", path,
# size, language) that leaked into the module on paste; commented out so the
# module parses. Safe to delete entirely.
# Files
# ledgerdock/backend/app/api/routes_documents.py
#
# 986 lines
# 37 KiB
# Python
"""Authenticated document CRUD, lifecycle, metadata, file access, and content export endpoints."""
import re
import tempfile
import unicodedata
import zipfile
from collections import deque
from datetime import datetime, time
from pathlib import Path
from typing import Annotated, BinaryIO, Iterator, Literal
from uuid import UUID

from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile
from fastapi.responses import FileResponse, Response, StreamingResponse
from sqlalchemy import func, or_, select
from sqlalchemy.orm import Session

from app.api.auth import AuthContext, require_user_or_admin
from app.core.config import get_settings, is_inline_preview_mime_type_safe
from app.db.base import get_session
from app.models.auth import UserRole
from app.models.document import Document, DocumentStatus
from app.schemas.documents import (
    ContentExportRequest,
    DocumentDetailResponse,
    DocumentResponse,
    DocumentsListResponse,
    DocumentUpdateRequest,
    UploadConflict,
    UploadResponse,
)
from app.services.app_settings import read_predefined_paths_settings, read_predefined_tags_settings
from app.services.extractor import sniff_mime
from app.services.handwriting_style import delete_many_handwriting_style_documents
from app.services.processing_logs import log_processing_event, set_processing_log_autocommit
from app.services.rate_limiter import increment_rate_limit
from app.services.storage import absolute_path, compute_sha256, store_bytes
from app.services.typesense_index import delete_many_documents_index, upsert_document_index
from app.worker.queue import get_processing_queue
# Module-level router for all document endpoints; settings is the cached
# application configuration shared by every handler below.
router = APIRouter()
settings = get_settings()
def _scope_document_statement_for_auth_context(statement, auth_context: AuthContext):
    """Limits a document statement to caller-owned rows unless the caller is an admin."""
    if auth_context.role != UserRole.ADMIN:
        statement = statement.where(Document.owner_user_id == auth_context.user_id)
    return statement
def _is_predefined_entry_visible_to_auth_context(entry: dict[str, object], auth_context: AuthContext) -> bool:
    """Reports whether a predefined catalog entry is visible: admins see all, others only globally shared ones."""
    return auth_context.role == UserRole.ADMIN or bool(entry.get("global_shared", False))
def _collect_visible_predefined_values(
    entries: list[dict[str, object]],
    *,
    auth_context: AuthContext,
) -> set[str]:
    """Gathers the stripped, non-empty predefined values the active caller role may see."""
    candidates = (
        str(entry.get("value", "")).strip()
        for entry in entries
        if _is_predefined_entry_visible_to_auth_context(entry, auth_context)
    )
    return {candidate for candidate in candidates if candidate}
def _ensure_document_access(document: Document, auth_context: AuthContext) -> None:
    """Raises a 404 (not 403, to avoid leaking existence) when a non-admin caller does not own the document."""
    if auth_context.role == UserRole.ADMIN:
        return
    if document.owner_user_id == auth_context.user_id:
        return
    raise HTTPException(status_code=404, detail="Document not found")
def _stream_binary_file_chunks(handle: BinaryIO, *, chunk_bytes: int) -> Iterator[bytes]:
    """Yields fixed-size chunks from a binary handle, closing it when iteration finishes or aborts."""
    try:
        while chunk := handle.read(chunk_bytes):
            yield chunk
    finally:
        handle.close()
def _enforce_content_export_rate_limit(auth_context: AuthContext) -> None:
    """Counts this export against the caller's one-minute window and rejects overflow with 429."""
    try:
        current_count, limit = increment_rate_limit(
            scope="content-md-export",
            subject=str(auth_context.user_id),
            limit=settings.content_export_rate_limit_per_minute,
            window_seconds=60,
        )
    except RuntimeError as error:
        # The limiter backend being down is a service problem, not a client one.
        raise HTTPException(
            status_code=503,
            detail="Rate limiter backend unavailable",
        ) from error
    if limit <= 0:
        # Non-positive limit means rate limiting is disabled.
        return
    if current_count > limit:
        raise HTTPException(
            status_code=429,
            detail=f"Export rate limit exceeded ({limit} requests per minute)",
        )
def _parse_csv(value: str | None) -> list[str]:
"""Parses comma-separated query values into a normalized non-empty list."""
if not value:
return []
return [part.strip() for part in value.split(",") if part.strip()]
def _parse_date(value: str | None) -> datetime | None:
"""Parses ISO date strings into UTC-naive midnight datetimes."""
if not value:
return None
try:
parsed = datetime.fromisoformat(value)
return parsed
except ValueError:
pass
try:
date_value = datetime.strptime(value, "%Y-%m-%d").date()
return datetime.combine(date_value, time.min)
except ValueError:
return None
def _apply_discovery_filters(
    statement,
    *,
    path_filter: str | None,
    tag_filter: str | None,
    type_filter: str | None,
    processed_from: str | None,
    processed_to: str | None,
):
    """Narrows a document statement by optional path prefix, tags, type values, and processed-date bounds."""
    trimmed_path = path_filter.strip() if path_filter else ""
    if trimmed_path:
        statement = statement.where(Document.logical_path.ilike(f"{trimmed_path}%"))
    tag_values = _parse_csv(tag_filter)
    if tag_values:
        statement = statement.where(Document.tags.overlap(tag_values))
    type_values = [value.lower() for value in _parse_csv(type_filter)]
    if type_values:
        # Each requested type may match extension, MIME type, or image text type.
        clauses = [
            clause
            for lowered in type_values
            for clause in (
                Document.extension.ilike(lowered),
                Document.mime_type.ilike(lowered),
                Document.image_text_type.ilike(lowered),
            )
        ]
        statement = statement.where(or_(*clauses))
    lower_bound = _parse_date(processed_from)
    if lower_bound is not None:
        statement = statement.where(Document.processed_at.is_not(None), Document.processed_at >= lower_bound)
    upper_bound = _parse_date(processed_to)
    if upper_bound is not None:
        statement = statement.where(Document.processed_at.is_not(None), Document.processed_at <= upper_bound)
    return statement
def _summary_for_index(document: Document) -> str:
    """Picks the best text for semantic indexing: stored summary, truncated extraction, or a metadata fallback."""
    summary = document.metadata_json.get("summary_text")
    if isinstance(summary, str):
        stripped_summary = summary.strip()
        if stripped_summary:
            return stripped_summary
    body = document.extracted_text.strip()
    if body:
        # Cap extracted text so index payloads stay bounded.
        return body[:12000]
    return f"{document.original_filename}\n{document.mime_type}\n{document.logical_path}"
def _normalize_tags(raw_tags: str | None) -> list[str]:
"""Parses comma-separated tags into a cleaned unique list."""
if not raw_tags:
return []
tags = [tag.strip() for tag in raw_tags.split(",") if tag.strip()]
return list(dict.fromkeys(tags))[:50]
def _sanitize_filename(filename: str) -> str:
    """Flattens path separators and whitespace runs in a user-supplied filename, bounded to 512 chars."""
    cleaned = filename.strip().replace("\\", " ").replace("/", " ")
    cleaned = re.sub(r"\s+", " ", cleaned)
    return cleaned[:512] if cleaned else "document"
def _slugify_segment(value: str) -> str:
    """Converts arbitrary text into a bounded ASCII slug safe for paths and markdown filenames."""
    # NFKD + ascii-ignore drops accents and other non-ASCII codepoints.
    ascii_form = unicodedata.normalize("NFKD", value).encode("ascii", "ignore").decode("ascii")
    kept = re.sub(r"[^a-zA-Z0-9._ -]+", "", ascii_form).strip()
    slug = re.sub(r"\s+", "-", kept).strip(".-_")
    return slug[:120] or "document"
def _markdown_for_document(document: Document) -> str:
    """Renders one document's metadata header plus extracted text as a markdown string."""
    rendered = [
        f"# {document.original_filename}",
        "",
        f"- Document ID: `{document.id}`",
        f"- Logical Path: `{document.logical_path}`",
        f"- Source Path: `{document.source_relative_path}`",
        f"- Tags: {', '.join(document.tags) if document.tags else '(none)'}",
        "",
        "## Extracted Content",
        "",
    ]
    body = document.extracted_text
    if body.strip():
        rendered.append(body)
    else:
        rendered.append("_No extracted text available for this document._")
    return "\n".join(rendered).strip() + "\n"
def _markdown_filename(document: Document) -> str:
    """Derives a deterministic, slugged markdown filename suffixed with a short document id."""
    raw_stem = Path(document.original_filename).stem or document.original_filename
    short_id = str(document.id)[:8]
    return f"{_slugify_segment(raw_stem)}-{short_id}.md"
def _zip_entry_name(document: Document, used_names: set[str]) -> str:
    """Produces a collision-free archive path for a document's markdown file, recording it in used_names."""
    folder_parts = [
        _slugify_segment(segment)
        for segment in document.logical_path.split("/")
        if segment
    ]
    filename = _markdown_filename(document)

    def joined(name: str) -> str:
        return "/".join([*folder_parts, name]) if folder_parts else name

    stem, ext = Path(filename).stem, Path(filename).suffix
    entry = joined(filename)
    counter = 1
    # Append a numeric suffix until the entry name is unique within the archive.
    while entry in used_names:
        entry = joined(f"{stem}-{counter}{ext}")
        counter += 1
    used_names.add(entry)
    return entry
def _resolve_previous_status(metadata_json: dict, fallback_status: DocumentStatus) -> DocumentStatus:
    """Picks the pre-trash status recorded in metadata, falling back when absent, invalid, or itself TRASHED."""
    recorded = metadata_json.get("status_before_trash")
    if isinstance(recorded, str):
        try:
            candidate = DocumentStatus(recorded)
        except ValueError:
            candidate = None
        if candidate is not None and candidate != DocumentStatus.TRASHED:
            return candidate
    return fallback_status
def _build_document_list_statement(
    only_trashed: bool,
    include_trashed: bool,
    path_prefix: str | None,
):
    """Starts a document select filtered by trash lifecycle and an optional logical-path prefix."""
    statement = select(Document)
    if only_trashed:
        statement = statement.where(Document.status == DocumentStatus.TRASHED)
    elif not include_trashed:
        statement = statement.where(Document.status != DocumentStatus.TRASHED)
    trimmed_prefix = (path_prefix or "").strip()
    if trimmed_prefix:
        statement = statement.where(Document.logical_path.ilike(f"{trimmed_prefix}%"))
    return statement
def _enforce_upload_shape(files: list[UploadFile]) -> None:
    """Rejects empty upload requests and requests exceeding the configured file-count limit."""
    if not files:
        raise HTTPException(status_code=400, detail="Upload request must include at least one file")
    allowed_count = settings.max_upload_files_per_request
    if len(files) > allowed_count:
        raise HTTPException(
            status_code=413,
            detail=f"Upload request exceeds file count limit ({len(files)} > {allowed_count})",
        )
async def _read_upload_bytes(file: UploadFile, max_bytes: int) -> bytes:
    """Reads one upload into memory, rejecting payloads larger than max_bytes with 413."""
    # Read one extra byte so an over-limit file is detectable without buffering it all.
    payload = await file.read(max_bytes + 1)
    if len(payload) <= max_bytes:
        return payload
    raise HTTPException(
        status_code=413,
        detail=f"File '{file.filename or 'upload'}' exceeds per-file limit of {max_bytes} bytes",
    )
def _collect_document_tree(session: Session, root_document_id: UUID) -> list[tuple[int, Document]]:
    """Collects a document and all descendants for recursive permanent deletion.

    Performs a breadth-first walk over ``parent_document_id`` links and returns
    ``(depth, document)`` pairs sorted deepest-first so children can be deleted
    before their parents.

    Args:
        session: Active database session used for lookups.
        root_document_id: Identifier of the subtree root.

    Returns:
        Depth-annotated documents, deepest rows first.
    """
    # deque gives O(1) popleft; the previous list.pop(0) was O(n) per dequeue.
    queue: deque[tuple[UUID, int]] = deque([(root_document_id, 0)])
    visited: set[UUID] = set()
    collected: list[tuple[int, Document]] = []
    while queue:
        current_id, depth = queue.popleft()
        if current_id in visited:
            # Guards against cycles or duplicate parent links.
            continue
        visited.add(current_id)
        document = session.execute(select(Document).where(Document.id == current_id)).scalar_one_or_none()
        if document is None:
            continue
        collected.append((depth, document))
        child_ids = session.execute(
            select(Document.id).where(Document.parent_document_id == current_id)
        ).scalars().all()
        for child_id in child_ids:
            queue.append((child_id, depth + 1))
    # Deepest first so deletion order respects parent/child relationships.
    collected.sort(key=lambda item: item[0], reverse=True)
    return collected
@router.get("", response_model=DocumentsListResponse)
def list_documents(
    offset: int = Query(default=0, ge=0),
    limit: int = Query(default=50, ge=1, le=200),
    include_trashed: bool = Query(default=False),
    only_trashed: bool = Query(default=False),
    path_prefix: str | None = Query(default=None),
    path_filter: str | None = Query(default=None),
    tag_filter: str | None = Query(default=None),
    type_filter: str | None = Query(default=None),
    processed_from: str | None = Query(default=None),
    processed_to: str | None = Query(default=None),
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> DocumentsListResponse:
    """Lists the caller's documents, newest upload first, with lifecycle and discovery filters applied."""
    filtered = _apply_discovery_filters(
        _scope_document_statement_for_auth_context(
            _build_document_list_statement(
                only_trashed=only_trashed,
                include_trashed=include_trashed,
                path_prefix=path_prefix,
            ),
            auth_context,
        ),
        path_filter=path_filter,
        tag_filter=tag_filter,
        type_filter=type_filter,
        processed_from=processed_from,
        processed_to=processed_to,
    )
    # Total counts every filtered row, independent of pagination.
    total = session.execute(select(func.count()).select_from(filtered.subquery())).scalar_one()
    page = session.execute(
        filtered.order_by(Document.created_at.desc()).offset(offset).limit(limit)
    ).scalars().all()
    return DocumentsListResponse(total=total, items=[DocumentResponse.model_validate(row) for row in page])
@router.get("/tags")
def list_tags(
    include_trashed: bool = Query(default=False),
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> dict[str, list[str]]:
    """Returns the sorted union of assigned document tags and visible predefined tags."""
    statement = _scope_document_statement_for_auth_context(select(Document.tags), auth_context)
    if not include_trashed:
        statement = statement.where(Document.status != DocumentStatus.TRASHED)
    collected: set[str] = set()
    for tag_row in session.execute(statement).scalars().all():
        collected.update(tag for tag in tag_row if tag)
    collected |= _collect_visible_predefined_values(
        read_predefined_tags_settings(),
        auth_context=auth_context,
    )
    return {"tags": sorted(collected)}
@router.get("/paths")
def list_paths(
    include_trashed: bool = Query(default=False),
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> dict[str, list[str]]:
    """Returns the sorted union of assigned logical paths and visible predefined paths."""
    statement = _scope_document_statement_for_auth_context(select(Document.logical_path), auth_context)
    if not include_trashed:
        statement = statement.where(Document.status != DocumentStatus.TRASHED)
    collected = {path for path in session.execute(statement).scalars().all() if path}
    collected |= _collect_visible_predefined_values(
        read_predefined_paths_settings(),
        auth_context=auth_context,
    )
    return {"paths": sorted(collected)}
@router.get("/types")
def list_types(
    include_trashed: bool = Query(default=False),
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> dict[str, list[str]]:
    """Returns sorted distinct type values drawn from extension, MIME type, and image text type columns."""
    statement = _scope_document_statement_for_auth_context(
        select(Document.extension, Document.mime_type, Document.image_text_type),
        auth_context,
    )
    if not include_trashed:
        statement = statement.where(Document.status != DocumentStatus.TRASHED)
    type_values = {
        candidate.strip().lower()
        for row in session.execute(statement).all()
        for candidate in row
        if isinstance(candidate, str) and candidate.strip()
    }
    return {"types": sorted(type_values)}
@router.post("/content-md/export")
def export_contents_markdown(
    payload: ContentExportRequest,
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> StreamingResponse:
    """Exports extracted contents for selected documents as individual markdown files in a ZIP archive.

    Documents are selected by explicit ids and/or a logical-path prefix; at
    least one selector is required. Enforces a per-user rate limit plus
    configured document-count and total-markdown-size ceilings, then streams
    a spooled ZIP archive back to the caller.

    Raises:
        HTTPException: 400 when no selector is provided, 404 when nothing
            matches, 413 when a size/count limit is exceeded, 429/503 from
            the rate limiter.
    """
    _enforce_content_export_rate_limit(auth_context)
    has_document_ids = len(payload.document_ids) > 0
    has_path_prefix = bool(payload.path_prefix and payload.path_prefix.strip())
    if not has_document_ids and not has_path_prefix:
        raise HTTPException(status_code=400, detail="Provide document_ids or path_prefix for export")
    statement = select(Document)
    # Non-admin callers can only export rows they own.
    statement = _scope_document_statement_for_auth_context(statement, auth_context)
    if has_document_ids:
        statement = statement.where(Document.id.in_(payload.document_ids))
    if has_path_prefix:
        statement = statement.where(Document.logical_path.ilike(f"{payload.path_prefix.strip()}%"))
    if payload.only_trashed:
        statement = statement.where(Document.status == DocumentStatus.TRASHED)
    elif not payload.include_trashed:
        statement = statement.where(Document.status != DocumentStatus.TRASHED)
    max_documents = max(1, int(settings.content_export_max_documents))
    # Fetch one row past the cap so an over-limit selection is detectable.
    ordered_statement = statement.order_by(Document.logical_path.asc(), Document.created_at.asc()).limit(max_documents + 1)
    documents = session.execute(ordered_statement).scalars().all()
    if len(documents) > max_documents:
        raise HTTPException(
            status_code=413,
            detail=f"Export exceeds maximum document count ({len(documents)} > {max_documents})",
        )
    if not documents:
        raise HTTPException(status_code=404, detail="No matching documents found for export")
    max_total_bytes = max(1, int(settings.content_export_max_total_bytes))
    max_spool_memory = max(64 * 1024, int(settings.content_export_spool_max_memory_bytes))
    # Spools in memory first, overflowing transparently to disk for large archives.
    archive_file = tempfile.SpooledTemporaryFile(max_size=max_spool_memory, mode="w+b")
    total_export_bytes = 0
    used_entries: set[str] = set()
    try:
        with zipfile.ZipFile(archive_file, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
            for document in documents:
                markdown_bytes = _markdown_for_document(document).encode("utf-8")
                total_export_bytes += len(markdown_bytes)
                if total_export_bytes > max_total_bytes:
                    raise HTTPException(
                        status_code=413,
                        detail=(
                            "Export exceeds total markdown size limit "
                            f"({total_export_bytes} > {max_total_bytes} bytes)"
                        ),
                    )
                entry_name = _zip_entry_name(document, used_entries)
                archive.writestr(entry_name, markdown_bytes)
        # Rewind after the ZipFile has written its central directory on close.
        archive_file.seek(0)
    except Exception:
        # On success the streaming generator owns and closes the spool;
        # on failure we must close it here to avoid leaking the handle.
        archive_file.close()
        raise
    chunk_bytes = max(4 * 1024, int(settings.content_export_stream_chunk_bytes))
    headers = {"Content-Disposition": 'attachment; filename="document-contents-md.zip"'}
    return StreamingResponse(
        _stream_binary_file_chunks(archive_file, chunk_bytes=chunk_bytes),
        media_type="application/zip",
        headers=headers,
    )
@router.get("/{document_id}", response_model=DocumentDetailResponse)
def get_document(
    document_id: UUID,
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> DocumentDetailResponse:
    """Fetches one owner-scoped document by id, raising 404 when absent or not visible."""
    scoped = _scope_document_statement_for_auth_context(
        select(Document).where(Document.id == document_id),
        auth_context,
    )
    record = session.execute(scoped).scalar_one_or_none()
    if record is None:
        raise HTTPException(status_code=404, detail="Document not found")
    return DocumentDetailResponse.model_validate(record)
@router.get("/{document_id}/download")
def download_document(
    document_id: UUID,
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> FileResponse:
    """Sends the original stored bytes of one document as a named download."""
    scoped = _scope_document_statement_for_auth_context(
        select(Document).where(Document.id == document_id),
        auth_context,
    )
    record = session.execute(scoped).scalar_one_or_none()
    if record is None:
        raise HTTPException(status_code=404, detail="Document not found")
    return FileResponse(
        path=absolute_path(record.stored_relative_path),
        filename=record.original_filename,
        media_type=record.mime_type,
    )
@router.get("/{document_id}/preview")
def preview_document(
    document_id: UUID,
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> FileResponse:
    """Serves trusted-safe MIME types inline; anything script-capable is forced to download as octet-stream."""
    scoped = _scope_document_statement_for_auth_context(
        select(Document).where(Document.id == document_id),
        auth_context,
    )
    record = session.execute(scoped).scalar_one_or_none()
    if record is None:
        raise HTTPException(status_code=404, detail="Document not found")
    stored_path = absolute_path(record.stored_relative_path)
    # nosniff stops browsers from second-guessing the declared content type.
    headers = {"X-Content-Type-Options": "nosniff"}
    if is_inline_preview_mime_type_safe(record.mime_type):
        return FileResponse(path=stored_path, media_type=record.mime_type, headers=headers)
    return FileResponse(
        path=stored_path,
        filename=record.original_filename,
        media_type="application/octet-stream",
        headers=headers,
    )
@router.get("/{document_id}/thumbnail")
def thumbnail_document(
    document_id: UUID,
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> FileResponse:
    """Serves the pre-generated thumbnail image recorded in document metadata, or 404."""
    scoped = _scope_document_statement_for_auth_context(
        select(Document).where(Document.id == document_id),
        auth_context,
    )
    record = session.execute(scoped).scalar_one_or_none()
    if record is None:
        raise HTTPException(status_code=404, detail="Document not found")
    preview_relative_path = record.metadata_json.get("preview_relative_path")
    if not preview_relative_path:
        raise HTTPException(status_code=404, detail="Thumbnail not available")
    preview_path = absolute_path(preview_relative_path)
    if not preview_path.exists():
        raise HTTPException(status_code=404, detail="Thumbnail file not found")
    return FileResponse(path=preview_path)
@router.get("/{document_id}/content-md")
def download_document_content_markdown(
    document_id: UUID,
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> Response:
    """Downloads extracted content for one document as a markdown attachment.

    Raises:
        HTTPException: 404 when the document does not exist or is not visible
            to the caller.
    """
    statement = _scope_document_statement_for_auth_context(
        select(Document).where(Document.id == document_id),
        auth_context,
    )
    document = session.execute(statement).scalar_one_or_none()
    if document is None:
        raise HTTPException(status_code=404, detail="Document not found")
    markdown_content = _markdown_for_document(document)
    filename = _markdown_filename(document)
    # BUG FIX: the header previously hard-coded filename="(unknown)" and
    # ignored the generated filename computed above.
    headers = {"Content-Disposition": f'attachment; filename="{filename}"'}
    return Response(content=markdown_content, media_type="text/markdown; charset=utf-8", headers=headers)
@router.post("/upload", response_model=UploadResponse)
async def upload_documents(
    files: Annotated[list[UploadFile], File(description="Files to upload")],
    relative_paths: Annotated[list[str] | None, Form()] = None,
    logical_path: Annotated[str, Form()] = "Inbox",
    tags: Annotated[str | None, Form()] = None,
    conflict_mode: Annotated[Literal["ask", "replace", "duplicate"], Form()] = "ask",
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> UploadResponse:
    """Uploads files, records metadata, and enqueues asynchronous extraction tasks.

    Runs in two phases: phase 1 reads and validates every file and detects
    sha256 conflicts against the caller's existing documents; phase 2 stores
    bytes, creates rows, and enqueues processing. In "ask" mode any conflict
    aborts before phase 2 and returns the conflict list instead.

    Raises:
        HTTPException: 400 for an empty request, 413 when per-file, per-request,
            or file-count limits are exceeded.
    """
    _enforce_upload_shape(files)
    set_processing_log_autocommit(session, True)
    normalized_tags = _normalize_tags(tags)
    queue = get_processing_queue()
    uploaded: list[DocumentResponse] = []
    conflicts: list[UploadConflict] = []
    total_request_bytes = 0
    indexed_relative_paths = relative_paths or []
    prepared_uploads: list[dict[str, object]] = []
    # Phase 1: read, validate, hash, and log every file before writing anything.
    for idx, file in enumerate(files):
        filename = file.filename or f"uploaded_{idx}"
        data = await _read_upload_bytes(file, settings.max_upload_file_size_bytes)
        total_request_bytes += len(data)
        if total_request_bytes > settings.max_upload_request_size_bytes:
            raise HTTPException(
                status_code=413,
                detail=(
                    "Upload request exceeds total size limit "
                    f"({total_request_bytes} > {settings.max_upload_request_size_bytes} bytes)"
                ),
            )
        sha256 = compute_sha256(data)
        # Browser-supplied relative paths are matched positionally to files.
        source_relative_path = indexed_relative_paths[idx] if idx < len(indexed_relative_paths) else filename
        extension = Path(filename).suffix.lower()
        detected_mime = sniff_mime(data)
        log_processing_event(
            session=session,
            stage="upload",
            event="Upload request received",
            level="info",
            document_filename=filename,
            payload_json={
                "source_relative_path": source_relative_path,
                "logical_path": logical_path,
                "tags": normalized_tags,
                "mime_type": detected_mime,
                "size_bytes": len(data),
                "conflict_mode": conflict_mode,
            },
        )
        prepared_uploads.append(
            {
                "filename": filename,
                "data": data,
                "sha256": sha256,
                "source_relative_path": source_relative_path,
                "extension": extension,
                "mime_type": detected_mime,
            }
        )
        # Duplicate detection is sha256-based and scoped to the caller's rows.
        existing_statement = _scope_document_statement_for_auth_context(
            select(Document).where(Document.sha256 == sha256),
            auth_context,
        )
        existing = session.execute(existing_statement).scalar_one_or_none()
        if existing and conflict_mode == "ask":
            log_processing_event(
                session=session,
                stage="upload",
                event="Upload conflict detected",
                level="warning",
                document_id=existing.id,
                document_filename=filename,
                payload_json={
                    "sha256": sha256,
                    "existing_document_id": str(existing.id),
                },
            )
            conflicts.append(
                UploadConflict(
                    original_filename=filename,
                    sha256=sha256,
                    existing_document_id=existing.id,
                )
            )
    if conflicts and conflict_mode == "ask":
        # Abort before storing anything; commit persists the conflict log rows.
        session.commit()
        return UploadResponse(uploaded=[], conflicts=conflicts)
    # Phase 2: persist bytes and rows, then enqueue background processing.
    for prepared in prepared_uploads:
        existing_statement = _scope_document_statement_for_auth_context(
            select(Document).where(Document.sha256 == str(prepared["sha256"])),
            auth_context,
        )
        existing = session.execute(existing_statement).scalar_one_or_none()
        # "replace" links the new row to the document it supersedes.
        replaces_document_id = existing.id if existing and conflict_mode == "replace" else None
        stored_relative_path = store_bytes(str(prepared["filename"]), bytes(prepared["data"]))
        document = Document(
            original_filename=str(prepared["filename"]),
            source_relative_path=str(prepared["source_relative_path"]),
            stored_relative_path=stored_relative_path,
            mime_type=str(prepared["mime_type"]),
            extension=str(prepared["extension"]),
            sha256=str(prepared["sha256"]),
            size_bytes=len(bytes(prepared["data"])),
            logical_path=logical_path,
            tags=list(normalized_tags),
            owner_user_id=auth_context.user_id,
            replaces_document_id=replaces_document_id,
            metadata_json={"upload": "web"},
        )
        session.add(document)
        # Flush assigns the document id needed for the queue payload.
        session.flush()
        queue.enqueue("app.worker.tasks.process_document_task", str(document.id))
        log_processing_event(
            session=session,
            stage="upload",
            event="Document record created and queued",
            level="info",
            document=document,
            payload_json={
                "source_relative_path": document.source_relative_path,
                "stored_relative_path": document.stored_relative_path,
                "logical_path": document.logical_path,
                "tags": list(document.tags),
                "replaces_document_id": str(replaces_document_id) if replaces_document_id is not None else None,
            },
        )
        uploaded.append(DocumentResponse.model_validate(document))
    session.commit()
    return UploadResponse(uploaded=uploaded, conflicts=conflicts)
@router.patch("/{document_id}", response_model=DocumentResponse)
def update_document(
    document_id: UUID,
    payload: DocumentUpdateRequest,
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> DocumentResponse:
    """Applies metadata edits to one document and refreshes its semantic index entry."""
    scoped = _scope_document_statement_for_auth_context(
        select(Document).where(Document.id == document_id),
        auth_context,
    )
    document = session.execute(scoped).scalar_one_or_none()
    if document is None:
        raise HTTPException(status_code=404, detail="Document not found")
    if payload.original_filename is not None:
        document.original_filename = _sanitize_filename(payload.original_filename)
    if payload.logical_path is not None:
        document.logical_path = payload.logical_path.strip() or "Inbox"
    if payload.tags is not None:
        cleaned_tags = [tag.strip() for tag in payload.tags if tag.strip()]
        document.tags = list(dict.fromkeys(cleaned_tags))[:50]
    try:
        upsert_document_index(document=document, summary_text=_summary_for_index(document))
    except Exception:
        # Index refresh is best-effort; metadata changes commit regardless.
        pass
    session.commit()
    session.refresh(document)
    return DocumentResponse.model_validate(document)
@router.post("/{document_id}/trash", response_model=DocumentResponse)
def trash_document(
    document_id: UUID,
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> DocumentResponse:
    """Moves a document into the trash while leaving its stored files untouched."""
    scoped = _scope_document_statement_for_auth_context(
        select(Document).where(Document.id == document_id),
        auth_context,
    )
    document = session.execute(scoped).scalar_one_or_none()
    if document is None:
        raise HTTPException(status_code=404, detail="Document not found")
    if document.status != DocumentStatus.TRASHED:
        # Record the prior status so restore can return to it later.
        updated_metadata = dict(document.metadata_json)
        updated_metadata["status_before_trash"] = document.status.value
        document.metadata_json = updated_metadata
        document.status = DocumentStatus.TRASHED
    try:
        upsert_document_index(document=document, summary_text=_summary_for_index(document))
    except Exception:
        # Index refresh is best-effort only.
        pass
    session.commit()
    session.refresh(document)
    return DocumentResponse.model_validate(document)
@router.post("/{document_id}/restore", response_model=DocumentResponse)
def restore_document(
    document_id: UUID,
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> DocumentResponse:
    """Returns a trashed document to the lifecycle status it held before trashing."""
    scoped = _scope_document_statement_for_auth_context(
        select(Document).where(Document.id == document_id),
        auth_context,
    )
    document = session.execute(scoped).scalar_one_or_none()
    if document is None:
        raise HTTPException(status_code=404, detail="Document not found")
    if document.status == DocumentStatus.TRASHED:
        # Prefer the recorded pre-trash status; otherwise infer from processing state.
        fallback = DocumentStatus.PROCESSED if document.processed_at else DocumentStatus.QUEUED
        document.status = _resolve_previous_status(document.metadata_json, fallback)
        remaining_metadata = dict(document.metadata_json)
        remaining_metadata.pop("status_before_trash", None)
        document.metadata_json = remaining_metadata
    try:
        upsert_document_index(document=document, summary_text=_summary_for_index(document))
    except Exception:
        # Index refresh is best-effort only.
        pass
    session.commit()
    session.refresh(document)
    return DocumentResponse.model_validate(document)
@router.delete("/{document_id}")
def delete_document(
    document_id: UUID,
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> dict[str, int]:
    """Permanently deletes a document and all descendant archive members including stored files.

    The root document must already be trashed. Cleanup order: search-index and
    handwriting-style entries (best-effort), then stored originals and
    thumbnails on disk, then database rows (deepest-first via the tree walk).

    Returns:
        Counts of deleted document rows and deleted original files.

    Raises:
        HTTPException: 404 when the document is missing or not visible,
            400 when it is not in the trash.
    """
    root_statement = _scope_document_statement_for_auth_context(
        select(Document).where(Document.id == document_id),
        auth_context,
    )
    root = session.execute(root_statement).scalar_one_or_none()
    if root is None:
        raise HTTPException(status_code=404, detail="Document not found")
    if root.status != DocumentStatus.TRASHED:
        raise HTTPException(status_code=400, detail="Move document to trash before permanent deletion")
    document_tree = _collect_document_tree(session=session, root_document_id=document_id)
    # Descendants may belong to other users; re-check access for non-admins.
    if auth_context.role != UserRole.ADMIN:
        for _, document in document_tree:
            _ensure_document_access(document, auth_context)
    document_ids = [document.id for _, document in document_tree]
    # External cleanup is best-effort; failures must not block deletion.
    try:
        delete_many_documents_index([str(current_id) for current_id in document_ids])
    except Exception:
        pass
    try:
        delete_many_handwriting_style_documents([str(current_id) for current_id in document_ids])
    except Exception:
        pass
    deleted_files = 0
    for _, document in document_tree:
        source_path = absolute_path(document.stored_relative_path)
        if source_path.exists() and source_path.is_file():
            source_path.unlink(missing_ok=True)
            deleted_files += 1
        # Thumbnails are tracked in metadata; remove them alongside originals.
        preview_relative_path = document.metadata_json.get("preview_relative_path")
        if isinstance(preview_relative_path, str):
            preview_path = absolute_path(preview_relative_path)
            if preview_path.exists() and preview_path.is_file():
                preview_path.unlink(missing_ok=True)
        session.delete(document)
    session.commit()
    return {"deleted_documents": len(document_tree), "deleted_files": deleted_files}
@router.post("/{document_id}/reprocess", response_model=DocumentResponse)
def reprocess_document(
    document_id: UUID,
    auth_context: AuthContext = Depends(require_user_or_admin),
    session: Session = Depends(get_session),
) -> DocumentResponse:
    """Queues a non-trashed document for another extraction and suggestion pass."""
    scoped = _scope_document_statement_for_auth_context(
        select(Document).where(Document.id == document_id),
        auth_context,
    )
    document = session.execute(scoped).scalar_one_or_none()
    if document is None:
        raise HTTPException(status_code=404, detail="Document not found")
    if document.status == DocumentStatus.TRASHED:
        raise HTTPException(status_code=400, detail="Restore document before reprocessing")
    queue = get_processing_queue()
    document.status = DocumentStatus.QUEUED
    try:
        upsert_document_index(document=document, summary_text=_summary_for_index(document))
    except Exception:
        # Index refresh is best-effort only.
        pass
    # Commit the QUEUED status before handing the id to the worker queue.
    session.commit()
    queue.enqueue("app.worker.tasks.process_document_task", str(document.id))
    session.refresh(document)
    return DocumentResponse.model_validate(document)