"""Tests for ``app.analyzer.analyze_report`` run against an in-memory SQLite database."""

import json
from datetime import datetime, timedelta, timezone

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from app.analyzer import analyze_report
from app.config import Settings
from app.db import Base
from app.models import Record, Report

# Fixed reference instant shared by the tests that seed reports at several offsets.
_REFERENCE_TIME = datetime(2026, 5, 16, 12, tzinfo=timezone.utc)


def _session():
    """Return a Session bound to a fresh in-memory SQLite engine with all tables created."""
    memory_engine = create_engine("sqlite:///:memory:", future=True)
    Base.metadata.create_all(memory_engine)
    return Session(memory_engine)


def _settings() -> Settings:
    """Build Settings with one known sender ("mailcow") for tukutoi.com and email alerts disabled."""
    mailcow_sender = {
        "id": "mailcow",
        "name": "mailcow outbound",
        "ip_allowlist": ["198.51.100.5/32"],
        "dkim_domains": [],
        "spf_domains": [],
    }
    raw_config = {
        "inboxes": [],
        "known_senders": {"tukutoi.com": [mailcow_sender]},
        "alerts": {"email": {"enabled": False}},
    }
    return Settings.model_validate(raw_config)


def _alerts_only(results):
    """Return just the alert (first element) of every 3-tuple in *results*."""
    return [alert for alert, _, _ in results]


def _report(
    session: Session,
    *,
    source_ip: str,
    count: int,
    known: bool,
    dmarc_pass: bool,
    spf_aligned: bool = False,
    dkim_aligned: bool | None = None,
    report_time: datetime | None = None,
    org_name: str = "google.com",
    policy_p: str | None = None,
    policy_sp: str | None = None,
    policy_pct: int | None = None,
    disposition: str = "none",
    reason_type: str | None = None,
) -> Report:
    """Insert one Report holding a single Record, commit, and return the Report.

    ``dkim_aligned`` falls back to ``dmarc_pass`` when left as ``None``;
    ``report_time`` falls back to the current UTC time. The report covers the
    hour ending at ``report_time``.
    """
    if dkim_aligned is None:
        dkim_aligned = dmarc_pass
    if not report_time:
        report_time = datetime.now(timezone.utc)

    period_start = report_time - timedelta(hours=1)
    report = Report(
        inbox_id="tukutoi",
        raw_xml_sha256=f"sha-{source_ip}-{count}-{known}-{dmarc_pass}-{spf_aligned}-{dkim_aligned}-{report_time.isoformat()}-{org_name}",
        report_id=f"r-{source_ip}-{report_time.isoformat()}",
        org_name=org_name,
        domain="tukutoi.com",
        date_begin=period_start,
        date_end=report_time,
        policy_p=policy_p,
        policy_sp=policy_sp,
        policy_pct=policy_pct,
    )
    session.add(report)
    # Flush before building the Record, matching the original statement order.
    session.flush()

    dkim_verdict = "pass" if dkim_aligned else "fail"
    spf_verdict = "pass" if spf_aligned else "fail"
    sender_id, sender_name = ("mailcow", "mailcow outbound") if known else (None, None)
    record = Record(
        report=report,
        source_ip=source_ip,
        count=count,
        disposition=disposition,
        policy_dkim=dkim_verdict,
        policy_spf=spf_verdict,
        dkim_aligned=dkim_aligned,
        spf_aligned=spf_aligned,
        dmarc_pass=dmarc_pass,
        header_from="tukutoi.com",
        known_sender_id=sender_id,
        known_sender_name=sender_name,
        is_known_sender=known,
        reason_type=reason_type,
    )
    session.add(record)
    session.commit()
    return report


def test_unknown_source_failed_both_alert():
    """An unknown source failing DMARC yields a critical unknown_source_failed_both alert."""
    db = _session()
    failing = _report(db, source_ip="203.0.113.10", count=25, known=False, dmarc_pass=False)

    raised = _alerts_only(analyze_report(db, _settings(), failing))

    matching = [
        alert
        for alert in raised
        if alert.type == "unknown_source_failed_both" and alert.severity == "critical"
    ]
    assert matching


def test_known_sender_failure_alert():
    """A known sender failing DMARC yields a critical known_sender_dmarc_failure alert."""
    db = _session()
    failing = _report(db, source_ip="198.51.100.5", count=25, known=True, dmarc_pass=False)

    raised = _alerts_only(analyze_report(db, _settings(), failing))

    matching = [
        alert
        for alert in raised
        if alert.type == "known_sender_dmarc_failure" and alert.severity == "critical"
    ]
    assert matching


