from __future__ import annotations

from dataclasses import dataclass
from ipaddress import ip_address, ip_network

from app.config import KnownSenderConfig, Settings
from app.dmarc_parser import ParsedRecord


@dataclass(frozen=True)
class SenderMatch:
    """Outcome of matching one parsed record against the configured senders."""

    # ``id`` of the matching sender config; None when nothing matched.
    id: str | None
    # ``name`` of the matching sender config; None when nothing matched.
    name: str | None
    # True when the record was attributed to a configured sender.
    is_known: bool


def _domain_equal(a: str | None, b: str | None) -> bool:
    """Return True when *a* and *b* name the same non-empty domain.

    The comparison ignores letter case and trailing dots. A missing or empty
    domain (``None``, ``""``, ``"."``) never equals anything, not even another
    empty value, so an auth result without a domain cannot match a blank
    configuration entry.
    """
    left = (a or "").lower().rstrip(".")
    right = (b or "").lower().rstrip(".")
    # Require a real value: two "nothing" domains must not count as a match.
    return bool(left) and left == right


def _ip_matches(source_ip: str, sender: KnownSenderConfig) -> bool:
    """Return True when *source_ip* lies inside any network of the sender.

    Args:
        source_ip: Textual IPv4/IPv6 address to test.
        sender: Sender config whose ``ip_allowlist`` entries are parsed as
            networks.

    Returns:
        False when *source_ip* cannot be parsed or no allowlist entry
        contains it; True on the first containing network.
    """
    try:
        ip = ip_address(source_ip)
    except ValueError:
        return False

    for cidr in sender.ip_allowlist:
        try:
            # strict=False accepts entries with host bits set ("10.0.0.1/24").
            network = ip_network(cidr, strict=False)
        except ValueError:
            # Best effort: one malformed entry must not disable the rest.
            continue
        if ip in network:
            return True
    return False


def classify_record(settings: Settings, domain: str, record: ParsedRecord) -> SenderMatch:
    """Attribute *record* to the first matching known sender of *domain*.

    Senders are taken from ``settings.known_senders[domain]`` and tried in
    order. For each sender:

    1. If ``record.source_ip`` falls inside the sender's ``ip_allowlist``,
       the sender matches.
    2. Otherwise, if the sender has a non-empty ``ip_allowlist``, it is
       skipped; the auth-domain fallback is only used for senders that
       define no allowlist.
    3. Otherwise the sender matches when any ``dkim`` auth result carries a
       domain listed in ``dkim_domains`` or any ``spf`` auth result carries a
       domain listed in ``spf_domains``.

    Args:
        settings: Application settings providing ``known_senders``.
        domain: Key used to look up the candidate senders.
        record: Parsed record providing ``source_ip`` and ``auth_results``.

    Returns:
        A ``SenderMatch`` with the sender's id and name and ``is_known=True``
        for the first match, else ``SenderMatch(None, None, False)``.
    """
    for sender in settings.known_senders.get(domain, []):
        if _ip_matches(record.source_ip, sender):
            return SenderMatch(sender.id, sender.name, True)

        # A sender pinned to specific networks is identified by IP only.
        if sender.ip_allowlist:
            continue

        # NOTE(review): only the auth type and domain are inspected; the auth
        # outcome (e.g. pass/fail) is not consulted here -- confirm intended.
        for auth in record.auth_results:
            if auth.auth_type == "dkim":
                candidates = sender.dkim_domains
            elif auth.auth_type == "spf":
                candidates = sender.spf_domains
            else:
                continue
            if any(_domain_equal(auth.domain, item) for item in candidates):
                return SenderMatch(sender.id, sender.name, True)

    return SenderMatch(None, None, False)