201 lines
6.4 KiB
Python
201 lines
6.4 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from functools import lru_cache
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
from pydantic import BaseModel, Field
|
|
|
|
|
|
class AppConfig(BaseModel):
    """Application-wide settings; every field carries a default value."""

    # Identity and runtime basics.
    name: str = "DMARC Sentinel"
    base_url: str = "https://sentinel.tukutoi.com"
    timezone: str = "Europe/Zurich"
    poll_interval_minutes: int = 30
    database_url: str = "sqlite:////app/data/dmarc-sentinel.sqlite3"
    # Level name; configure_logging() upper-cases it and looks it up on the
    # logging module.
    log_level: str = "INFO"

    # Processing limits. Units follow the field-name suffixes (mb, days, ...);
    # nothing in this class enforces them.
    max_attachment_decompressed_mb: int = 20
    max_attachment_compressed_mb: int = 10
    max_attachments_per_message: int = 20
    max_reports_per_message: int = 20
    max_reports_per_archive: int = 20
    max_archive_compression_ratio: int = 100
    max_xml_records_per_report: int = 10_000
    max_record_count: int = 10_000_000
    max_report_future_days: int = 3
    max_report_past_days: int = 3650
    max_reports_per_poll: int = 200
|
|
|
|
|
|
class SecurityConfig(BaseModel):
    """Access-control switches for the dashboard and the API.

    The ``*_env`` fields hold strings that look like environment-variable
    names; this class stores them only and never reads the environment.
    """

    # Dashboard login.
    dashboard_auth_enabled: bool = True
    dashboard_username_env: str = "DASHBOARD_USERNAME"
    dashboard_password_env: str = "DASHBOARD_PASSWORD"

    # API token handling.
    api_token_required: bool = True
    homepage_token_env: str = "HOMEPAGE_API_TOKEN"
|
|
|
|
|
|
class LLMConfig(BaseModel):
    """Settings for the LLM integration; every field carries a default value."""

    # Provider and request parameters. validate_llm_environment() requires the
    # variable named by ``api_key_env`` when the provider is "openai" and at
    # least one generate_* flag is on.
    provider: str = "openai"
    api_key_env: str = "OPENAI_API_KEY"
    model: str = "gpt-4.1-mini"
    temperature: float = 0.2
    timeout_seconds: int = 45
    max_retries: int = 2

    # Feature switches.
    generate_alert_explanations: bool = True
    generate_daily_summary: bool = True
    generate_weekly_summary: bool = True
    store_llm_outputs: bool = True

    # Raw-content sharing is off by default.
    send_raw_xml_to_llm: bool = False
    send_raw_email_to_llm: bool = False

    # Prompt file locations (relative paths by default).
    system_prompt_path: str = "config/prompts/system.md"
    alert_prompt_path: str = "config/prompts/alert_explanation.md"
    digest_prompt_path: str = "config/prompts/posture_digest.md"
    weekly_prompt_path: str = "config/prompts/weekly_summary.md"
|
|
|
|
|
|
class InboxConfig(BaseModel):
    """Settings for one configured inbox.

    Credentials are not stored on the model: ``username_env`` and
    ``password_env`` name environment variables, and the ``username`` /
    ``password`` properties look those variables up on every access.
    """

    # Required identification.
    id: str
    label: str
    domain: str

    # IMAP connection.
    imap_host: str
    imap_port: int = 993
    imap_ssl: bool = True
    username_env: str
    password_env: str

    # Mailbox selection.
    folder: str = "DMARC"
    recipient: str

    # Post-processing options.
    processed_folder: str | None = None
    failed_folder: str | None = None
    move_after_success: bool = False
    move_after_failure: bool = False
    mark_seen_after_success: bool = True

    # Settings.enabled_inboxes() filters on this flag.
    enabled: bool = True

    @property
    def username(self) -> str | None:
        """Return the value of the variable named by ``username_env``, or ``None`` if unset."""
        return os.environ.get(self.username_env)

    @property
    def password(self) -> str | None:
        """Return the value of the variable named by ``password_env``, or ``None`` if unset."""
        return os.environ.get(self.password_env)
