76 lines
2.3 KiB
Python
76 lines
2.3 KiB
Python
from __future__ import annotations
|
|
|
|
import fcntl
|
|
from pathlib import Path
|
|
import threading
|
|
from dataclasses import dataclass
|
|
from typing import IO
|
|
|
|
|
|
@dataclass
|
|
class InboxRunLease:
|
|
inbox_id: str
|
|
_locks: list[threading.Lock]
|
|
_lock_file: IO[str] | None = None
|
|
_released: bool = False
|
|
|
|
def release(self) -> None:
|
|
if not self._released:
|
|
self._released = True
|
|
if self._lock_file:
|
|
fcntl.flock(self._lock_file.fileno(), fcntl.LOCK_UN)
|
|
self._lock_file.close()
|
|
for lock in reversed(self._locks):
|
|
lock.release()
|
|
|
|
def __enter__(self) -> "InboxRunLease":
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb) -> None:
|
|
self.release()
|
|
|
|
|
|
class InboxRunLocks:
|
|
def __init__(self) -> None:
|
|
self._guard = threading.Lock()
|
|
self._global_lock = threading.Lock()
|
|
self._locks: dict[str, threading.Lock] = {}
|
|
self._lock_path = Path("data/import.lock")
|
|
|
|
def _acquire_file_lock(self, *, blocking: bool) -> IO[str] | None:
|
|
self._lock_path.parent.mkdir(parents=True, exist_ok=True)
|
|
handle = self._lock_path.open("a+", encoding="utf-8")
|
|
flags = fcntl.LOCK_EX if blocking else fcntl.LOCK_EX | fcntl.LOCK_NB
|
|
try:
|
|
fcntl.flock(handle.fileno(), flags)
|
|
return handle
|
|
except BlockingIOError:
|
|
handle.close()
|
|
return None
|
|
|
|
def acquire(self, inbox_id: str, *, blocking: bool = False) -> InboxRunLease | None:
|
|
if not self._global_lock.acquire(blocking=blocking):
|
|
return None
|
|
lock_file = self._acquire_file_lock(blocking=blocking)
|
|
if not lock_file:
|
|
self._global_lock.release()
|
|
return None
|
|
with self._guard:
|
|
lock = self._locks.setdefault(inbox_id, threading.Lock())
|
|
if not lock.acquire(blocking=blocking):
|
|
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
|
|
lock_file.close()
|
|
self._global_lock.release()
|
|
return None
|
|
return InboxRunLease(inbox_id=inbox_id, _locks=[self._global_lock, lock], _lock_file=lock_file)
|
|
|
|
def active(self, inbox_id: str) -> bool:
|
|
lease = self.acquire(inbox_id, blocking=False)
|
|
if not lease:
|
|
return True
|
|
lease.release()
|
|
return False
|
|
|
|
|
|
inbox_run_locks = InboxRunLocks()
|