Files
DMARC-Sentinel/app/message_processor.py
2026-05-16 12:05:36 -03:00

415 lines
17 KiB
Python

from __future__ import annotations
import logging
import hashlib
import json
from dataclasses import dataclass
from datetime import date
from email.message import Message
from email.utils import getaddresses, parsedate_to_datetime
from typing import Callable
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.alerts import send_alert_email
from app.analyzer import analyze_report
from app.attachment_extractor import AttachmentExtractionError, extract_dmarc_attachments, message_has_candidate_attachment
from app.config import InboxConfig, Settings
from app.dmarc_parser import DMARCParseError, parse_dmarc_xml
from app.imap_client import IMAPClient
from app.known_senders import classify_record
from app.llm import LLMClient
from app.models import Alert, AuthResult, InboxStatus, MailMessage, Record, Report, SkippedReportPayload, utcnow
logger = logging.getLogger(__name__)
@dataclass
class ProcessingSummary:
    """Running counters for one ``process_inbox`` pass over a single inbox folder."""

    inbox_id: str
    folder: str
    # UIDs returned by the IMAP search for this pass.
    scanned_messages: int = 0
    # UIDs whose handling finished (successfully, skipped, or failed).
    processed_messages: int = 0
    # Messages that passed the is_candidate_message heuristic.
    candidate_messages: int = 0
    # New Report rows created.
    valid_reports_imported: int = 0
    # Messages skipped because they were already marked "success" (and not reprocessing).
    duplicate_messages_skipped: int = 0
    # Payloads skipped because a Report with the same raw-XML SHA-256 already existed.
    duplicate_reports_skipped: int = 0
    # Messages whose handling raised an exception.
    failed_messages: int = 0
    # Record rows attached to newly imported reports.
    records_imported: int = 0
    # Newly created alerts (report analysis and ingestion rejections).
    alerts_created: int = 0
    # Counted as new warning/critical alerts; presumably each receives an LLM explanation.
    llm_explanations_generated: int = 0
    # Failed messages whose error was an attachment-extraction or DMARC-parse rejection.
    rejected_messages: int = 0
    # Sample descriptions of duplicate payloads; None until the first duplicate is seen
    # (process_inbox caps the list at 100 entries).
    duplicate_report_samples: list[dict[str, str | int | None]] | None = None

    @property
    def duplicates_skipped(self) -> int:
        """Duplicates skipped at either granularity: whole messages plus single payloads."""
        return sum((self.duplicate_messages_skipped, self.duplicate_reports_skipped))
def ensure_inbox_status(session: Session, inbox: InboxConfig) -> InboxStatus:
    """Return the InboxStatus row for *inbox*, creating it when absent.

    A new row is built from the inbox config, added to the session and flushed.
    An existing row has its label/domain/folder/recipient/enabled columns
    overwritten from the config. Nothing is committed here.
    """
    current = session.scalar(select(InboxStatus).where(InboxStatus.inbox_id == inbox.id))
    mirrored = {
        "label": inbox.label,
        "domain": inbox.domain,
        "folder": inbox.folder,
        "recipient": inbox.recipient,
        "enabled": inbox.enabled,
    }
    if current:
        # Keep the stored copy in sync with the (possibly edited) configuration.
        for column, value in mirrored.items():
            setattr(current, column, value)
        return current
    created = InboxStatus(inbox_id=inbox.id, **mirrored)
    session.add(created)
    session.flush()
    return created
def _headers(message: Message, names: list[str]) -> str:
return " ".join(str(message.get(name, "")) for name in names)
def is_candidate_message(message: Message, inbox: InboxConfig) -> bool:
    """Heuristically decide whether *message* may carry a DMARC report for *inbox*.

    The message qualifies when any of these hold (all comparisons lower-cased):
    - the inbox recipient address appears in an addressing header;
    - the subject mentions "dmarc", the inbox domain, or "report domain";
    - ``message_has_candidate_attachment(message)`` is true.

    An empty or unset ``inbox.recipient`` / ``inbox.domain`` is skipped rather
    than matched: ``"" in text`` is always True and would make every message a
    candidate, and ``None.lower()`` would raise for every message.
    """
    recipients = _headers(message, ["To", "Cc", "Bcc", "Delivered-To", "X-Original-To", "Envelope-To"]).lower()
    subject = str(message.get("Subject", "")).lower()
    recipient = (inbox.recipient or "").strip().lower()
    domain = (inbox.domain or "").strip().lower()
    return (
        (bool(recipient) and recipient in recipients)
        or "dmarc" in subject
        or (bool(domain) and domain in subject)
        or "report domain" in subject
        or message_has_candidate_attachment(message)
    )
