Files

552 lines
24 KiB
Python

from __future__ import annotations
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import Any
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from app.config import Settings
from app.llm import LLMClient
from app.models import Alert, Record, Report, utcnow
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Ordering of alert severities: a larger rank is more severe. It is used to
# decide whether an incoming severity escalates an alert that is already open.
SEVERITY_RANK = {"info": 0, "warning": 1, "critical": 2}
def _as_utc(value: datetime | str | None) -> datetime | None:
if value is None:
return None
if isinstance(value, str):
try:
value = datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
return None
if value.tzinfo is None:
return value.replace(tzinfo=timezone.utc)
return value
def _fingerprint(domain: str, alert_type: str, key: str) -> str:
return f"{domain}:{alert_type}:{key}"
def _merge_details(existing: str, incoming: dict[str, Any]) -> str:
try:
data = json.loads(existing or "{}")
except json.JSONDecodeError:
data = {}
existing_range = data.get("date_range") if isinstance(data.get("date_range"), dict) else {}
incoming_range = incoming.get("date_range") if isinstance(incoming.get("date_range"), dict) else {}
report_ids = list(data.get("report_db_ids") or [])
if data.get("report_db_id") and data["report_db_id"] not in report_ids:
report_ids.append(data["report_db_id"])
if incoming.get("report_db_id") and incoming["report_db_id"] not in report_ids:
report_ids.append(incoming["report_db_id"])
data.update(incoming)
if "count" in data and "count" in incoming:
data["count"] = max(int(data["count"]), int(incoming["count"]))
if existing_range or incoming_range:
begins = [item for item in [existing_range.get("begin"), incoming_range.get("begin")] if item]
ends = [item for item in [existing_range.get("end"), incoming_range.get("end")] if item]
data["date_range"] = {
"begin": min(begins) if begins else None,
"end": max(ends) if ends else None,
}
if report_ids:
data["report_db_ids"] = report_ids[-25:]
data["report_db_id"] = incoming.get("report_db_id") or data.get("report_db_id")
return json.dumps(data, sort_keys=True)
def _domain_equal(a: str | None, b: str | None) -> bool:
return (a or "").lower().rstrip(".") == (b or "").lower().rstrip(".")
def _is_subdomain(child: str | None, parent: str | None) -> bool:
child_norm = (child or "").lower().rstrip(".")
parent_norm = (parent or "").lower().rstrip(".")
return bool(child_norm and parent_norm and child_norm.endswith(f".{parent_norm}"))
def _published_policy(record: Record, report: Report) -> dict[str, Any]:
    """Describe the DMARC policy published in ``report`` as it applies to ``record``.

    The subdomain policy (``sp``) is reported as effective only when the
    record's ``header_from`` is a subdomain of the report domain and an ``sp``
    value is present; in every other case the domain policy (``p``) is used.

    Returns:
        A dict with the report domain, the record's ``header_from``, the raw
        ``p``/``sp``/``pct``/``adkim``/``aspf``/``fo`` values, and the
        ``effective`` policy together with its ``effective_source``.
    """
    applies_subdomain_policy = _is_subdomain(record.header_from, report.domain) and report.policy_sp
    if applies_subdomain_policy:
        source_tag, policy_value = "sp", report.policy_sp
    else:
        source_tag, policy_value = "p", report.policy_p

    description: dict[str, Any] = {
        "domain": report.domain,
        "header_from": record.header_from,
        "p": report.policy_p,
        "sp": report.policy_sp,
        "pct": report.policy_pct,
        "effective": policy_value,
        "effective_source": source_tag,
        "adkim": report.adkim,
        "aspf": report.aspf,
        "fo": report.fo,
    }
    return description
def _receiver_action(record: Record) -> dict[str, Any]:
return {
"disposition": record.disposition,
"override_type": record.reason_type,
"override_comment": record.reason_comment,
}
def _policy_context_sentence(record: Record, report: Report) -> str:
    """Build one sentence contrasting the published policy with the receiver's action.

    Missing values fall back to ``unspecified`` (effective policy), ``p``
    (policy source) and ``none`` (disposition). The ``pct`` and override
    fragments are only included when those values are present.
    """
    policy = _published_policy(record, report)
    action = _receiver_action(record)

    effective = policy.get("effective") or "unspecified"
    source = policy.get("effective_source") or "p"
    published_clause = f"Published DMARC policy was {source}={effective}"
    pct = policy.get("pct")
    if pct is not None:
        published_clause += f"; pct={pct}"

    disposition = action.get("disposition") or "none"
    receiver_clause = f"receiver disposition was {disposition}"
    override = action.get("override_type")
    if override:
        receiver_clause += f" with override {override}"

    return f"{published_clause}; {receiver_clause}."
