from __future__ import annotations import logging import os from functools import lru_cache from pathlib import Path from typing import Any import yaml from pydantic import BaseModel, Field class AppConfig(BaseModel): name: str = "DMARC Sentinel" base_url: str = "https://sentinel.tukutoi.com" timezone: str = "Europe/Zurich" poll_interval_minutes: int = 30 database_url: str = "sqlite:////app/data/dmarc-sentinel.sqlite3" log_level: str = "INFO" max_attachment_decompressed_mb: int = 20 max_attachment_compressed_mb: int = 10 max_attachments_per_message: int = 20 max_reports_per_message: int = 20 max_reports_per_archive: int = 20 max_archive_compression_ratio: int = 100 max_xml_records_per_report: int = 10000 max_record_count: int = 10000000 max_report_future_days: int = 3 max_report_past_days: int = 3650 max_reports_per_poll: int = 200 class SecurityConfig(BaseModel): dashboard_auth_enabled: bool = True dashboard_username_env: str = "DASHBOARD_USERNAME" dashboard_password_env: str = "DASHBOARD_PASSWORD" api_token_required: bool = True homepage_token_env: str = "HOMEPAGE_API_TOKEN" class LLMConfig(BaseModel): provider: str = "openai" api_key_env: str = "OPENAI_API_KEY" model: str = "gpt-4.1-mini" temperature: float = 0.2 timeout_seconds: int = 45 max_retries: int = 2 generate_alert_explanations: bool = True generate_daily_summary: bool = True generate_weekly_summary: bool = True store_llm_outputs: bool = True send_raw_xml_to_llm: bool = False send_raw_email_to_llm: bool = False system_prompt_path: str = "config/prompts/system.md" alert_prompt_path: str = "config/prompts/alert_explanation.md" digest_prompt_path: str = "config/prompts/posture_digest.md" weekly_prompt_path: str = "config/prompts/weekly_summary.md" class InboxConfig(BaseModel): id: str label: str domain: str imap_host: str imap_port: int = 993 imap_ssl: bool = True username_env: str password_env: str folder: str = "DMARC" recipient: str processed_folder: str | None = None failed_folder: str | None = None move_after_success: bool = False move_after_failure: bool = False mark_seen_after_success: bool = True enabled: bool = True @property def username(self) -> str | None: return os.getenv(self.username_env) @property def password(self) -> str | None: return os.getenv(self.password_env) class KnownSenderConfig(BaseModel): id: str name: str ip_allowlist: list[str] = Field(default_factory=list) dkim_domains: list[str] = Field(default_factory=list) spf_domains: list[str] = Field(default_factory=list) class EmailAlertConfig(BaseModel): enabled: bool = True smtp_host_env: str = "ALERT_SMTP_HOST" smtp_port_env: str = "ALERT_SMTP_PORT" smtp_user_env: str = "ALERT_SMTP_USER" smtp_password_env: str = "ALERT_SMTP_PASSWORD" from_env: str = "ALERT_EMAIL_FROM" to_env: str = "ALERT_EMAIL_TO" class AlertThresholds(BaseModel): unknown_source_fail_count: int = 10 unknown_source_fail_rate_percent: float = 5 known_source_fail_rate_percent: float = 2 total_volume_spike_multiplier: float = 3 total_volume_drop_percent: float = 80 min_messages_for_rate_alert: int = 20 repeated_failure_days: int = 2 class AlertsConfig(BaseModel): email: EmailAlertConfig = Field(default_factory=EmailAlertConfig) thresholds: AlertThresholds = Field(default_factory=AlertThresholds) class Settings(BaseModel): app: AppConfig = Field(default_factory=AppConfig) security: SecurityConfig = Field(default_factory=SecurityConfig) llm: LLMConfig = Field(default_factory=LLMConfig) inboxes: list[InboxConfig] = Field(default_factory=list) known_senders: dict[str, list[KnownSenderConfig]] = Field(default_factory=dict) alerts: AlertsConfig = Field(default_factory=AlertsConfig) def enabled_inboxes(self) -> list[InboxConfig]: return [inbox for inbox in self.inboxes if inbox.enabled] def get_inbox(self, inbox_id: str) -> InboxConfig: for inbox in self.inboxes: if inbox.id == inbox_id: return inbox raise KeyError(f"Unknown inbox: {inbox_id}") def _default_config_path() -> Path: explicit = os.getenv("DMARC_SENTINEL_CONFIG") if explicit: return Path(explicit) return Path("config/config.yml") def load_settings(path: str | Path | None = None) -> Settings: config_path = Path(path) if path else _default_config_path() if not config_path.exists(): raise FileNotFoundError( f"Runtime config not found at {config_path}. " "Create config/config.yml from config/config.example.yml or set DMARC_SENTINEL_CONFIG." ) with config_path.open("r", encoding="utf-8") as handle: raw: dict[str, Any] = yaml.safe_load(handle) or {} settings = Settings.model_validate(raw) validate_llm_environment(settings) return settings def validate_llm_environment(settings: Settings) -> None: if settings.llm.provider != "openai": return if not any( [ settings.llm.generate_alert_explanations, settings.llm.generate_daily_summary, settings.llm.generate_weekly_summary, ] ): return if os.getenv("DMARC_SENTINEL_ALLOW_NO_LLM_FOR_TESTS", "").lower() == "true": return if not os.getenv(settings.llm.api_key_env): raise RuntimeError( f"{settings.llm.api_key_env} is required when llm.provider=openai. " "Set DMARC_SENTINEL_ALLOW_NO_LLM_FOR_TESTS=true only for tests." ) @lru_cache(maxsize=1) def get_settings() -> Settings: return load_settings() def configure_logging(settings: Settings) -> None: Path("logs").mkdir(exist_ok=True) level = getattr(logging, settings.app.log_level.upper(), logging.INFO) formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s") root = logging.getLogger() root.setLevel(level) root.handlers.clear() stream = logging.StreamHandler() stream.setFormatter(formatter) root.addHandler(stream) try: file_handler = logging.FileHandler("logs/dmarc-sentinel.log") file_handler.setFormatter(formatter) root.addHandler(file_handler) except OSError: logging.getLogger(__name__).warning("Could not open logs/dmarc-sentinel.log")