|
|
|
|
|
|
class KnownSenderConfig(BaseModel):
    """A named sender together with the identifiers associated with it."""

    # Required identification.
    id: str
    name: str

    # Each list defaults to a fresh empty list per instance.
    ip_allowlist: list[str] = Field(default_factory=list)
    dkim_domains: list[str] = Field(default_factory=list)
    spf_domains: list[str] = Field(default_factory=list)
|
|
|
|
|
|
class EmailAlertConfig(BaseModel):
    """Email alert settings.

    Apart from ``enabled``, every field holds a string that looks like an
    environment-variable name; this class stores the names only and does not
    read the environment.
    """

    enabled: bool = True

    # SMTP connection.
    smtp_host_env: str = "ALERT_SMTP_HOST"
    smtp_port_env: str = "ALERT_SMTP_PORT"
    smtp_user_env: str = "ALERT_SMTP_USER"
    smtp_password_env: str = "ALERT_SMTP_PASSWORD"

    # Message addressing.
    from_env: str = "ALERT_EMAIL_FROM"
    to_env: str = "ALERT_EMAIL_TO"
|
|
|
|
|
|
class AlertThresholds(BaseModel):
    """Numeric thresholds for alerting; every field carries a default value."""

    # Failure thresholds.
    unknown_source_fail_count: int = 10
    unknown_source_fail_rate_percent: float = 5
    known_source_fail_rate_percent: float = 2

    # Volume thresholds.
    total_volume_spike_multiplier: float = 3
    total_volume_drop_percent: float = 80

    # Minimum sample size and day-based windows.
    min_messages_for_rate_alert: int = 20
    repeated_failure_days: int = 2
    missing_reporter_days: int = 3
|
|
|
|
|
|
class AlertsConfig(BaseModel):
    """Alerting section: email delivery settings plus thresholds.

    Both sub-sections are optional in the input; a missing one is built from
    its model's defaults.
    """

    email: EmailAlertConfig = Field(default_factory=EmailAlertConfig)
    thresholds: AlertThresholds = Field(default_factory=AlertThresholds)
|
|
|
|
|
|
class Settings(BaseModel):
    """Root configuration model that aggregates every section.

    Each section is optional in the input; a missing one falls back to its
    default factory (default model, empty list, or empty dict).
    """

    app: AppConfig = Field(default_factory=AppConfig)
    security: SecurityConfig = Field(default_factory=SecurityConfig)
    llm: LLMConfig = Field(default_factory=LLMConfig)
    inboxes: list[InboxConfig] = Field(default_factory=list)
    # Mapping of a string key to a list of senders; what the key denotes is
    # not visible here (NOTE(review): presumably a domain -- confirm).
    known_senders: dict[str, list[KnownSenderConfig]] = Field(default_factory=dict)
    alerts: AlertsConfig = Field(default_factory=AlertsConfig)

    def enabled_inboxes(self) -> list[InboxConfig]:
        """Return the inboxes whose ``enabled`` flag is set, in configured order."""
        return [candidate for candidate in self.inboxes if candidate.enabled]

    def get_inbox(self, inbox_id: str) -> InboxConfig:
        """Return the first inbox whose ``id`` equals ``inbox_id``.

        Raises:
            KeyError: If no configured inbox has that id.
        """
        found = next(
            (candidate for candidate in self.inboxes if candidate.id == inbox_id),
            None,
        )
        if found is None:
            raise KeyError(f"Unknown inbox: {inbox_id}")
        return found
|
|
|
|
|
|
def _default_config_path() -> Path:
    """Return the config file path used when the caller supplies none.

    The ``DMARC_SENTINEL_CONFIG`` environment variable wins when it is set to
    a non-empty value; otherwise the relative path ``config/config.yml`` is
    returned.
    """
    # `or` covers both an unset variable (None) and an empty string.
    return Path(os.getenv("DMARC_SENTINEL_CONFIG") or "config/config.yml")
