Files
2026-05-16 12:05:36 -03:00

95 lines
3.2 KiB
Python

from __future__ import annotations
import threading
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Callable
from uuid import uuid4
from app.models import utcnow
@dataclass
class ImportJob:
    """Mutable record of one import job: identity, status, counters and timestamps."""

    # Identity: the job's own id, the inbox it belongs to and the requested action.
    id: str
    inbox_id: str
    action: str
    # Lifecycle state; a freshly constructed job is "queued".
    status: str = "queued"
    # Counters, all starting at zero.
    scanned_messages: int = 0
    processed_messages: int = 0
    candidate_messages: int = 0
    valid_reports_imported: int = 0
    duplicate_messages_skipped: int = 0
    duplicate_reports_skipped: int = 0
    failed_messages: int = 0
    rejected_messages: int = 0
    records_imported: int = 0
    alerts_created: int = 0
    # Optional extras, unset by default.
    duplicate_report_samples: list[dict] | None = None
    error: str | None = None
    # Timestamps; the first two are filled in by ``utcnow`` at construction time.
    started_at: datetime = field(default_factory=utcnow)
    updated_at: datetime = field(default_factory=utcnow)
    completed_at: datetime | None = None

    @property
    def progress_percent(self) -> int | None:
        """Return processed/scanned as a whole percentage capped at 100.

        Returns ``None`` while ``scanned_messages`` is zero, since no ratio
        can be computed yet.
        """
        total = self.scanned_messages
        if not total:
            return None
        ratio = self.processed_messages / total
        return min(100, round(ratio * 100))

    def to_dict(self) -> dict:
        """Return the job as a plain dict.

        All dataclass fields are included (via ``asdict``), with the three
        timestamps rendered as ISO-8601 strings (``completed_at`` stays
        ``None`` when unset), plus two derived keys: ``progress_percent`` and
        ``duplicates_skipped`` (message duplicates + report duplicates).
        """
        payload = asdict(self)
        finished = self.completed_at
        payload.update(
            started_at=self.started_at.isoformat(),
            updated_at=self.updated_at.isoformat(),
            completed_at=finished.isoformat() if finished else None,
            progress_percent=self.progress_percent,
            duplicates_skipped=(
                self.duplicate_messages_skipped + self.duplicate_reports_skipped
            ),
        )
        return payload
class ImportJobStore:
    """In-memory registry of import jobs keyed by job id.

    Access to the job table is serialised by a ``threading.Lock``. The job
    objects returned by the lookup methods are the stored instances
    themselves, not copies.
    """

    # Statuses that count as "still in flight" for ``active_for_inbox``.
    _ACTIVE_STATUSES = frozenset({"queued", "running"})

    def __init__(self) -> None:
        """Create an empty store with its own lock."""
        self._jobs: dict[str, ImportJob] = {}
        self._lock = threading.Lock()

    def active_for_inbox(self, inbox_id: str) -> ImportJob | None:
        """Return the most recently started queued/running job for ``inbox_id``.

        Returns ``None`` when the inbox has no job in an active status.
        """
        with self._lock:
            # A single pass with max() replaces sorting the whole table; like
            # the sort it replaces, ties on started_at go to the job that was
            # inserted first.
            return max(
                (
                    job
                    for job in self._jobs.values()
                    if job.inbox_id == inbox_id
                    and job.status in self._ACTIVE_STATUSES
                ),
                key=lambda job: job.started_at,
                default=None,
            )

    def latest_for_inbox(self, inbox_id: str) -> ImportJob | None:
        """Return the most recently started job for ``inbox_id`` in any status.

        Returns ``None`` when the inbox has no jobs at all.
        """
        with self._lock:
            return max(
                (job for job in self._jobs.values() if job.inbox_id == inbox_id),
                key=lambda job: job.started_at,
                default=None,
            )

    def create(self, inbox_id: str, action: str) -> ImportJob:
        """Build a new job with a random hex id, store it and return it."""
        job = ImportJob(id=uuid4().hex, inbox_id=inbox_id, action=action)
        with self._lock:
            self._jobs[job.id] = job
        return job

    def get(self, job_id: str) -> ImportJob | None:
        """Return the job stored under ``job_id``, or ``None`` if unknown."""
        with self._lock:
            return self._jobs.get(job_id)

    def list(self, inbox_id: str | None = None) -> list[ImportJob]:
        """Return jobs ordered newest-first by ``started_at``.

        When ``inbox_id`` is truthy only that inbox's jobs are returned;
        ``None`` (or an empty string) returns every job.
        """
        # This method is named ``list`` but a class attribute is not visible as
        # a bare name inside method bodies, so ``list(...)`` below is still the
        # builtin.
        with self._lock:
            snapshot = list(self._jobs.values())
        if inbox_id:
            snapshot = [job for job in snapshot if job.inbox_id == inbox_id]
        return sorted(snapshot, key=lambda job: job.started_at, reverse=True)

    def update(self, job_id: str, mutator: Callable[[ImportJob], None]) -> ImportJob | None:
        """Apply ``mutator`` to the stored job and refresh its ``updated_at``.

        Returns the mutated job, or ``None`` (without calling ``mutator``) when
        ``job_id`` is unknown. The mutator runs while the store's lock is held
        and the lock is not reentrant, so the mutator must not call back into
        this store. If the mutator raises, the exception propagates and
        ``updated_at`` is left unchanged.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return None
            mutator(job)
            job.updated_at = utcnow()
            return job
# Module-level ImportJobStore instance, created once when this module is imported.
import_jobs = ImportJobStore()