from __future__ import annotations import re from dataclasses import dataclass, field from typing import Callable TxtLookup = Callable[[str], list[str]] MxLookup = Callable[[str], list[str]] @dataclass class ParsedDmarcRecord: raw: str | None = None p: str | None = None sp: str | None = None pct: int | None = None adkim: str | None = None aspf: str | None = None fo: str | None = None rua: str | None = None ruf: str | None = None @dataclass class ParsedSpfRecord: raw: str | None = None includes: list[str] = field(default_factory=list) all_mechanism: str | None = None @dataclass class DomainDnsPolicy: domain: str dmarc: ParsedDmarcRecord = field(default_factory=ParsedDmarcRecord) spf: ParsedSpfRecord = field(default_factory=ParsedSpfRecord) mx_records: list[str] = field(default_factory=list) errors: list[str] = field(default_factory=list) def _default_txt_lookup(name: str) -> list[str]: try: import dns.resolver except ImportError as exc: raise RuntimeError("dnspython is not installed") from exc answers = dns.resolver.resolve(name, "TXT", lifetime=10) values = [] for answer in answers: parts = getattr(answer, "strings", None) if parts is None: values.append(str(answer).strip('"')) else: values.append("".join(part.decode("utf-8", errors="replace") for part in parts)) return values def _default_mx_lookup(name: str) -> list[str]: try: import dns.resolver except ImportError as exc: raise RuntimeError("dnspython is not installed") from exc answers = dns.resolver.resolve(name, "MX", lifetime=10) return [f"{answer.preference} {str(answer.exchange).rstrip('.')}" for answer in answers] def _tag_map(record: str) -> dict[str, str]: tags: dict[str, str] = {} for part in record.split(";"): if "=" not in part: continue key, value = part.split("=", 1) key = key.strip().lower() value = value.strip() if key: tags[key] = value return tags def _int(value: str | None) -> int | None: if not value: return None try: return int(value) except ValueError: return None def parse_dmarc_records(records: list[str]) -> tuple[ParsedDmarcRecord, list[str]]: dmarc_records = [record.strip() for record in records if record.strip().lower().startswith("v=dmarc1")] errors = [] if not dmarc_records: return ParsedDmarcRecord(), ["DMARC TXT record not found"] if len(dmarc_records) > 1: errors.append("Multiple DMARC TXT records found") raw = dmarc_records[0] tags = _tag_map(raw) return ( ParsedDmarcRecord( raw=raw, p=tags.get("p"), sp=tags.get("sp"), pct=_int(tags.get("pct")), adkim=tags.get("adkim"), aspf=tags.get("aspf"), fo=tags.get("fo"), rua=tags.get("rua"), ruf=tags.get("ruf"), ), errors, ) def parse_spf_records(records: list[str]) -> tuple[ParsedSpfRecord, list[str]]: spf_records = [record.strip() for record in records if record.strip().lower().startswith("v=spf1")] errors = [] if not spf_records: return ParsedSpfRecord(), ["SPF TXT record not found"] if len(spf_records) > 1: errors.append("Multiple SPF TXT records found") raw = spf_records[0] tokens = raw.split() includes = [token.split(":", 1)[1] for token in tokens if token.lower().startswith("include:") and ":" in token] all_mechanism = next((token for token in tokens if re.fullmatch(r"[+?~-]?all", token, flags=re.IGNORECASE)), None) return ParsedSpfRecord(raw=raw, includes=includes, all_mechanism=all_mechanism), errors def collect_domain_dns_policy( domain: str, *, txt_lookup: TxtLookup | None = None, mx_lookup: MxLookup | None = None, ) -> DomainDnsPolicy: txt_lookup = txt_lookup or _default_txt_lookup mx_lookup = mx_lookup or _default_mx_lookup domain = domain.lower().rstrip(".") policy = DomainDnsPolicy(domain=domain) try: policy.dmarc, errors = parse_dmarc_records(txt_lookup(f"_dmarc.{domain}")) policy.errors.extend(errors) except Exception as exc: policy.errors.append(f"DMARC lookup failed: {exc}") try: policy.spf, errors = parse_spf_records(txt_lookup(domain)) policy.errors.extend(errors) except Exception as exc: policy.errors.append(f"SPF lookup failed: {exc}") try: policy.mx_records = mx_lookup(domain) except Exception as exc: policy.errors.append(f"MX lookup failed: {exc}") return policy