|
|
|
|
|
|
def load_settings(path: str | Path | None = None) -> Settings:
    """Read the YAML config file and return validated ``Settings``.

    Args:
        path: Location of the config file. When falsy, the path comes from
            ``_default_config_path()``.

    Returns:
        The validated settings object.

    Raises:
        FileNotFoundError: If the resolved path does not exist.
        RuntimeError: Propagated from ``validate_llm_environment`` when the
            LLM API key is required but missing.
    """
    config_path = _default_config_path() if not path else Path(path)

    if not config_path.exists():
        message = (
            f"Runtime config not found at {config_path}. "
            "Create config/config.yml from config/config.example.yml or set DMARC_SENTINEL_CONFIG."
        )
        raise FileNotFoundError(message)

    with config_path.open("r", encoding="utf-8") as stream:
        parsed = yaml.safe_load(stream)

    # An empty document parses to None; treat it as an empty mapping so that
    # every section falls back to its defaults.
    raw: dict[str, Any] = parsed or {}

    loaded = Settings.model_validate(raw)
    validate_llm_environment(loaded)
    return loaded
|
|
|
|
|
|
def validate_llm_environment(settings: Settings) -> None:
    """Fail fast when the OpenAI API key is needed but not in the environment.

    The check is skipped when the provider is not ``"openai"``, when all three
    ``generate_*`` flags are off, or when
    ``DMARC_SENTINEL_ALLOW_NO_LLM_FOR_TESTS`` equals ``"true"``
    (case-insensitive).

    Args:
        settings: Loaded settings; only the ``llm`` section is read.

    Raises:
        RuntimeError: If the variable named by ``llm.api_key_env`` is unset
            or empty and none of the skip conditions apply.
    """
    llm = settings.llm

    if llm.provider != "openai":
        return

    llm_in_use = (
        llm.generate_alert_explanations
        or llm.generate_daily_summary
        or llm.generate_weekly_summary
    )
    if not llm_in_use:
        return

    bypass = os.getenv("DMARC_SENTINEL_ALLOW_NO_LLM_FOR_TESTS", "")
    if bypass.lower() == "true":
        return

    if os.getenv(llm.api_key_env):
        return

    raise RuntimeError(
        f"{llm.api_key_env} is required when llm.provider=openai. "
        "Set DMARC_SENTINEL_ALLOW_NO_LLM_FOR_TESTS=true only for tests."
    )
|
|
|
|
|
|
@lru_cache(maxsize=1)
def get_settings() -> Settings:
    """Return the settings loaded from the default location.

    The result is memoized by ``lru_cache``: the first successful call loads
    the file and later calls return the same object. Exceptions raised by
    ``load_settings`` are not cached.
    """
    loaded = load_settings()
    return loaded
|
|
|
|
|
|
def configure_logging(settings: Settings) -> None:
    """Reset the root logger to a stream handler plus a best-effort file handler.

    The root logger's level is taken from ``settings.app.log_level`` (looked up
    case-insensitively on the ``logging`` module, falling back to ``INFO``).
    All handlers previously attached to the root logger are removed and
    closed. A ``StreamHandler`` is always installed; a ``FileHandler`` writing
    to ``logs/dmarc-sentinel.log`` (relative to the current working directory)
    is added when the directory and file can be created.

    Args:
        settings: Loaded settings; only ``settings.app.log_level`` is read.
    """
    level = getattr(logging, settings.app.log_level.upper(), logging.INFO)
    # Some upper-case attributes of `logging` are not levels (for example
    # BASIC_FORMAT is a format string); setLevel() would raise on those.
    if not isinstance(level, int):
        level = logging.INFO

    formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
    root = logging.getLogger()
    root.setLevel(level)

    # Remove AND close the old handlers so that calling this function again
    # does not leak the previously opened log file.
    for stale in root.handlers[:]:
        root.removeHandler(stale)
        stale.close()

    stream = logging.StreamHandler()
    stream.setFormatter(formatter)
    root.addHandler(stream)

    # File logging is best-effort: creating the directory can fail for the same
    # reasons as opening the file (read-only filesystem, permissions, a regular
    # file named "logs"), so both steps share one handler.
    try:
        Path("logs").mkdir(exist_ok=True)
        file_handler = logging.FileHandler("logs/dmarc-sentinel.log")
    except OSError:
        logging.getLogger(__name__).warning("Could not open logs/dmarc-sentinel.log")
    else:
        file_handler.setFormatter(formatter)
        root.addHandler(file_handler)
|