From 026efec79beee5653cc5cd421691e407c5d93a68 Mon Sep 17 00:00:00 2001 From: Beda Schmid Date: Sat, 16 May 2026 13:25:10 -0300 Subject: [PATCH] Remove useless reports, avoid DB-Races --- app/analyzer.py | 25 ------------ app/config.py | 1 - app/db.py | 21 +++++++++- app/inbox_locks.py | 11 ++++-- app/main.py | 82 ++++++++++++++++++++------------------- config/config.example.yml | 1 - config/config.yml | 1 - tests/test_analyzer.py | 4 +- tests/test_inbox_locks.py | 16 ++++++++ 9 files changed, 88 insertions(+), 74 deletions(-) create mode 100644 tests/test_inbox_locks.py diff --git a/app/analyzer.py b/app/analyzer.py index fd09759..7b848ed 100644 --- a/app/analyzer.py +++ b/app/analyzer.py @@ -486,29 +486,4 @@ def _reporter_alerts(session: Session, settings: Settings, report: Report) -> li details={**_report_evidence(report), "policy_p": report.policy_p, "policy_sp": report.policy_sp, "policy_pct": report.policy_pct}, ) ) - missing_after = max(1, settings.alerts.thresholds.missing_reporter_days) - cutoff = _report_time(report) - timedelta(days=missing_after) - reporter_rows = session.execute( - select(Report.org_name, func.max(func.coalesce(Report.date_end, Report.date_begin, Report.created_at))) - .where(Report.domain == report.domain, Report.org_name.is_not(None)) - .group_by(Report.org_name) - ).all() - for org_name, last_seen in reporter_rows: - last_seen_at = _as_utc(last_seen) - if not org_name or not last_seen_at or last_seen_at >= cutoff: - continue - missed_days = (_report_time(report) - last_seen_at).days - alerts.append( - create_or_update_alert( - session, - inbox_id=report.inbox_id, - domain=report.domain, - severity="warning", - alert_type="missing_reporter", - key=org_name, - title=f"DMARC reporter missing for {report.domain}", - summary=f"{org_name} has not sent a DMARC aggregate report for {missed_days} days.", - details={**_report_evidence(report, link_report=False), "reporter": org_name, "last_seen_at": last_seen_at.isoformat()}, - ) - ) return alerts diff --git a/app/config.py b/app/config.py index 713bd00..5a67f82 100644 --- a/app/config.py +++ b/app/config.py @@ -110,7 +110,6 @@ class AlertThresholds(BaseModel): total_volume_drop_percent: float = 80 min_messages_for_rate_alert: int = 20 repeated_failure_days: int = 2 - missing_reporter_days: int = 3 class AlertsConfig(BaseModel): diff --git a/app/db.py b/app/db.py index 91c6094..0c2c09f 100644 --- a/app/db.py +++ b/app/db.py @@ -4,7 +4,7 @@ from contextlib import contextmanager from pathlib import Path from typing import Iterator -from sqlalchemy import create_engine, text +from sqlalchemy import create_engine, event, text from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker from app.config import get_settings @@ -23,7 +23,24 @@ def _engine_url() -> str: return url -engine = create_engine(_engine_url(), future=True, pool_pre_ping=True) +def _connect_args(url: str) -> dict: + if url.startswith("sqlite:"): + return {"timeout": 60} + return {} + + +engine_url = _engine_url() +engine = create_engine(engine_url, future=True, pool_pre_ping=True, connect_args=_connect_args(engine_url)) + + +@event.listens_for(engine, "connect") +def _configure_sqlite(dbapi_connection, connection_record) -> None: + if engine.url.get_backend_name() != "sqlite": + return + cursor = dbapi_connection.cursor() + cursor.execute("PRAGMA busy_timeout=60000") + cursor.execute("PRAGMA journal_mode=WAL") + cursor.close() SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, future=True) diff --git a/app/inbox_locks.py b/app/inbox_locks.py index caec751..e0e8185 100644 --- a/app/inbox_locks.py +++ b/app/inbox_locks.py @@ -7,13 +7,14 @@ from dataclasses import dataclass @dataclass class InboxRunLease: inbox_id: str - _lock: threading.Lock + _locks: list[threading.Lock] _released: bool = False def release(self) -> None: if not self._released: self._released = True - self._lock.release() + for lock in reversed(self._locks): + lock.release() def __enter__(self) -> "InboxRunLease": return self @@ -25,14 +26,18 @@ class InboxRunLease: class InboxRunLocks: def __init__(self) -> None: self._guard = threading.Lock() + self._global_lock = threading.Lock() self._locks: dict[str, threading.Lock] = {} def acquire(self, inbox_id: str, *, blocking: bool = False) -> InboxRunLease | None: + if not self._global_lock.acquire(blocking=blocking): + return None with self._guard: lock = self._locks.setdefault(inbox_id, threading.Lock()) if not lock.acquire(blocking=blocking): + self._global_lock.release() return None - return InboxRunLease(inbox_id=inbox_id, _lock=lock) + return InboxRunLease(inbox_id=inbox_id, _locks=[self._global_lock, lock]) def active(self, inbox_id: str) -> bool: lease = self.acquire(inbox_id, blocking=False) diff --git a/app/main.py b/app/main.py index 72ccf5b..a8aa30d 100644 --- a/app/main.py +++ b/app/main.py @@ -773,41 +773,48 @@ def _copy_summary_to_job(job_id: str, summary) -> None: import_jobs.update(job_id, mutate) -def _run_import_job(job_id: str, action: str, body: ProcessNowRequest | BacklogRequest, lease: InboxRunLease) -> None: - def mark_running(job): - job.status = "running" - - import_jobs.update(job_id, mark_running) +def _run_import_job(job_id: str, action: str, body: ProcessNowRequest | BacklogRequest) -> None: + lease: InboxRunLease | None = None try: inbox = settings.get_inbox(body.inbox_id) - with session_scope() as session: - if action == "backlog": - assert isinstance(body, BacklogRequest) - summary = process_inbox( - session, - settings, - inbox, - folder=body.folder or inbox.folder, - mode="backlog", - since=body.since, - before=body.before, - limit=body.limit, - dry_run=body.dry_run, - reprocess=body.reprocess, - mark_seen=body.mark_seen, - progress_callback=lambda item: _copy_summary_to_job(job_id, item), - ) - else: - assert isinstance(body, ProcessNowRequest) - summary = process_inbox( - session, - settings, - inbox, - mode=body.mode, - limit=body.limit, - progress_callback=lambda item: _copy_summary_to_job(job_id, item), - ) - _copy_summary_to_job(job_id, summary) + lease = inbox_run_locks.acquire(inbox.id, blocking=True) + + def mark_running(job): + job.status = "running" + + import_jobs.update(job_id, mark_running) + try: + with session_scope() as session: + if action == "backlog": + assert isinstance(body, BacklogRequest) + summary = process_inbox( + session, + settings, + inbox, + folder=body.folder or inbox.folder, + mode="backlog", + since=body.since, + before=body.before, + limit=body.limit, + dry_run=body.dry_run, + reprocess=body.reprocess, + mark_seen=body.mark_seen, + progress_callback=lambda item: _copy_summary_to_job(job_id, item), + ) + else: + assert isinstance(body, ProcessNowRequest) + summary = process_inbox( + session, + settings, + inbox, + mode=body.mode, + limit=body.limit, + progress_callback=lambda item: _copy_summary_to_job(job_id, item), + ) + _copy_summary_to_job(job_id, summary) + finally: + lease.release() + lease = None def mark_done(job): job.status = "succeeded" @@ -824,7 +831,8 @@ def _run_import_job(job_id: str, action: str, body: ProcessNowRequest | BacklogR import_jobs.update(job_id, mark_failed) finally: - lease.release() + if lease: + lease.release() def _start_import_job(action: str, body: ProcessNowRequest | BacklogRequest) -> dict: @@ -835,16 +843,12 @@ def _start_import_job(action: str, body: ProcessNowRequest | BacklogRequest) -> active = import_jobs.active_for_inbox(body.inbox_id) if active: return active.to_dict() - lease = inbox_run_locks.acquire(body.inbox_id, blocking=False) - if not lease: - raise HTTPException(status_code=409, detail=f"Inbox {body.inbox_id} is already processing.") try: job = import_jobs.create(body.inbox_id, action) - thread = threading.Thread(target=_run_import_job, args=(job.id, action, body, lease), daemon=True) + thread = threading.Thread(target=_run_import_job, args=(job.id, action, body), daemon=True) thread.start() return job.to_dict() except Exception: - lease.release() raise diff --git a/config/config.example.yml b/config/config.example.yml index 9f29b9e..422dffa 100644 --- a/config/config.example.yml +++ b/config/config.example.yml @@ -100,4 +100,3 @@ alerts: total_volume_drop_percent: 80 min_messages_for_rate_alert: 20 repeated_failure_days: 2 - missing_reporter_days: 3 diff --git a/config/config.yml b/config/config.yml index bee1e6d..fd00d8c 100644 --- a/config/config.yml +++ b/config/config.yml @@ -124,4 +124,3 @@ alerts: total_volume_drop_percent: 80 min_messages_for_rate_alert: 20 repeated_failure_days: 2 - missing_reporter_days: 3 diff --git a/tests/test_analyzer.py b/tests/test_analyzer.py index 0164590..57a8fa3 100644 --- a/tests/test_analyzer.py +++ b/tests/test_analyzer.py @@ -165,7 +165,7 @@ def test_repeated_failure_days_threshold_creates_alert(): assert any(alert.type == "repeated_dmarc_failure" for alert, _, _ in alerts) -def test_missing_reporter_threshold_creates_alert(): +def test_missing_reporter_gap_does_not_create_alert(): session = _session() settings = _settings() now = datetime(2026, 5, 16, 12, tzinfo=timezone.utc) @@ -174,4 +174,4 @@ def test_missing_reporter_threshold_creates_alert(): alerts = analyze_report(session, settings, report) - assert any(alert.type == "missing_reporter" for alert, _, _ in alerts) + assert not any(alert.type == "missing_reporter" for alert, _, _ in alerts) diff --git a/tests/test_inbox_locks.py b/tests/test_inbox_locks.py new file mode 100644 index 0000000..e02e393 --- /dev/null +++ b/tests/test_inbox_locks.py @@ -0,0 +1,16 @@ +from app.inbox_locks import InboxRunLocks + + +def test_inbox_run_locks_serialize_different_inboxes(): + locks = InboxRunLocks() + first = locks.acquire("first", blocking=False) + assert first is not None + + try: + assert locks.acquire("second", blocking=False) is None + finally: + first.release() + + second = locks.acquire("second", blocking=False) + assert second is not None + second.release()