def _message_date(message: Message):
try:
parsed = parsedate_to_datetime(message.get("Date"))
return parsed if parsed.tzinfo else parsed.replace(tzinfo=utcnow().tzinfo)
except Exception:
return None
def _recipient(message: Message) -> str | None:
values = _headers(message, ["To", "Cc", "Bcc", "Delivered-To", "X-Original-To"])
addrs = [addr for _, addr in getaddresses([values]) if addr]
return ", ".join(addrs) or None
def _upsert_mail_message(session: Session, inbox: InboxConfig, folder: str, imap_message, status: str = "skipped") -> MailMessage:
    """Return the MailMessage row for ``(inbox.id, folder, imap_message.uid)``, inserting it if absent.

    An existing row is returned unchanged; *status* only seeds a newly inserted
    row. New rows copy Message-ID, Subject, From, the recipient list, the parsed
    Date and the IMAP seen flag. They are added to the session and flushed, but not committed.
    """
    uid = imap_message.uid
    found = session.scalar(
        select(MailMessage).where(
            MailMessage.inbox_id == inbox.id,
            MailMessage.folder == folder,
            MailMessage.imap_uid == uid,
        )
    )
    if found:
        return found

    headers = imap_message.message
    created = MailMessage(
        inbox_id=inbox.id,
        imap_uid=uid,
        message_id=headers.get("Message-ID"),
        folder=folder,
        subject=headers.get("Subject"),
        sender=headers.get("From"),
        recipient=_recipient(headers),
        message_date=_message_date(headers),
        seen=imap_message.seen,
        status=status,
    )
    session.add(created)
    session.flush()
    return created
def _domain_equal(a: str | None, b: str | None) -> bool:
return (a or "").lower().rstrip(".") == (b or "").lower().rstrip(".")
def _record_ingestion_rejection(
    session: Session,
    inbox: InboxConfig,
    mail: MailMessage,
    reason: str,
    *,
    stage: str,
) -> tuple[Alert, bool]:
    """Open, or refresh, an ``ingestion_rejected`` alert for a refused message payload.

    The fingerprint hashes the message's inbox id, folder, UID, *stage* and
    *reason*. Retrying the same message with the same failure therefore updates
    the matching open alert (``last_seen_at``, ``updated_at``, details) instead
    of creating another one.

    Args:
        session: database session; a new alert is added and flushed, nothing is committed.
        inbox: configuration of the inbox the message came from.
        mail: the MailMessage row that was rejected.
        reason: human-readable rejection reason (typically the exception text).
        stage: processing stage that rejected the payload.

    Returns:
        ``(alert, created)``; *created* is True only when a new Alert was added.
    """
    digest = hashlib.sha256(f"{mail.inbox_id}:{mail.folder}:{mail.imap_uid}:{stage}:{reason}".encode()).hexdigest()[:24]
    fingerprint = f"{inbox.domain}:ingestion_rejected:{digest}"
    details_json = json.dumps(
        {
            "stage": stage,
            "reason": reason,
            "inbox_id": inbox.id,
            "folder": mail.folder,
            "imap_uid": mail.imap_uid,
            "message_id": mail.message_id,
            "subject": mail.subject,
            "sender": mail.sender,
        },
        sort_keys=True,
        default=str,
    )
    existing = session.scalar(select(Alert).where(Alert.fingerprint == fingerprint, Alert.status == "open"))
    now = utcnow()
    if existing:
        existing.last_seen_at = now
        existing.updated_at = now
        existing.details_json = details_json
        return existing, False
    alert = Alert(
        fingerprint=fingerprint,
        inbox_id=inbox.id,
        domain=inbox.domain,
        severity="warning",
        type="ingestion_rejected",
        title=f"DMARC payload rejected for {inbox.label}",
        # Name the folder the message actually came from: process_inbox may poll a
        # folder other than inbox.folder, and the details already record mail.folder.
        summary=f"A message in {mail.folder} was rejected during {stage}: {reason}",
        details_json=details_json,
        first_seen_at=now,
        last_seen_at=now,
    )
    session.add(alert)
    session.flush()
    return alert, True
