From c68ccc806552f8684562ff3f674d11266beed3e5 Mon Sep 17 00:00:00 2001 From: Beda Schmid Date: Wed, 20 May 2026 14:12:57 -0300 Subject: [PATCH] DKIM should never be inferred from reports --- app/dns_policy.py | 41 --------------------------------------- app/main.py | 38 +++++------------------------------- app/templates/domain.html | 26 ------------------------- tests/test_dns_policy.py | 11 ++++------- 4 files changed, 9 insertions(+), 107 deletions(-) diff --git a/app/dns_policy.py b/app/dns_policy.py index f9b8e28..93a07c0 100644 --- a/app/dns_policy.py +++ b/app/dns_policy.py @@ -29,21 +29,11 @@ class ParsedSpfRecord: all_mechanism: str | None = None -@dataclass -class DkimRecord: - selector: str - domain: str - query_name: str - record: str | None = None - error: str | None = None - - @dataclass class DomainDnsPolicy: domain: str dmarc: ParsedDmarcRecord = field(default_factory=ParsedDmarcRecord) spf: ParsedSpfRecord = field(default_factory=ParsedSpfRecord) - dkim: list[DkimRecord] = field(default_factory=list) mx_records: list[str] = field(default_factory=list) errors: list[str] = field(default_factory=list) @@ -139,7 +129,6 @@ def parse_spf_records(records: list[str]) -> tuple[ParsedSpfRecord, list[str]]: def collect_domain_dns_policy( domain: str, *, - selectors: list[str | tuple[str, str]] | None = None, txt_lookup: TxtLookup | None = None, mx_lookup: MxLookup | None = None, ) -> DomainDnsPolicy: @@ -165,34 +154,4 @@ def collect_domain_dns_policy( except Exception as exc: policy.errors.append(f"MX lookup failed: {exc}") - selector_domains: set[tuple[str, str]] = set() - for item in selectors or []: - if isinstance(item, tuple): - selector, dkim_domain = item - else: - selector, dkim_domain = item, domain - selector = (selector or "").strip().lower() - dkim_domain = (dkim_domain or domain).strip().lower().rstrip(".") - if selector and dkim_domain: - selector_domains.add((selector, dkim_domain)) - - for selector, dkim_domain in sorted(selector_domains): - query_name = f"{selector}._domainkey.{dkim_domain}" - try: - records = txt_lookup(query_name) - dkim_records = [record for record in records if record.strip().lower().startswith("v=dkim1")] - policy.dkim.append( - DkimRecord( - selector=selector, - domain=dkim_domain, - query_name=query_name, - record=dkim_records[0] if dkim_records else None, - ) - ) - if not dkim_records: - policy.errors.append(f"DKIM record not found for selector {selector} on {dkim_domain}") - except Exception as exc: - policy.dkim.append(DkimRecord(selector=selector, domain=dkim_domain, query_name=query_name, error=str(exc))) - policy.errors.append(f"DKIM lookup failed for selector {selector} on {dkim_domain}: {exc}") - return policy diff --git a/app/main.py b/app/main.py index d79fdb3..0c7916c 100644 --- a/app/main.py +++ b/app/main.py @@ -22,7 +22,7 @@ from app.dns_policy import DomainDnsPolicy, collect_domain_dns_policy from app.homepage import domain_homepage_summary, domain_metrics, homepage_summary, latest_summary, resolve_date_range, traffic_distribution from app.inbox_locks import InboxRunLease, inbox_run_locks from app.jobs import import_jobs -from app.models import Alert, AuthResult, DailyStat, DomainDnsSnapshot, InboxStatus, LLMReport, Record, Report, SkippedReportPayload, utcnow +from app.models import Alert, DailyStat, DomainDnsSnapshot, InboxStatus, LLMReport, Record, Report, SkippedReportPayload, utcnow from app.scheduler import generate_open_posture_summaries, scheduler_ok, start_scheduler from app.schemas import BacklogRequest, ProcessNowRequest from app.message_processor import process_inbox @@ -400,24 +400,6 @@ def _json_list(value: str | None) -> list: return data if isinstance(data, list) else [] -def _observed_dkim_selectors(session: Session, domain: str) -> list[tuple[str, str]]: - rows = session.execute( - select(AuthResult.selector, AuthResult.domain) - .select_from(AuthResult) - .join(Record) - .join(Report) - .where( - Report.domain == domain, - AuthResult.auth_type == "dkim", - AuthResult.selector.is_not(None), - AuthResult.domain.is_not(None), - ) - .distinct() - .order_by(AuthResult.domain, AuthResult.selector) - ).all() - return [(selector, auth_domain) for selector, auth_domain in rows if selector and auth_domain] - - def _snapshot_model(domain: str, policy: DomainDnsPolicy) -> DomainDnsSnapshot: return DomainDnsSnapshot( domain=domain, @@ -433,7 +415,7 @@ def _snapshot_model(domain: str, policy: DomainDnsPolicy) -> DomainDnsSnapshot: spf_record=policy.spf.raw, spf_all=policy.spf.all_mechanism, spf_includes_json=json.dumps(policy.spf.includes, sort_keys=True), - dkim_records_json=json.dumps([item.__dict__ for item in policy.dkim], sort_keys=True), + dkim_records_json="[]", mx_records_json=json.dumps(policy.mx_records, sort_keys=True), errors_json=json.dumps(policy.errors, sort_keys=True), ) @@ -448,14 +430,6 @@ def _latest_dns_snapshot(session: Session, domain: str) -> SimpleNamespace | Non ) if not snapshot: return None - dkim_records = _json_list(snapshot.dkim_records_json) - dkim_found = [item for item in dkim_records if isinstance(item, dict) and item.get("record")] - dkim_missing = [item for item in dkim_records if isinstance(item, dict) and not item.get("record")] - dns_errors = [ - item - for item in _json_list(snapshot.errors_json) - if isinstance(item, str) and not item.startswith("DKIM lookup failed") and not item.startswith("DKIM record not found") - ] return SimpleNamespace( id=snapshot.id, domain=snapshot.domain, @@ -472,11 +446,9 @@ def _latest_dns_snapshot(session: Session, domain: str) -> SimpleNamespace | Non spf_record=snapshot.spf_record, spf_all=snapshot.spf_all, spf_includes=_json_list(snapshot.spf_includes_json), - dkim_records=dkim_records, - dkim_found=dkim_found, - dkim_missing=dkim_missing, + dkim_records=_json_list(snapshot.dkim_records_json), mx_records=_json_list(snapshot.mx_records_json), - errors=dns_errors, + errors=_json_list(snapshot.errors_json), ) @@ -550,7 +522,7 @@ def domain_page(domain: str, request: Request, source_page: int = 1, alert_page: @app.post("/domains/{domain}/dns/refresh", dependencies=dashboard_post_auth) def refresh_domain_dns(domain: str, session: Session = Depends(get_db)): - policy = collect_domain_dns_policy(domain, selectors=_observed_dkim_selectors(session, domain)) + policy = collect_domain_dns_policy(domain) session.add(_snapshot_model(domain, policy)) session.commit() return RedirectResponse(url=f"/domains/{domain}", status_code=303) diff --git a/app/templates/domain.html b/app/templates/domain.html index acec7b0..03dee7c 100644 --- a/app/templates/domain.html +++ b/app/templates/domain.html @@ -64,32 +64,6 @@ {{ dns_snapshot.spf_record or "No SPF record found." }} -
-
- DKIM - {{ dns_snapshot.dkim_found | length }} found{% if dns_snapshot.dkim_missing %}, {{ dns_snapshot.dkim_missing | length }} missing{% endif %} -
-
- {% for item in dns_snapshot.dkim_found %} -
- {{ item.selector }}._domainkey.{{ item.domain or domain }} - {{ item.record }} -
- {% else %} - No currently resolvable observed DKIM selectors. - {% endfor %} -
- {% if dns_snapshot.dkim_missing %} -
- {{ dns_snapshot.dkim_missing | length }} observed selector lookups did not resolve -
- {% for item in dns_snapshot.dkim_missing %} - {{ item.query_name }}{% if item.error %}: {{ item.error }}{% endif %} - {% endfor %} -
-
- {% endif %} -
MX diff --git a/tests/test_dns_policy.py b/tests/test_dns_policy.py index a635800..e21b7f7 100644 --- a/tests/test_dns_policy.py +++ b/tests/test_dns_policy.py @@ -20,21 +20,21 @@ def test_parse_spf_record_extracts_includes_and_all_mechanism(): assert parsed.all_mechanism == "-all" -def test_collect_domain_dns_policy_uses_observed_dkim_selectors(): +def test_collect_domain_dns_policy_queries_only_domain_dns_records(): txt_records = { "_dmarc.example.com": ["v=DMARC1; p=reject; pct=100"], "example.com": ["v=spf1 include:_spf.example.net -all"], - "s1._domainkey.mail.example.net": ["v=DKIM1; k=rsa; p=abc"], } + queried = [] def txt_lookup(name: str) -> list[str]: + queried.append(name) if name not in txt_records: raise RuntimeError("not found") return txt_records[name] policy = collect_domain_dns_policy( "example.com", - selectors=[("s1", "mail.example.net")], txt_lookup=txt_lookup, mx_lookup=lambda name: ["10 mail.example.com"], ) @@ -42,8 +42,5 @@ def test_collect_domain_dns_policy_uses_observed_dkim_selectors(): assert policy.dmarc.p == "reject" assert policy.spf.all_mechanism == "-all" assert policy.mx_records == ["10 mail.example.com"] - assert policy.dkim[0].selector == "s1" - assert policy.dkim[0].domain == "mail.example.net" - assert policy.dkim[0].query_name == "s1._domainkey.mail.example.net" - assert policy.dkim[0].record == "v=DKIM1; k=rsa; p=abc" + assert queried == ["_dmarc.example.com", "example.com"] assert policy.errors == []