def create_or_update_alert(
    session: Session,
    *,
    inbox_id: str,
    domain: str,
    severity: str,
    alert_type: str,
    key: str,
    title: str,
    summary: str,
    details: dict[str, Any],
) -> tuple[Alert, bool, bool]:
    """Open a new alert, or refresh the open alert with the same fingerprint.

    An open alert is looked up by the fingerprint built from ``domain``,
    ``alert_type`` and ``key``. When one exists its details are merged and its
    timestamps refreshed; severity, title and summary are replaced only if
    ``severity`` outranks the stored severity. Otherwise a new alert is added
    to the session and flushed.

    Returns:
        ``(alert, is_new, severity_increased)``.

    Raises:
        KeyError: If ``severity`` or the stored severity is not a key of
            ``SEVERITY_RANK`` (update path only).
    """
    fingerprint = _fingerprint(domain, alert_type, key)
    open_alert_query = select(Alert).where(Alert.fingerprint == fingerprint, Alert.status == "open")
    current = session.scalar(open_alert_query)
    timestamp = utcnow()

    if not current:
        fresh = Alert(
            fingerprint=fingerprint,
            inbox_id=inbox_id,
            domain=domain,
            severity=severity,
            type=alert_type,
            title=title,
            summary=summary,
            details_json=json.dumps(details, sort_keys=True, default=str),
            first_seen_at=timestamp,
            last_seen_at=timestamp,
        )
        session.add(fresh)
        session.flush()
        return fresh, True, False

    escalated = SEVERITY_RANK[severity] > SEVERITY_RANK[current.severity]
    if escalated:
        # Only a more severe observation may rewrite the headline fields.
        current.severity = severity
        current.title = title
        current.summary = summary
    current.details_json = _merge_details(current.details_json, details)
    current.last_seen_at = timestamp
    current.updated_at = timestamp
    return current, False, escalated
def _record_details(record: Record, report: Report) -> dict[str, Any]:
    """Assemble the details payload stored with an alert raised for ``record``.

    The payload combines the record's own results, the policy published in
    ``report``, the receiver's action, known-sender information and the
    identifiers and date range of the report.
    """
    range_begin = report.date_begin.isoformat() if report.date_begin else None
    range_end = report.date_end.isoformat() if report.date_end else None

    payload: dict[str, Any] = {
        "source_ip": record.source_ip,
        "count": record.count,
        "spf_aligned": record.spf_aligned,
        "dkim_aligned": record.dkim_aligned,
        "dmarc_pass": record.dmarc_pass,
        "disposition": record.disposition,
    }
    payload.update(
        policy_p=report.policy_p,
        policy_sp=report.policy_sp,
        policy_pct=report.policy_pct,
        published_policy=_published_policy(record, report),
        receiver_action=_receiver_action(record),
    )
    payload.update(
        known_sender=record.is_known_sender,
        known_sender_id=record.known_sender_id,
        reporting_orgs=[report.org_name] if report.org_name else [],
        report_db_id=report.id,
        report_id=report.report_id,
        date_range={"begin": range_begin, "end": range_end},
    )
    return payload
