from __future__ import annotations import json import logging from datetime import datetime, timedelta, timezone from typing import Any from sqlalchemy import func, select from sqlalchemy.orm import Session from app.config import Settings from app.llm import LLMClient from app.models import Alert, Record, Report, utcnow logger = logging.getLogger(__name__) SEVERITY_RANK = {"info": 0, "warning": 1, "critical": 2} def _as_utc(value: datetime | str | None) -> datetime | None: if value is None: return None if isinstance(value, str): try: value = datetime.fromisoformat(value.replace("Z", "+00:00")) except ValueError: return None if value.tzinfo is None: return value.replace(tzinfo=timezone.utc) return value def _fingerprint(domain: str, alert_type: str, key: str) -> str: return f"{domain}:{alert_type}:{key}" def _merge_details(existing: str, incoming: dict[str, Any]) -> str: try: data = json.loads(existing or "{}") except json.JSONDecodeError: data = {} existing_range = data.get("date_range") if isinstance(data.get("date_range"), dict) else {} incoming_range = incoming.get("date_range") if isinstance(incoming.get("date_range"), dict) else {} report_ids = list(data.get("report_db_ids") or []) if data.get("report_db_id") and data["report_db_id"] not in report_ids: report_ids.append(data["report_db_id"]) if incoming.get("report_db_id") and incoming["report_db_id"] not in report_ids: report_ids.append(incoming["report_db_id"]) data.update(incoming) if "count" in data and "count" in incoming: data["count"] = max(int(data["count"]), int(incoming["count"])) if existing_range or incoming_range: begins = [item for item in [existing_range.get("begin"), incoming_range.get("begin")] if item] ends = [item for item in [existing_range.get("end"), incoming_range.get("end")] if item] data["date_range"] = { "begin": min(begins) if begins else None, "end": max(ends) if ends else None, } if report_ids: data["report_db_ids"] = report_ids[-25:] data["report_db_id"] = incoming.get("report_db_id") or data.get("report_db_id") return json.dumps(data, sort_keys=True) def create_or_update_alert( session: Session, *, inbox_id: str, domain: str, severity: str, alert_type: str, key: str, title: str, summary: str, details: dict[str, Any], ) -> tuple[Alert, bool, bool]: fp = _fingerprint(domain, alert_type, key) alert = session.scalar(select(Alert).where(Alert.fingerprint == fp, Alert.status == "open")) now = utcnow() if alert: previous = alert.severity if SEVERITY_RANK[severity] > SEVERITY_RANK[alert.severity]: alert.severity = severity alert.title = title alert.summary = summary alert.details_json = _merge_details(alert.details_json, details) alert.last_seen_at = now alert.updated_at = now return alert, False, SEVERITY_RANK[alert.severity] > SEVERITY_RANK[previous] alert = Alert( fingerprint=fp, inbox_id=inbox_id, domain=domain, severity=severity, type=alert_type, title=title, summary=summary, details_json=json.dumps(details, sort_keys=True, default=str), first_seen_at=now, last_seen_at=now, ) session.add(alert) session.flush() return alert, True, False def _record_details(record: Record, report: Report) -> dict[str, Any]: return { "source_ip": record.source_ip, "count": record.count, "spf_aligned": record.spf_aligned, "dkim_aligned": record.dkim_aligned, "dmarc_pass": record.dmarc_pass, "disposition": record.disposition, "known_sender": record.is_known_sender, "known_sender_id": record.known_sender_id, "reporting_orgs": [report.org_name] if report.org_name else [], "report_db_id": report.id, "report_id": report.report_id, "date_range": { "begin": report.date_begin.isoformat() if report.date_begin else None, "end": report.date_end.isoformat() if report.date_end else None, }, } def _new_authenticated_path_alert(session: Session, record: Record, report: Report, details: dict[str, Any]) -> tuple[Alert, bool, bool] | None: if not record.dmarc_pass or record.is_known_sender: return None if record.dkim_aligned and not record.spf_aligned: return create_or_update_alert( session=session, inbox_id=report.inbox_id, domain=report.domain, severity="info", alert_type="dkim_authenticated_relay", key=record.source_ip, title=f"DKIM-authenticated relay observed for {report.domain}", summary=( f"A receiver observed {record.source_ip} transmitting mail claiming to be from {report.domain}. " "SPF did not align for that observed hop, but DKIM aligned, so DMARC passed. " "This commonly represents forwarding or an intermediary mail gateway, not a sender to add to SPF." ), details=details, ) if record.spf_aligned and record.dkim_aligned: return create_or_update_alert( session=session, inbox_id=report.inbox_id, domain=report.domain, severity="warning", alert_type="new_authenticated_source", key=record.source_ip, title=f"New authenticated source observed for {report.domain}", summary=( f"{record.source_ip} is newly observed and passed DMARC with both SPF and DKIM alignment. " "Confirm whether this is an expected direct sender path before classifying it." ), details=details, ) if record.spf_aligned: return create_or_update_alert( session=session, inbox_id=report.inbox_id, domain=report.domain, severity="warning", alert_type="new_spf_authenticated_source", key=record.source_ip, title=f"New SPF-authenticated source observed for {report.domain}", summary=( f"{record.source_ip} is newly observed and passed DMARC through SPF alignment. " "Confirm whether this is an expected direct sender path before classifying it." ), details=details, ) return None def _report_time(report: Report) -> datetime: return _as_utc(report.date_end or report.date_begin or report.created_at) or utcnow() def _report_day(report: Report) -> datetime: return _report_time(report).replace(hour=0, minute=0, second=0, microsecond=0) def _report_evidence(report: Report, *, link_report: bool = True) -> dict[str, Any]: evidence = { "reporting_orgs": [report.org_name] if report.org_name else [], "date_range": { "begin": report.date_begin.isoformat() if report.date_begin else None, "end": report.date_end.isoformat() if report.date_end else None, }, } if link_report: evidence["report_db_id"] = report.id evidence["report_id"] = report.report_id return evidence def analyze_report(session: Session, settings: Settings, report: Report, llm: LLMClient | None = None) -> list[tuple[Alert, bool, bool]]: created: list[tuple[Alert, bool, bool]] = [] thresholds = settings.alerts.thresholds for record in report.records: details = _record_details(record, report) if not record.is_known_sender and not record.spf_aligned and not record.dkim_aligned and record.count >= thresholds.unknown_source_fail_count: created.append( create_or_update_alert( session, inbox_id=report.inbox_id, domain=report.domain, severity="critical", alert_type="unknown_source_failed_both", key=record.source_ip, title=f"Unknown source failed SPF and DKIM for {report.domain}", summary=f"{record.source_ip} sent {record.count} messages that failed SPF and DKIM alignment.", details=details, ) ) if record.is_known_sender and not record.dmarc_pass and record.count >= thresholds.min_messages_for_rate_alert: created.append( create_or_update_alert( session, inbox_id=report.inbox_id, domain=report.domain, severity="critical", alert_type="known_sender_dmarc_failure", key=record.known_sender_id or record.source_ip, title=f"Known sender failed DMARC for {report.domain}", summary=f"{record.known_sender_name or record.source_ip} failed DMARC for {record.count} messages.", details=details, ) ) if record.disposition in {"quarantine", "reject"} and record.count > 0: created.append( create_or_update_alert( session, inbox_id=report.inbox_id, domain=report.domain, severity="critical", alert_type="quarantine_or_reject_seen", key=f"{record.disposition}:{record.source_ip}", title=f"{record.disposition.title()} disposition seen for {report.domain}", summary=f"Receiver applied {record.disposition} to {record.count} messages.", details=details, ) ) existing_source = session.scalar( select(func.count(Record.id)) .join(Report) .where(Report.domain == report.domain, Record.source_ip == record.source_ip, Record.id != record.id) ) if not existing_source and not record.dmarc_pass: created.append( create_or_update_alert( session, inbox_id=report.inbox_id, domain=report.domain, severity="warning", alert_type="new_unknown_source", key=record.source_ip, title=f"New unknown failing source for {report.domain}", summary=f"{record.source_ip} is newly observed and failed DMARC.", details=details, ) ) if not existing_source and record.dmarc_pass and not record.is_known_sender: alert = _new_authenticated_path_alert(session, record, report, details) if alert: created.append(alert) created.extend(_rate_alerts(session, settings, report)) created.extend(_reporter_alerts(session, settings, report)) if llm and settings.llm.generate_alert_explanations: for alert, is_new, severity_increased in created: if (is_new or severity_increased) and alert.severity in {"warning", "critical"}: explanation = llm.explain_alert(alert) alert.llm_summary = explanation.summary alert.llm_risk = explanation.risk alert.llm_recommended_action = explanation.recommended_action return created def _rate_alerts(session: Session, settings: Settings, report: Report) -> list[tuple[Alert, bool, bool]]: thresholds = settings.alerts.thresholds period_start = _as_utc(report.date_begin) or _report_time(report).replace(hour=0, minute=0, second=0, microsecond=0) period_end = _as_utc(report.date_end) or (period_start + timedelta(days=1)) current_rows = session.execute( select(Record, Report) .join(Report) .where( Report.domain == report.domain, func.coalesce(Report.date_end, Report.date_begin, Report.created_at) >= period_start, func.coalesce(Report.date_end, Report.date_begin, Report.created_at) <= period_end, ) ).all() current_records = [row for row, _ in current_rows] total = sum(row.count for row in current_records) if total < thresholds.min_messages_for_rate_alert: return _repeated_failure_alerts(session, settings, report, current_records) alerts: list[tuple[Alert, bool, bool]] = [] evidence = _report_evidence(report, link_report=False) unknown_fail = sum(row.count for row in current_records if not row.is_known_sender and not row.dmarc_pass) unknown_fail_rate = unknown_fail / total * 100 if total else 0 if unknown_fail >= thresholds.unknown_source_fail_count and unknown_fail_rate >= thresholds.unknown_source_fail_rate_percent: alerts.append( create_or_update_alert( session, inbox_id=report.inbox_id, domain=report.domain, severity="critical", alert_type="high_unknown_source_failure_rate", key="global", title=f"High unknown source failure rate for {report.domain}", summary=f"Unknown sources failed DMARC for {unknown_fail} of {total} messages ({unknown_fail_rate:.1f}%).", details={**evidence, "failed_messages": unknown_fail, "total_messages": total, "failure_rate_percent": unknown_fail_rate}, ) ) known_total = sum(row.count for row in current_records if row.is_known_sender) known_fail = sum(row.count for row in current_records if row.is_known_sender and not row.dmarc_pass) known_fail_rate = known_fail / known_total * 100 if known_total else 0 if known_total >= thresholds.min_messages_for_rate_alert and known_fail_rate >= thresholds.known_source_fail_rate_percent: alerts.append( create_or_update_alert( session, inbox_id=report.inbox_id, domain=report.domain, severity="critical", alert_type="high_known_source_failure_rate", key="global", title=f"High known sender failure rate for {report.domain}", summary=f"Known senders failed DMARC for {known_fail} of {known_total} messages ({known_fail_rate:.1f}%).", details={ **evidence, "failed_messages": known_fail, "known_sender_messages": known_total, "failure_rate_percent": known_fail_rate, }, ) ) report_time = _report_time(report) recent_start = report_time - timedelta(days=1) trailing_start = report_time - timedelta(days=8) trend_rows = session.execute( select(Record, Report) .join(Report) .where( Report.domain == report.domain, func.coalesce(Report.date_end, Report.date_begin, Report.created_at) >= trailing_start, func.coalesce(Report.date_end, Report.date_begin, Report.created_at) <= report_time, ) ).all() current_unknown = sum( row.count for row, row_report in trend_rows if not row.is_known_sender and not row.dmarc_pass and _report_time(row_report) >= recent_start ) trailing = sum( row.count for row, row_report in trend_rows if not row.is_known_sender and not row.dmarc_pass and trailing_start <= _report_time(row_report) < recent_start ) trailing_avg = trailing / 7 if trailing else 0 if trailing_avg and current_unknown > thresholds.total_volume_spike_multiplier * trailing_avg and current_unknown >= thresholds.unknown_source_fail_count: alerts.append( create_or_update_alert( session, inbox_id=report.inbox_id, domain=report.domain, severity="critical", alert_type="sudden_unknown_failure_spike", key="global", title=f"Unknown failure spike for {report.domain}", summary=f"Unknown failed volume is {current_unknown}, above the trailing 7-day average of {trailing_avg:.1f}.", details={**evidence, "current_24h": current_unknown, "trailing_7d_avg": trailing_avg}, ) ) trailing_volume = sum(row.count for row, row_report in trend_rows if trailing_start <= _report_time(row_report) < recent_start) trailing_volume_avg = trailing_volume / 7 if trailing_volume else 0 drop_threshold = max(0, 1 - thresholds.total_volume_drop_percent / 100) if trailing_volume_avg and total <= trailing_volume_avg * drop_threshold: alerts.append( create_or_update_alert( session, inbox_id=report.inbox_id, domain=report.domain, severity="warning", alert_type="total_volume_drop", key="global", title=f"DMARC report volume dropped for {report.domain}", summary=f"Current report volume is {total}, below the trailing 7-day average of {trailing_volume_avg:.1f}.", details={**evidence, "current_messages": total, "trailing_7d_avg": trailing_volume_avg}, ) ) alerts.extend(_repeated_failure_alerts(session, settings, report, current_records)) return alerts def _repeated_failure_alerts( session: Session, settings: Settings, report: Report, current_records: list[Record], ) -> list[tuple[Alert, bool, bool]]: thresholds = settings.alerts.thresholds days = max(1, thresholds.repeated_failure_days) if days <= 1: return [] report_day = _report_day(report) start = report_day - timedelta(days=days - 1) end = report_day + timedelta(days=1) alerts: list[tuple[Alert, bool, bool]] = [] sources = {row.source_ip: row for row in current_records if not row.dmarc_pass} for source_ip, current_record in sources.items(): rows = session.execute( select(Record, Report) .join(Report) .where( Report.domain == report.domain, Record.source_ip == source_ip, Record.dmarc_pass.is_(False), func.coalesce(Report.date_end, Report.date_begin, Report.created_at) >= start, func.coalesce(Report.date_end, Report.date_begin, Report.created_at) < end, ) ).all() failure_days = sorted({_report_day(row_report).date().isoformat() for _, row_report in rows}) if len(failure_days) < days: continue failed_messages = sum(row.count for row, _ in rows) severity = "critical" if current_record.is_known_sender else "warning" sender_label = current_record.known_sender_name or source_ip alerts.append( create_or_update_alert( session, inbox_id=report.inbox_id, domain=report.domain, severity=severity, alert_type="repeated_dmarc_failure", key=source_ip, title=f"Repeated DMARC failure for {sender_label}", summary=f"{sender_label} failed DMARC on {len(failure_days)} report days in the last {days} days.", details={ **_record_details(current_record, report), "failure_days": failure_days, "window_days": days, "failed_messages": failed_messages, }, ) ) return alerts def _reporter_alerts(session: Session, settings: Settings, report: Report) -> list[tuple[Alert, bool, bool]]: alerts: list[tuple[Alert, bool, bool]] = [] if report.org_name: existing = session.scalar( select(func.count(Report.id)).where(Report.domain == report.domain, Report.org_name == report.org_name, Report.id != report.id) ) if not existing: alerts.append( create_or_update_alert( session, inbox_id=report.inbox_id, domain=report.domain, severity="info", alert_type="new_reporter", key=report.org_name, title=f"New DMARC reporter for {report.domain}", summary=f"{report.org_name} sent its first observed aggregate report.", details={**_report_evidence(report), "reporter": report.org_name}, ) ) first_domain_report = session.scalar(select(func.count(Report.id)).where(Report.domain == report.domain, Report.id != report.id)) if not first_domain_report: alerts.append( create_or_update_alert( session, inbox_id=report.inbox_id, domain=report.domain, severity="info", alert_type="policy_seen", key="policy", title=f"DMARC policy seen for {report.domain}", summary=f"Published policy p={report.policy_p}, sp={report.policy_sp}, pct={report.policy_pct}.", details={**_report_evidence(report), "policy_p": report.policy_p, "policy_sp": report.policy_sp, "policy_pct": report.policy_pct}, ) ) return alerts