def _duplicate_report_sample(existing: Report, mail: MailMessage) -> dict[str, str | int | None]:
return {
"existing_report_db_id": existing.id,
"existing_report_id": existing.report_id,
"reporting_org": existing.org_name,
"report_date": (existing.date_end or existing.date_begin).date().isoformat() if (existing.date_end or existing.date_begin) else None,
"duplicate_message_uid": mail.imap_uid,
"duplicate_message_id": mail.message_id,
}
def _record_duplicate_report_payload(session: Session, inbox: InboxConfig, mail: MailMessage, existing: Report, sha256: str) -> None:
    """Upsert a SkippedReportPayload row noting that *mail* carried an already-imported payload.

    Rows are keyed by inbox, folder, UID, payload SHA-256 and the
    ``"duplicate_report_payload"`` reason, so seeing the same message again
    refreshes the row instead of adding another. The descriptive columns (the
    message ids plus the matching report's id, identifier, org and date) are
    rewritten on every call. A new row is added to the session but not flushed.
    """
    reason = "duplicate_report_payload"
    row = session.scalar(
        select(SkippedReportPayload).where(
            SkippedReportPayload.inbox_id == inbox.id,
            SkippedReportPayload.folder == mail.folder,
            SkippedReportPayload.imap_uid == mail.imap_uid,
            SkippedReportPayload.raw_xml_sha256 == sha256,
            SkippedReportPayload.reason == reason,
        )
    )
    period_end = existing.date_end or existing.date_begin
    report_date = period_end.date() if period_end else None
    if not row:
        row = SkippedReportPayload(
            inbox_id=inbox.id,
            folder=mail.folder,
            imap_uid=mail.imap_uid,
            message_id=mail.message_id,
            mail_message_id=mail.id,
            reason=reason,
            raw_xml_sha256=sha256,
        )
        session.add(row)
    # Refresh descriptive columns for both freshly created and pre-existing rows.
    row.message_id = mail.message_id
    row.mail_message_id = mail.id
    row.existing_report_id = existing.id
    row.report_identifier = existing.report_id
    row.reporting_org = existing.org_name
    row.report_date = report_date
def _store_report(session: Session, settings: Settings, inbox: InboxConfig, mail: MailMessage, extracted) -> tuple[Report | None, Report | None]:
    """Persist one extracted DMARC payload as a Report with its Records and AuthResults.

    Returns:
        ``(new_report, None)`` when the payload was imported, or
        ``(None, existing_report)`` when a Report with the same raw-XML SHA-256
        already exists. In that case the payload is not parsed at all.

    Raises:
        DMARCParseError: when the parsed report domain does not match
            ``inbox.domain``. Presumably ``parse_dmarc_xml`` raises it too
            for invalid XML or exceeded limits; confirm in app.dmarc_parser.

    Rows are added and flushed but not committed; the caller owns the transaction.
    """
    prior = session.scalar(select(Report).where(Report.raw_xml_sha256 == extracted.sha256))
    if prior:
        return None, prior

    limits = settings.app
    parsed = parse_dmarc_xml(
        extracted.payload,
        max_records=limits.max_xml_records_per_report,
        max_record_count=limits.max_record_count,
        max_future_days=limits.max_report_future_days,
        max_past_days=limits.max_report_past_days,
    )
    # Refuse reports about a domain other than the one this inbox monitors.
    if not _domain_equal(parsed.domain, inbox.domain):
        raise DMARCParseError(f"Report domain {parsed.domain} does not match inbox domain {inbox.domain}")

    report = Report(
        inbox_id=inbox.id,
        mail_message_id=mail.id,
        raw_xml_sha256=extracted.sha256,
        report_id=parsed.report_id,
        org_name=parsed.org_name,
        org_email=parsed.org_email,
        extra_contact_info=parsed.extra_contact_info,
        domain=parsed.domain,
        date_begin=parsed.date_begin,
        date_end=parsed.date_end,
        policy_p=parsed.policy_p,
        policy_sp=parsed.policy_sp,
        policy_pct=parsed.policy_pct,
        adkim=parsed.adkim,
        aspf=parsed.aspf,
        fo=parsed.fo,
    )
    session.add(report)
    session.flush()

    for row in parsed.records:
        sender = classify_record(settings, parsed.domain, row)
        record = Record(
            report=report,
            source_ip=row.source_ip,
            count=row.count,
            disposition=row.disposition,
            policy_dkim=row.policy_dkim,
            policy_spf=row.policy_spf,
            dkim_aligned=row.dkim_aligned,
            spf_aligned=row.spf_aligned,
            dmarc_pass=row.dmarc_pass,
            header_from=row.header_from,
            reason_type=row.reason_type,
            reason_comment=row.reason_comment,
            known_sender_id=sender.id,
            known_sender_name=sender.name,
            is_known_sender=sender.is_known,
        )
        session.add(record)
        session.flush()
        for auth in row.auth_results:
            auth_row = AuthResult(
                record=record,
                auth_type=auth.auth_type,
                domain=auth.domain,
                selector=auth.selector,
                scope=auth.scope,
                result=auth.result,
                human_result=auth.human_result,
            )
            session.add(auth_row)

    session.flush()
    return report, None