def _new_authenticated_path_alert(session: Session, record: Record, report: Report, details: dict[str, Any]) -> tuple[Alert, bool, bool] | None:
    """Raise the alert matching how a not-yet-known source passed DMARC.

    Nothing is raised for records that failed DMARC, that belong to a known
    sender, or that show neither SPF nor DKIM alignment. Otherwise:

    * DKIM aligned without SPF -> ``dkim_authenticated_relay`` (info)
    * SPF and DKIM aligned -> ``new_authenticated_source`` (warning)
    * SPF aligned only -> ``new_spf_authenticated_source`` (warning)

    Returns:
        The ``create_or_update_alert`` result, or ``None`` when no alert applies.
    """
    if not (record.dmarc_pass and not record.is_known_sender):
        return None

    ip = record.source_ip
    domain = report.domain
    if record.dkim_aligned and not record.spf_aligned:
        severity = "info"
        alert_type = "dkim_authenticated_relay"
        title = f"DKIM-authenticated relay observed for {domain}"
        summary = (
            f"A receiver observed {ip} transmitting mail claiming to be from {domain}. "
            "SPF did not align for that observed hop, but DKIM aligned, so DMARC passed. "
            "This commonly represents forwarding or an intermediary mail gateway, not a sender to add to SPF."
        )
    elif record.spf_aligned and record.dkim_aligned:
        severity = "warning"
        alert_type = "new_authenticated_source"
        title = f"New authenticated source observed for {domain}"
        summary = (
            f"{ip} is newly observed and passed DMARC with both SPF and DKIM alignment. "
            "Confirm whether this is an expected direct sender path before classifying it."
        )
    elif record.spf_aligned:
        severity = "warning"
        alert_type = "new_spf_authenticated_source"
        title = f"New SPF-authenticated source observed for {domain}"
        summary = (
            f"{ip} is newly observed and passed DMARC through SPF alignment. "
            "Confirm whether this is an expected direct sender path before classifying it."
        )
    else:
        return None

    return create_or_update_alert(
        session=session,
        inbox_id=report.inbox_id,
        domain=domain,
        severity=severity,
        alert_type=alert_type,
        key=ip,
        title=title,
        summary=summary,
        details=details,
    )
def _report_time(report: Report) -> datetime:
    """Return the reference timestamp of ``report``.

    The first truthy value of ``date_end``, ``date_begin`` and ``created_at``
    is passed through ``_as_utc``; if that yields nothing, the current time
    from ``utcnow()`` is used.
    """
    raw_stamp = report.date_end or report.date_begin or report.created_at
    resolved = _as_utc(raw_stamp)
    if resolved:
        return resolved
    return utcnow()
def _report_day(report: Report) -> datetime:
    """Return the report's reference timestamp truncated to the start of its day."""
    start_of_day = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}
    return _report_time(report).replace(**start_of_day)