def test_dkim_authenticated_relay_is_info_not_sender_warning():
    """A DKIM-aligned, SPF-unaligned pass from an unknown IP is reported as an info-level relay."""
    db = _session()
    relayed = _report(
        db,
        source_ip="209.85.220.69",
        count=1,
        known=False,
        dmarc_pass=True,
        spf_aligned=False,
        dkim_aligned=True,
    )

    raised = _alerts_only(analyze_report(db, _settings(), relayed))

    relay = next(alert for alert in raised if alert.type == "dkim_authenticated_relay")
    assert relay.severity == "info"
    assert "intermediary" in relay.summary
    assert "add to SPF" in relay.summary
    assert all(alert.type != "new_passing_source" for alert in raised)


def test_alert_fingerprint_prevents_duplicate_open_alerts():
    """Analyzing the same report twice produces both new and already-existing alert results."""
    db = _session()
    config = _settings()
    failing = _report(db, source_ip="203.0.113.10", count=25, known=False, dmarc_pass=False)

    first_pass = analyze_report(db, config, failing)
    second_pass = analyze_report(db, config, failing)

    created_flags = [is_new for _, is_new, _ in first_pass + second_pass]
    assert True in created_flags
    assert False in created_flags


def test_unknown_failure_spike_uses_trailing_reports_outside_current_period():
    """Failures seeded 2-8 days back feed the trailing average reported by the spike alert."""
    db = _session()
    config = _settings()
    for offset in range(2, 9):
        _report(
            db,
            source_ip=f"203.0.113.{offset}",
            count=10,
            known=False,
            dmarc_pass=False,
            report_time=_REFERENCE_TIME - timedelta(days=offset),
        )
    current = _report(
        db,
        source_ip="203.0.113.200",
        count=40,
        known=False,
        dmarc_pass=False,
        report_time=_REFERENCE_TIME,
    )

    raised = _alerts_only(analyze_report(db, config, current))

    spike = next(alert for alert in raised if alert.type == "sudden_unknown_failure_spike")
    details = json.loads(spike.details_json)
    assert details["current_24h"] == 40
    assert details["trailing_7d_avg"] > 0


def test_configured_rate_thresholds_create_alerts():
    """A failing unknown source raises a high_unknown_source_failure_rate alert."""
    db = _session()
    config = _settings()
    failing = _report(db, source_ip="203.0.113.55", count=25, known=False, dmarc_pass=False)

    raised_types = [alert.type for alert in _alerts_only(analyze_report(db, config, failing))]

    assert "high_unknown_source_failure_rate" in raised_types


def test_repeated_failure_days_threshold_creates_alert():
    """The same source failing on two consecutive days raises repeated_dmarc_failure."""
    db = _session()
    config = _settings()
    _report(
        db,
        source_ip="203.0.113.77",
        count=8,
        known=False,
        dmarc_pass=False,
        report_time=_REFERENCE_TIME - timedelta(days=1),
    )
    latest = _report(
        db,
        source_ip="203.0.113.77",
        count=8,
        known=False,
        dmarc_pass=False,
        report_time=_REFERENCE_TIME,
    )

    raised_types = [alert.type for alert in _alerts_only(analyze_report(db, config, latest))]

    assert "repeated_dmarc_failure" in raised_types


def test_missing_reporter_gap_does_not_create_alert():
    """A reporter last seen five days earlier does not trigger a missing_reporter alert."""
    db = _session()
    config = _settings()
    _report(
        db,
        source_ip="203.0.113.88",
        count=1,
        known=False,
        dmarc_pass=True,
        report_time=_REFERENCE_TIME - timedelta(days=5),
        org_name="old-reporter",
    )
    latest = _report(
        db,
        source_ip="203.0.113.89",
        count=1,
        known=False,
        dmarc_pass=True,
        report_time=_REFERENCE_TIME,
        org_name="current-reporter",
    )

    raised = _alerts_only(analyze_report(db, config, latest))

    assert all(alert.type != "missing_reporter" for alert in raised)


def test_alert_details_include_published_policy_and_receiver_action():
    """Alert details and summary carry the published DMARC policy and the receiver's disposition."""
    db = _session()
    rejected = _report(
        db,
        source_ip="203.0.113.91",
        count=25,
        known=False,
        dmarc_pass=False,
        policy_p="reject",
        policy_sp="quarantine",
        policy_pct=100,
        disposition="reject",
    )

    raised = _alerts_only(analyze_report(db, _settings(), rejected))

    failed_both = next(alert for alert in raised if alert.type == "unknown_source_failed_both")
    details = json.loads(failed_both.details_json)
    published = details["published_policy"]
    assert published["p"] == "reject"
    assert published["effective"] == "reject"
    assert published["effective_source"] == "p"
    assert details["receiver_action"]["disposition"] == "reject"
    assert "Published DMARC policy was p=reject; pct=100" in failed_both.summary