def _import_message_reports(
    session: Session,
    settings: Settings,
    inbox: InboxConfig,
    mail: MailMessage,
    message: Message,
    summary: ProcessingSummary,
    llm: LLMClient,
) -> bool:
    """Extract, store and analyze every DMARC payload attached to *message*.

    Updates *summary* counters in place and sends alert e-mails for alerts that
    are new or whose severity increased. Returns True if any new Report was imported.
    """
    limits = settings.app
    extracted_reports = extract_dmarc_attachments(
        message,
        limits.max_attachment_decompressed_mb,
        max_compressed_mb=limits.max_attachment_compressed_mb,
        max_attachments=limits.max_attachments_per_message,
        max_reports_per_message=limits.max_reports_per_message,
        max_reports_per_archive=limits.max_reports_per_archive,
        max_compression_ratio=limits.max_archive_compression_ratio,
    )
    imported_any = False
    for extracted in extracted_reports:
        report, duplicate_report = _store_report(session, settings, inbox, mail, extracted)
        if duplicate_report:
            _record_duplicate_report_payload(session, inbox, mail, duplicate_report, extracted.sha256)
            summary.duplicate_reports_skipped += 1
            if summary.duplicate_report_samples is None:
                summary.duplicate_report_samples = []
            # Keep the summary bounded: only the first 100 duplicates are described.
            if len(summary.duplicate_report_samples) < 100:
                summary.duplicate_report_samples.append(_duplicate_report_sample(duplicate_report, mail))
            continue
        if not report:
            continue
        imported_any = True
        summary.valid_reports_imported += 1
        summary.records_imported += len(report.records)
        # Each entry is unpacked below as (alert, is_new, severity_increased).
        alerts = analyze_report(session, settings, report, llm=llm)
        summary.alerts_created += len([item for item in alerts if item[1]])
        summary.llm_explanations_generated += len([item for item in alerts if item[1] and item[0].severity in {"warning", "critical"}])
        for alert, is_new, severity_increased in alerts:
            if is_new or severity_increased:
                send_alert_email(settings, alert, severity_increased=severity_increased)
    return imported_any


def _record_message_failure(
    session: Session,
    settings: Settings,
    inbox: InboxConfig,
    client: IMAPClient,
    folder: str,
    uid,
    imap_message,
    exc: Exception,
    summary: ProcessingSummary,
) -> None:
    """Roll back a failed message, then best-effort persist its failure state.

    Must be called from inside the ``except`` block handling *exc* so that
    ``logger.exception`` records its traceback. *imap_message* is the message
    that was already fetched, or None when the fetch itself failed; only in
    that case is it fetched again. Payload rejections (attachment extraction or
    DMARC XML errors) also open an ``ingestion_rejected`` alert. Errors raised
    while recording the failure are logged and rolled back, never propagated.
    """
    session.rollback()
    summary.failed_messages += 1
    logger.exception("Message UID %s failed: %s", uid, exc)
    try:
        if imap_message is None:
            imap_message = client.fetch_message(uid)
        mail = _upsert_mail_message(session, inbox, folder, imap_message)
        mail.status = "failed"
        mail.error = str(exc)
        mail.processed_at = utcnow()
        if isinstance(exc, (AttachmentExtractionError, DMARCParseError)):
            stage = "attachment extraction" if isinstance(exc, AttachmentExtractionError) else "DMARC XML validation"
            alert, is_new = _record_ingestion_rejection(session, inbox, mail, str(exc), stage=stage)
            summary.rejected_messages += 1
            if is_new:
                summary.alerts_created += 1
                send_alert_email(settings, alert)
        session.commit()
        if inbox.move_after_failure and inbox.failed_folder:
            client.move(uid, inbox.failed_folder)
    except Exception:
        session.rollback()
        logger.exception("Could not record failure state for message UID %s", uid)