def _report_evidence(report: Report, *, link_report: bool = True) -> dict[str, Any]:
evidence = {
"reporting_orgs": [report.org_name] if report.org_name else [],
"date_range": {
"begin": report.date_begin.isoformat() if report.date_begin else None,
"end": report.date_end.isoformat() if report.date_end else None,
},
}
if link_report:
evidence["report_db_id"] = report.id
evidence["report_id"] = report.report_id
return evidence
def analyze_report(session: Session, settings: Settings, report: Report, llm: LLMClient | None = None) -> list[tuple[Alert, bool, bool]]:
    """Evaluate every record of ``report`` and create or refresh the matching alerts.

    Per-record checks run first, in this order: unknown source failing both SPF
    and DKIM alignment, known sender failing DMARC, quarantine/reject
    disposition, and first-time sources (failing or authenticated). Domain-wide
    rate alerts and reporter alerts follow. When ``llm`` is given and
    ``settings.llm.generate_alert_explanations`` is set, each new or escalated
    warning/critical alert gets an LLM explanation attached, at most once per
    call.

    Args:
        session: Database session used for lookups and alert persistence.
        settings: Application settings; ``settings.alerts.thresholds`` and
            ``settings.llm.generate_alert_explanations`` are read.
        report: The aggregate report to analyze; its ``records`` are iterated.
        llm: Optional client used to explain alerts.

    Returns:
        One ``(alert, is_new, severity_increased)`` tuple per alert touched, in
        the order the alerts were raised.
    """
    created: list[tuple[Alert, bool, bool]] = []
    thresholds = settings.alerts.thresholds
    for record in report.records:
        details = _record_details(record, report)
        policy_context = _policy_context_sentence(record, report)

        if not record.is_known_sender and not record.spf_aligned and not record.dkim_aligned and record.count >= thresholds.unknown_source_fail_count:
            created.append(
                create_or_update_alert(
                    session,
                    inbox_id=report.inbox_id,
                    domain=report.domain,
                    severity="critical",
                    alert_type="unknown_source_failed_both",
                    key=record.source_ip,
                    title=f"Unknown source failed SPF and DKIM for {report.domain}",
                    summary=f"{record.source_ip} sent {record.count} messages that failed SPF and DKIM alignment. {policy_context}",
                    details=details,
                )
            )

        if record.is_known_sender and not record.dmarc_pass and record.count >= thresholds.min_messages_for_rate_alert:
            created.append(
                create_or_update_alert(
                    session,
                    inbox_id=report.inbox_id,
                    domain=report.domain,
                    severity="critical",
                    alert_type="known_sender_dmarc_failure",
                    key=record.known_sender_id or record.source_ip,
                    title=f"Known sender failed DMARC for {report.domain}",
                    summary=f"{record.known_sender_name or record.source_ip} failed DMARC for {record.count} messages. {policy_context}",
                    details=details,
                )
            )

        if record.disposition in {"quarantine", "reject"} and record.count > 0:
            created.append(
                create_or_update_alert(
                    session,
                    inbox_id=report.inbox_id,
                    domain=report.domain,
                    severity="critical",
                    alert_type="quarantine_or_reject_seen",
                    key=f"{record.disposition}:{record.source_ip}",
                    title=f"{record.disposition.title()} disposition seen for {report.domain}",
                    summary=f"Receiver applied {record.disposition} to {record.count} messages. {policy_context}",
                    details=details,
                )
            )

        # A source is "new" when no OTHER report for this domain mentions it.
        # Excluding the whole current report (not just this row) matters when a
        # report carries several rows for the same IP: those sibling rows must
        # not count as an earlier sighting.
        seen_in_other_reports = session.scalar(
            select(func.count(Record.id))
            .join(Report)
            .where(
                Report.domain == report.domain,
                Record.source_ip == record.source_ip,
                Report.id != report.id,
            )
        )
        if not seen_in_other_reports:
            if not record.dmarc_pass:
                created.append(
                    create_or_update_alert(
                        session,
                        inbox_id=report.inbox_id,
                        domain=report.domain,
                        severity="warning",
                        alert_type="new_unknown_source",
                        key=record.source_ip,
                        title=f"New unknown failing source for {report.domain}",
                        summary=f"{record.source_ip} is newly observed and failed DMARC. {policy_context}",
                        details=details,
                    )
                )
            elif not record.is_known_sender:
                path_alert = _new_authenticated_path_alert(session, record, report, details)
                if path_alert is not None:
                    created.append(path_alert)

    created.extend(_rate_alerts(session, settings, report))
    created.extend(_reporter_alerts(session, settings, report))

    if llm and settings.llm.generate_alert_explanations:
        # The same alert can appear several times in ``created`` (raised, then
        # refreshed by a later record); explain it only once per run.
        explained: set[int] = set()
        for alert, is_new, severity_increased in created:
            if not (is_new or severity_increased):
                continue
            if alert.severity not in {"warning", "critical"}:
                continue
            if id(alert) in explained:
                continue
            explained.add(id(alert))
            explanation = llm.explain_alert(alert)
            alert.llm_summary = explanation.summary
            alert.llm_risk = explanation.risk
            alert.llm_recommended_action = explanation.recommended_action
    return created
