Files
2026-05-16 12:05:36 -03:00

92 lines
3.4 KiB
Python

from __future__ import annotations
import json
import logging
import os
import smtplib
from email.message import EmailMessage
from app.config import Settings
from app.models import Alert
logger = logging.getLogger(__name__)
def _format_alert_details(details_json: str | None) -> str:
    """Pretty-print an alert's JSON details, falling back to the raw text."""
    raw = details_json or "{}"
    try:
        return json.dumps(json.loads(raw), indent=2, sort_keys=True)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError. A malformed payload must not
        # block delivery of the alert, so show it verbatim instead.
        return str(raw)


def send_alert_email(settings: Settings, alert: Alert, severity_increased: bool = False) -> bool:
    """Email a single alert over SMTP with STARTTLS.

    Only alerts whose severity is ``"warning"`` or ``"critical"`` are sent, and
    only when ``settings.alerts.email.enabled`` is truthy. The SMTP host,
    port, credentials and the sender/recipient addresses are read from the
    environment variables whose *names* are stored in ``settings.alerts.email``.
    The port defaults to 587 when its variable is unset or empty.

    Args:
        settings: Application settings; ``alerts.email`` and ``app.base_url``
            are read.
        alert: The alert to report.
        severity_increased: Echoed into the message body.

    Returns:
        True when the message was handed to the SMTP server without an
        exception; False when the alert was skipped, the configuration is
        incomplete or invalid, or delivery failed. Delivery problems are
        logged rather than raised.
    """
    if alert.severity not in {"warning", "critical"}:
        return False

    email_cfg = settings.alerts.email
    if not email_cfg.enabled:
        return False

    host = os.getenv(email_cfg.smtp_host_env)
    to_addr = os.getenv(email_cfg.to_env)
    from_addr = os.getenv(email_cfg.from_env)
    if not (host and to_addr and from_addr):
        logger.warning("Email alert not sent because SMTP environment is incomplete")
        return False

    msg = EmailMessage()
    subject = f"[DMARC Sentinel] {alert.severity.upper()} {alert.title}"
    # EmailMessage rejects header values containing line breaks with a
    # ValueError, so collapse any that sneak in through the alert title.
    msg["Subject"] = " ".join(subject.splitlines())
    msg["From"] = from_addr
    msg["To"] = to_addr

    body_lines = [
        f"Severity: {alert.severity}",
        f"Domain: {alert.domain}",
        f"Alert type: {alert.type}",
        f"Title: {alert.title}",
        "",
        "Deterministic facts:",
        _format_alert_details(alert.details_json),
        "",
        f"LLM summary: {alert.llm_summary or alert.summary}",
        f"LLM risk: {alert.llm_risk or 'Unavailable'}",
        f"LLM recommended action: {alert.llm_recommended_action or 'Review the deterministic facts.'}",
        "",
        f"Dashboard: {settings.app.base_url}/alerts",
        f"Severity increased: {severity_increased}",
    ]
    msg.set_content("\n".join(body_lines))

    # An empty port variable is treated like an unset one.
    port_text = os.getenv(email_cfg.smtp_port_env) or "587"
    try:
        port = int(port_text)
    except ValueError:
        logger.warning("Email alert not sent because SMTP port %r is not a number", port_text)
        return False

    user = os.getenv(email_cfg.smtp_user_env)
    password = os.getenv(email_cfg.smtp_password_env)
    try:
        with smtplib.SMTP(host, port, timeout=30) as smtp:
            # NOTE(review): starttls() is called without an explicit SSL
            # context; confirm the resulting certificate checks are adequate.
            smtp.starttls()
            # Authentication is optional: skipped unless both values are set.
            if user and password:
                smtp.login(user, password)
            smtp.send_message(msg)
            logger.info("Email alert delivered for %s", alert.fingerprint)
            return True
    except Exception as exc:
        # Deliberate best-effort boundary: a mail failure must not propagate.
        logger.warning("Email alert delivery failed for %s: %s", alert.fingerprint, exc)
        return False
def send_digest_email(settings: Settings, subject: str, body: str) -> bool:
    """Email a plain-text digest over SMTP with STARTTLS.

    The SMTP host, port, credentials and the sender/recipient addresses are
    read from the environment variables whose *names* are stored in
    ``settings.alerts.email``. The port defaults to 587 when its variable is
    unset or empty.

    Args:
        settings: Application settings; only ``alerts.email`` is read.
        subject: Subject line; line breaks are collapsed to single spaces.
        body: Plain-text message body.

    Returns:
        True when the message was handed to the SMTP server without an
        exception; False when email is disabled, the configuration is
        incomplete, or delivery failed. Problems are logged rather than raised.
    """
    email_cfg = settings.alerts.email
    if not email_cfg.enabled:
        return False

    host = os.getenv(email_cfg.smtp_host_env)
    to_addr = os.getenv(email_cfg.to_env)
    from_addr = os.getenv(email_cfg.from_env)
    if not (host and to_addr and from_addr):
        logger.warning("Digest email not sent because SMTP environment is incomplete")
        return False

    msg = EmailMessage()
    # EmailMessage rejects header values containing line breaks with a
    # ValueError, so collapse them instead of letting the caller crash.
    msg["Subject"] = " ".join(subject.splitlines())
    msg["From"] = from_addr
    msg["To"] = to_addr
    msg.set_content(body)

    user = os.getenv(email_cfg.smtp_user_env)
    password = os.getenv(email_cfg.smtp_password_env)
    try:
        # An empty port variable is treated like an unset one; a non-numeric
        # value raises ValueError here and is reported as a delivery failure.
        port = int(os.getenv(email_cfg.smtp_port_env) or "587")
        with smtplib.SMTP(host, port, timeout=30) as smtp:
            # NOTE(review): starttls() is called without an explicit SSL
            # context; confirm the resulting certificate checks are adequate.
            smtp.starttls()
            # Authentication is optional: skipped unless both values are set.
            if user and password:
                smtp.login(user, password)
            smtp.send_message(msg)
            return True
    except Exception as exc:
        # Deliberate best-effort boundary: a mail failure must not propagate.
        logger.warning("Digest email delivery failed: %s", exc)
        return False