def process_inbox(
    session: Session,
    settings: Settings,
    inbox: InboxConfig,
    *,
    folder: str | None = None,
    mode: str = "new",
    since: date | None = None,
    before: date | None = None,
    limit: int | None = None,
    dry_run: bool = False,
    reprocess: bool = False,
    mark_seen: bool = False,
    progress_callback: Callable[[ProcessingSummary], None] | None = None,
) -> ProcessingSummary:
    """Poll one IMAP folder of *inbox* and import the DMARC reports found there.

    Args:
        session: database session. It is committed after every message, so a
            failing message cannot undo work recorded for earlier ones.
        settings: application settings (limits, LLM, alerting).
        inbox: configuration of the inbox to poll.
        folder: IMAP folder to scan; defaults to ``inbox.folder``.
        mode: ``"new"`` restricts the search to unread messages; any other
            value passes ``unread_only=False``.
        since / before: optional date bounds forwarded to the IMAP search.
        limit: maximum UIDs to fetch; a falsy value falls back to
            ``settings.app.max_reports_per_poll``.
        dry_run: classify candidates without extracting or importing
            attachments. MailMessage rows are still recorded.
        reprocess: handle messages already marked ``"success"`` again.
        mark_seen: call ``client.mark_seen`` on handled messages even when
            ``inbox.mark_seen_after_success`` is off.
        progress_callback: called with the running summary after the search
            and after every message.

    Returns:
        The ProcessingSummary for this pass.

    Raises:
        Errors outside the per-message handling (connect, folder selection,
        search, final commit) are stored on the InboxStatus row and re-raised.
    """
    folder = folder or inbox.folder
    summary = ProcessingSummary(inbox_id=inbox.id, folder=folder)
    status = ensure_inbox_status(session, inbox)
    status.last_check_at = utcnow()
    # Commit before the loop: a per-message failure calls session.rollback(), which
    # would otherwise discard this timestamp and expunge a newly created status row.
    session.commit()
    llm = LLMClient(settings)
    try:
        with IMAPClient(inbox) as client:
            client.select_folder(folder)
            uids = client.search_uids(
                unread_only=mode == "new",
                since=since,
                before=before,
                limit=limit or settings.app.max_reports_per_poll,
            )
            summary.scanned_messages = len(uids)
            if progress_callback:
                progress_callback(summary)
            for uid in uids:
                imap_message = None
                try:
                    imap_message = client.fetch_message(uid)
                    mail = _upsert_mail_message(session, inbox, folder, imap_message)
                    if mail.status == "success" and not reprocess:
                        summary.duplicate_messages_skipped += 1
                        continue
                    if not is_candidate_message(imap_message.message, inbox):
                        mail.status = "skipped"
                        mail.processed_at = utcnow()
                        # Commit now so a later message's rollback cannot discard this.
                        session.commit()
                        continue
                    summary.candidate_messages += 1
                    if dry_run:
                        session.commit()
                        continue
                    imported_any = _import_message_reports(
                        session, settings, inbox, mail, imap_message.message, summary, llm
                    )
                    mail.status = "success" if imported_any else "skipped"
                    mail.error = None
                    mail.processed_at = utcnow()
                    # Persist before touching IMAP state: if marking/moving fails, the
                    # imported data survives instead of being rolled back after the
                    # message was already flagged seen or moved out of the folder.
                    session.commit()
                    if inbox.mark_seen_after_success or mark_seen:
                        client.mark_seen(uid)
                    if imported_any and inbox.move_after_success and inbox.processed_folder:
                        client.move(uid, inbox.processed_folder)
                except Exception as exc:
                    _record_message_failure(session, settings, inbox, client, folder, uid, imap_message, exc, summary)
                finally:
                    summary.processed_messages += 1
                    if progress_callback:
                        progress_callback(summary)
        status.last_success_at = utcnow()
        status.last_error = None
        status.last_new_messages = summary.scanned_messages
        status.last_reports_imported = summary.valid_reports_imported
        session.commit()
        logger.info("Poll complete for %s: %s", inbox.id, summary)
    except Exception as exc:
        session.rollback()
        status = ensure_inbox_status(session, inbox)
        status.last_error_at = utcnow()
        status.last_error = str(exc)
        session.commit()
        logger.exception("Inbox processing failed for %s: %s", inbox.id, exc)
        raise
    return summary