def _rate_alerts(session: Session, settings: Settings, report: Report) -> list[tuple[Alert, bool, bool]]:
    """Raise domain-wide alerts from failure rates and volume trends.

    All records for the report's domain whose report timestamp falls inside
    the report's own period are aggregated. If their total message count is
    below ``min_messages_for_rate_alert`` only the repeated-failure check runs.
    Otherwise the following are evaluated, in order: unknown-source failure
    rate, known-sender failure rate, a spike of unknown failures in the last
    day versus the trailing 7-day average, a drop in total volume versus the
    trailing 7-day average, and finally the repeated-failure check.

    Returns:
        The ``create_or_update_alert`` results for every alert raised.
    """
    limits = settings.alerts.thresholds
    # SQL expression for a report's timestamp, shared by both windowed queries.
    report_stamp = func.coalesce(Report.date_end, Report.date_begin, Report.created_at)

    window_start = _as_utc(report.date_begin) or _report_day(report)
    window_end = _as_utc(report.date_end) or (window_start + timedelta(days=1))
    period_pairs = session.execute(
        select(Record, Report)
        .join(Report)
        .where(
            Report.domain == report.domain,
            report_stamp >= window_start,
            report_stamp <= window_end,
        )
    ).all()
    period_records = [item for item, _owner in period_pairs]

    total = sum(item.count for item in period_records)
    if total < limits.min_messages_for_rate_alert:
        return _repeated_failure_alerts(session, settings, report, period_records)

    raised: list[tuple[Alert, bool, bool]] = []
    evidence = _report_evidence(report, link_report=False)

    def raise_global(severity: str, alert_type: str, title: str, summary: str, extra: dict[str, Any]) -> None:
        # Every rate alert is keyed "global": one open alert per type and domain.
        raised.append(
            create_or_update_alert(
                session,
                inbox_id=report.inbox_id,
                domain=report.domain,
                severity=severity,
                alert_type=alert_type,
                key="global",
                title=title,
                summary=summary,
                details={**evidence, **extra},
            )
        )

    # Share of all messages that came from unknown sources and failed DMARC.
    unknown_failed = sum(item.count for item in period_records if not (item.is_known_sender or item.dmarc_pass))
    unknown_rate = unknown_failed / total * 100 if total else 0
    if unknown_failed >= limits.unknown_source_fail_count and unknown_rate >= limits.unknown_source_fail_rate_percent:
        raise_global(
            "critical",
            "high_unknown_source_failure_rate",
            f"High unknown source failure rate for {report.domain}",
            f"Unknown sources failed DMARC for {unknown_failed} of {total} messages ({unknown_rate:.1f}%).",
            {"failed_messages": unknown_failed, "total_messages": total, "failure_rate_percent": unknown_rate},
        )

    # Share of known-sender messages that failed DMARC.
    known_records = [item for item in period_records if item.is_known_sender]
    known_total = sum(item.count for item in known_records)
    known_failed = sum(item.count for item in known_records if not item.dmarc_pass)
    known_rate = known_failed / known_total * 100 if known_total else 0
    if known_total >= limits.min_messages_for_rate_alert and known_rate >= limits.known_source_fail_rate_percent:
        raise_global(
            "critical",
            "high_known_source_failure_rate",
            f"High known sender failure rate for {report.domain}",
            f"Known senders failed DMARC for {known_failed} of {known_total} messages ({known_rate:.1f}%).",
            {
                "failed_messages": known_failed,
                "known_sender_messages": known_total,
                "failure_rate_percent": known_rate,
            },
        )

    # Trend windows: the last day versus the seven days before it.
    anchor = _report_time(report)
    recent_cutoff = anchor - timedelta(days=1)
    trailing_cutoff = anchor - timedelta(days=8)
    trend_pairs = session.execute(
        select(Record, Report)
        .join(Report)
        .where(
            Report.domain == report.domain,
            report_stamp >= trailing_cutoff,
            report_stamp <= anchor,
        )
    ).all()

    def in_trailing_window(owner: Report) -> bool:
        return trailing_cutoff <= _report_time(owner) < recent_cutoff

    recent_unknown = sum(
        item.count
        for item, owner in trend_pairs
        if not item.is_known_sender and not item.dmarc_pass and _report_time(owner) >= recent_cutoff
    )
    trailing_unknown = sum(
        item.count
        for item, owner in trend_pairs
        if not item.is_known_sender and not item.dmarc_pass and in_trailing_window(owner)
    )
    trailing_unknown_avg = trailing_unknown / 7 if trailing_unknown else 0
    if (
        trailing_unknown_avg
        and recent_unknown > limits.total_volume_spike_multiplier * trailing_unknown_avg
        and recent_unknown >= limits.unknown_source_fail_count
    ):
        raise_global(
            "critical",
            "sudden_unknown_failure_spike",
            f"Unknown failure spike for {report.domain}",
            f"Unknown failed volume is {recent_unknown}, above the trailing 7-day average of {trailing_unknown_avg:.1f}.",
            {"current_24h": recent_unknown, "trailing_7d_avg": trailing_unknown_avg},
        )

    trailing_volume = sum(item.count for item, owner in trend_pairs if in_trailing_window(owner))
    trailing_volume_avg = trailing_volume / 7 if trailing_volume else 0
    # Fraction of the trailing average at or below which volume counts as dropped.
    drop_factor = max(0, 1 - limits.total_volume_drop_percent / 100)
    if trailing_volume_avg and total <= trailing_volume_avg * drop_factor:
        raise_global(
            "warning",
            "total_volume_drop",
            f"DMARC report volume dropped for {report.domain}",
            f"Current report volume is {total}, below the trailing 7-day average of {trailing_volume_avg:.1f}.",
            {"current_messages": total, "trailing_7d_avg": trailing_volume_avg},
        )

    raised.extend(_repeated_failure_alerts(session, settings, report, period_records))
    return raised
