Files

212 lines
7.3 KiB
Python

import json
from datetime import datetime, timedelta, timezone
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from app.analyzer import analyze_report
from app.config import Settings
from app.db import Base
from app.models import Record, Report
def _session():
engine = create_engine("sqlite:///:memory:", future=True)
Base.metadata.create_all(engine)
return Session(engine)
def _settings() -> Settings:
return Settings.model_validate(
{
"inboxes": [],
"known_senders": {
"tukutoi.com": [
{"id": "mailcow", "name": "mailcow outbound", "ip_allowlist": ["198.51.100.5/32"], "dkim_domains": [], "spf_domains": []}
]
},
"alerts": {"email": {"enabled": False}},
}
)
def _report(
session: Session,
*,
source_ip: str,
count: int,
known: bool,
dmarc_pass: bool,
spf_aligned: bool = False,
dkim_aligned: bool | None = None,
report_time: datetime | None = None,
org_name: str = "google.com",
policy_p: str | None = None,
policy_sp: str | None = None,
policy_pct: int | None = None,
disposition: str = "none",
reason_type: str | None = None,
) -> Report:
dkim_aligned = dmarc_pass if dkim_aligned is None else dkim_aligned
report_time = report_time or datetime.now(timezone.utc)
report = Report(
inbox_id="tukutoi",
raw_xml_sha256=f"sha-{source_ip}-{count}-{known}-{dmarc_pass}-{spf_aligned}-{dkim_aligned}-{report_time.isoformat()}-{org_name}",
report_id=f"r-{source_ip}-{report_time.isoformat()}",
org_name=org_name,
domain="tukutoi.com",
date_begin=report_time - timedelta(hours=1),
date_end=report_time,
policy_p=policy_p,
policy_sp=policy_sp,
policy_pct=policy_pct,
)
session.add(report)
session.flush()
session.add(
Record(
report=report,
source_ip=source_ip,
count=count,
disposition=disposition,
policy_dkim="pass" if dkim_aligned else "fail",
policy_spf="pass" if spf_aligned else "fail",
dkim_aligned=dkim_aligned,
spf_aligned=spf_aligned,
dmarc_pass=dmarc_pass,
header_from="tukutoi.com",
known_sender_id="mailcow" if known else None,
known_sender_name="mailcow outbound" if known else None,
is_known_sender=known,
reason_type=reason_type,
)
)
session.commit()
return report
def test_unknown_source_failed_both_alert():
session = _session()
report = _report(session, source_ip="203.0.113.10", count=25, known=False, dmarc_pass=False)
alerts = analyze_report(session, _settings(), report)
assert any(alert.type == "unknown_source_failed_both" and alert.severity == "critical" for alert, _, _ in alerts)
def test_known_sender_failure_alert():
session = _session()
report = _report(session, source_ip="198.51.100.5", count=25, known=True, dmarc_pass=False)
alerts = analyze_report(session, _settings(), report)
assert any(alert.type == "known_sender_dmarc_failure" and alert.severity == "critical" for alert, _, _ in alerts)
def test_dkim_authenticated_relay_is_info_not_sender_warning():
session = _session()
report = _report(
session,
source_ip="209.85.220.69",
count=1,
known=False,
dmarc_pass=True,
spf_aligned=False,
dkim_aligned=True,
)
alerts = analyze_report(session, _settings(), report)
relay = next(alert for alert, _, _ in alerts if alert.type == "dkim_authenticated_relay")
assert relay.severity == "info"
assert "intermediary" in relay.summary
assert "add to SPF" in relay.summary
assert not any(alert.type == "new_passing_source" for alert, _, _ in alerts)
def test_alert_fingerprint_prevents_duplicate_open_alerts():
session = _session()
settings = _settings()
report = _report(session, source_ip="203.0.113.10", count=25, known=False, dmarc_pass=False)
first = analyze_report(session, settings, report)
second = analyze_report(session, settings, report)
created = [is_new for _, is_new, _ in first + second]
assert created.count(True) >= 1
assert created.count(False) >= 1
def test_unknown_failure_spike_uses_trailing_reports_outside_current_period():
session = _session()
settings = _settings()
now = datetime(2026, 5, 16, 12, tzinfo=timezone.utc)
for offset in range(2, 9):
_report(session, source_ip=f"203.0.113.{offset}", count=10, known=False, dmarc_pass=False, report_time=now - timedelta(days=offset))
report = _report(session, source_ip="203.0.113.200", count=40, known=False, dmarc_pass=False, report_time=now)
alerts = analyze_report(session, settings, report)
spike = next(alert for alert, _, _ in alerts if alert.type == "sudden_unknown_failure_spike")
details = json.loads(spike.details_json)
assert details["current_24h"] == 40
assert details["trailing_7d_avg"] > 0
def test_configured_rate_thresholds_create_alerts():
session = _session()
settings = _settings()
report = _report(session, source_ip="203.0.113.55", count=25, known=False, dmarc_pass=False)
alerts = analyze_report(session, settings, report)
assert any(alert.type == "high_unknown_source_failure_rate" for alert, _, _ in alerts)
def test_repeated_failure_days_threshold_creates_alert():
session = _session()
settings = _settings()
now = datetime(2026, 5, 16, 12, tzinfo=timezone.utc)
_report(session, source_ip="203.0.113.77", count=8, known=False, dmarc_pass=False, report_time=now - timedelta(days=1))
report = _report(session, source_ip="203.0.113.77", count=8, known=False, dmarc_pass=False, report_time=now)
alerts = analyze_report(session, settings, report)
assert any(alert.type == "repeated_dmarc_failure" for alert, _, _ in alerts)
def test_missing_reporter_gap_does_not_create_alert():
session = _session()
settings = _settings()
now = datetime(2026, 5, 16, 12, tzinfo=timezone.utc)
_report(session, source_ip="203.0.113.88", count=1, known=False, dmarc_pass=True, report_time=now - timedelta(days=5), org_name="old-reporter")
report = _report(session, source_ip="203.0.113.89", count=1, known=False, dmarc_pass=True, report_time=now, org_name="current-reporter")
alerts = analyze_report(session, settings, report)
assert not any(alert.type == "missing_reporter" for alert, _, _ in alerts)
def test_alert_details_include_published_policy_and_receiver_action():
session = _session()
report = _report(
session,
source_ip="203.0.113.91",
count=25,
known=False,
dmarc_pass=False,
policy_p="reject",
policy_sp="quarantine",
policy_pct=100,
disposition="reject",
)
alerts = analyze_report(session, _settings(), report)
alert = next(alert for alert, _, _ in alerts if alert.type == "unknown_source_failed_both")
details = json.loads(alert.details_json)
assert details["published_policy"]["p"] == "reject"
assert details["published_policy"]["effective"] == "reject"
assert details["published_policy"]["effective_source"] == "p"
assert details["receiver_action"]["disposition"] == "reject"
assert "Published DMARC policy was p=reject; pct=100" in alert.summary