diff --git a/README.md b/README.md index a18b4c9..6b43b59 100644 --- a/README.md +++ b/README.md @@ -182,8 +182,10 @@ GET /api/alerts POST /api/alerts/{id}/ack POST /api/alerts/{id}/resolve POST /api/alerts/{id}/reopen -POST /api/admin/process-now -POST /api/admin/backlog +POST /api/admin/import-jobs/process-now +POST /api/admin/import-jobs/backlog +GET /api/admin/import-jobs +GET /api/admin/import-jobs/{id} ``` `/health` is public. Homepage API routes use bearer auth when enabled. Dashboard and admin/API management routes use Basic Auth. diff --git a/app/inbox_locks.py b/app/inbox_locks.py index e0e8185..dcf7129 100644 --- a/app/inbox_locks.py +++ b/app/inbox_locks.py @@ -1,18 +1,25 @@ from __future__ import annotations +import fcntl +from pathlib import Path import threading from dataclasses import dataclass +from typing import IO @dataclass class InboxRunLease: inbox_id: str _locks: list[threading.Lock] + _lock_file: IO[str] | None = None _released: bool = False def release(self) -> None: if not self._released: self._released = True + if self._lock_file: + fcntl.flock(self._lock_file.fileno(), fcntl.LOCK_UN) + self._lock_file.close() for lock in reversed(self._locks): lock.release() @@ -28,16 +35,34 @@ class InboxRunLocks: self._guard = threading.Lock() self._global_lock = threading.Lock() self._locks: dict[str, threading.Lock] = {} + self._lock_path = Path("data/import.lock") + + def _acquire_file_lock(self, *, blocking: bool) -> IO[str] | None: + self._lock_path.parent.mkdir(parents=True, exist_ok=True) + handle = self._lock_path.open("a+", encoding="utf-8") + flags = fcntl.LOCK_EX if blocking else fcntl.LOCK_EX | fcntl.LOCK_NB + try: + fcntl.flock(handle.fileno(), flags) + return handle + except BlockingIOError: + handle.close() + return None def acquire(self, inbox_id: str, *, blocking: bool = False) -> InboxRunLease | None: if not self._global_lock.acquire(blocking=blocking): return None + lock_file = self._acquire_file_lock(blocking=blocking) + if not lock_file: + self._global_lock.release() + return None with self._guard: lock = self._locks.setdefault(inbox_id, threading.Lock()) if not lock.acquire(blocking=blocking): + fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) + lock_file.close() self._global_lock.release() return None - return InboxRunLease(inbox_id=inbox_id, _locks=[self._global_lock, lock]) + return InboxRunLease(inbox_id=inbox_id, _locks=[self._global_lock, lock], _lock_file=lock_file) def active(self, inbox_id: str) -> bool: lease = self.acquire(inbox_id, blocking=False) diff --git a/app/main.py b/app/main.py index a8aa30d..63cb881 100644 --- a/app/main.py +++ b/app/main.py @@ -916,42 +916,3 @@ def api_import_job(job_id: str): def api_inbox_status(inbox_id: str, session: Session = Depends(get_db)): return _inbox_status_payload(inbox_id, session) - -@app.post("/api/admin/process-now", dependencies=dashboard_post_auth) -def api_process_now(body: ProcessNowRequest, session: Session = Depends(get_db)): - try: - inbox = settings.get_inbox(body.inbox_id) - except KeyError: - raise HTTPException(status_code=404, detail=f"Unknown inbox: {body.inbox_id}") from None - lease = inbox_run_locks.acquire(inbox.id, blocking=False) - if not lease: - raise HTTPException(status_code=409, detail=f"Inbox {inbox.id} is already processing.") - with lease: - summary = process_inbox(session, settings, inbox, mode=body.mode, limit=body.limit) - return summary.__dict__ - - -@app.post("/api/admin/backlog", dependencies=dashboard_post_auth) -def api_backlog(body: BacklogRequest, session: Session = Depends(get_db)): - try: - inbox = settings.get_inbox(body.inbox_id) - except KeyError: - raise HTTPException(status_code=404, detail=f"Unknown inbox: {body.inbox_id}") from None - lease = inbox_run_locks.acquire(inbox.id, blocking=False) - if not lease: - raise HTTPException(status_code=409, detail=f"Inbox {inbox.id} is already processing.") - with lease: - summary = process_inbox( - session, - settings, - inbox, - folder=body.folder or inbox.folder, - mode="backlog", - since=body.since, - before=body.before, - limit=body.limit, - dry_run=body.dry_run, - reprocess=body.reprocess, - mark_seen=body.mark_seen, - ) - return summary.__dict__ diff --git a/docs/runtime.md b/docs/runtime.md index 609c593..b780953 100644 --- a/docs/runtime.md +++ b/docs/runtime.md @@ -70,10 +70,12 @@ The CLI calls `process_inbox()` in backlog mode and prints the resulting `Proces `app/main.py` exposes admin endpoints that call the same processing pipeline: -- `POST /api/admin/process-now`: processes the configured inbox using request fields from `ProcessNowRequest`. -- `POST /api/admin/backlog`: runs backlog processing using request fields from `BacklogRequest`. +- `POST /api/admin/import-jobs/process-now`: starts a queued process-now job using request fields from `ProcessNowRequest`. +- `POST /api/admin/import-jobs/backlog`: starts a queued backlog job using request fields from `BacklogRequest`. +- `GET /api/admin/import-jobs`: lists import jobs. +- `GET /api/admin/import-jobs/{id}`: returns import job progress. -Both endpoints use dashboard Basic Auth dependencies. +These endpoints use dashboard Basic Auth dependencies. ## Scheduled Work diff --git a/tests/test_inbox_locks.py b/tests/test_inbox_locks.py index e02e393..121b43c 100644 --- a/tests/test_inbox_locks.py +++ b/tests/test_inbox_locks.py @@ -14,3 +14,19 @@ def test_inbox_run_locks_serialize_different_inboxes(): second = locks.acquire("second", blocking=False) assert second is not None second.release() + + +def test_inbox_run_locks_serialize_across_lock_instances(): + first_locks = InboxRunLocks() + second_locks = InboxRunLocks() + first = first_locks.acquire("first", blocking=False) + assert first is not None + + try: + assert second_locks.acquire("second", blocking=False) is None + finally: + first.release() + + second = second_locks.acquire("second", blocking=False) + assert second is not None + second.release()