def _repeated_failure_alerts(
    session: Session,
    settings: Settings,
    report: Report,
    current_records: list[Record],
) -> list[tuple[Alert, bool, bool]]:
    """Raise alerts for sources that failed DMARC on every day of the look-back window.

    The window spans ``repeated_failure_days`` days ending with the report's
    day; a configured value of 1 or less disables the check. For each source
    IP with a failing record in ``current_records``, failing records in the
    window are loaded and the distinct report days counted. An alert is raised
    when that count reaches the window length: ``critical`` for a known
    sender, ``warning`` otherwise.

    Returns:
        The ``create_or_update_alert`` results for every alert raised.
    """
    window = max(1, settings.alerts.thresholds.repeated_failure_days)
    if window <= 1:
        return []

    anchor_day = _report_day(report)
    window_start = anchor_day - timedelta(days=window - 1)
    window_end = anchor_day + timedelta(days=1)
    report_stamp = func.coalesce(Report.date_end, Report.date_begin, Report.created_at)

    # One representative failing record per source IP; the last one seen wins.
    failing_by_ip: dict[Any, Record] = {}
    for item in current_records:
        if not item.dmarc_pass:
            failing_by_ip[item.source_ip] = item

    raised: list[tuple[Alert, bool, bool]] = []
    for source_ip, latest in failing_by_ip.items():
        matches = session.execute(
            select(Record, Report)
            .join(Report)
            .where(
                Report.domain == report.domain,
                Record.source_ip == source_ip,
                Record.dmarc_pass.is_(False),
                report_stamp >= window_start,
                report_stamp < window_end,
            )
        ).all()
        distinct_days = sorted({_report_day(owner).date().isoformat() for _, owner in matches})
        if len(distinct_days) >= window:
            failed_total = sum(item.count for item, _ in matches)
            label = latest.known_sender_name or source_ip
            raised.append(
                create_or_update_alert(
                    session,
                    inbox_id=report.inbox_id,
                    domain=report.domain,
                    severity="critical" if latest.is_known_sender else "warning",
                    alert_type="repeated_dmarc_failure",
                    key=source_ip,
                    title=f"Repeated DMARC failure for {label}",
                    summary=f"{label} failed DMARC on {len(distinct_days)} report days in the last {window} days.",
                    details={
                        **_record_details(latest, report),
                        "failure_days": distinct_days,
                        "window_days": window,
                        "failed_messages": failed_total,
                    },
                )
            )
    return raised
def _reporter_alerts(session: Session, settings: Settings, report: Report) -> list[tuple[Alert, bool, bool]]:
    """Raise informational alerts for first-time reporters and first-seen policies.

    A ``new_reporter`` alert is raised when no other report for the domain
    carries the same ``org_name``. A ``policy_seen`` alert is raised when this
    is the only report stored for the domain. ``settings`` is accepted but not
    read here.

    Returns:
        The ``create_or_update_alert`` results for every alert raised.
    """
    raised: list[tuple[Alert, bool, bool]] = []

    reporter = report.org_name
    if reporter:
        prior_from_reporter = session.scalar(
            select(func.count(Report.id)).where(
                Report.domain == report.domain,
                Report.org_name == reporter,
                Report.id != report.id,
            )
        )
        if not prior_from_reporter:
            raised.append(
                create_or_update_alert(
                    session,
                    inbox_id=report.inbox_id,
                    domain=report.domain,
                    severity="info",
                    alert_type="new_reporter",
                    key=reporter,
                    title=f"New DMARC reporter for {report.domain}",
                    summary=f"{reporter} sent its first observed aggregate report.",
                    details={**_report_evidence(report), "reporter": reporter},
                )
            )

    other_domain_reports = session.scalar(
        select(func.count(Report.id)).where(Report.domain == report.domain, Report.id != report.id)
    )
    if not other_domain_reports:
        policy_fields = {
            "policy_p": report.policy_p,
            "policy_sp": report.policy_sp,
            "policy_pct": report.policy_pct,
        }
        raised.append(
            create_or_update_alert(
                session,
                inbox_id=report.inbox_id,
                domain=report.domain,
                severity="info",
                alert_type="policy_seen",
                key="policy",
                title=f"DMARC policy seen for {report.domain}",
                summary=f"Published policy p={report.policy_p}, sp={report.policy_sp}, pct={report.policy_pct}.",
                details={**_report_evidence(report), **policy_fields},
            )
        )
    return raised