48 lines
1.5 KiB
Python
48 lines
1.5 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from ipaddress import ip_address, ip_network
|
|
|
|
from app.config import KnownSenderConfig, Settings
|
|
from app.dmarc_parser import ParsedRecord
|
|
|
|
|
|
@dataclass(frozen=True)
class SenderMatch:
    """Immutable outcome of matching a report record against known senders.

    Instances are built positionally as ``SenderMatch(id, name, is_known)``,
    so the field order below is part of the interface.
    """

    # Identifier of the configured sender that matched; None when nothing matched.
    id: str | None
    # Display name of the configured sender that matched; None when nothing matched.
    name: str | None
    # True when the record was attributed to a configured sender.
    is_known: bool
|
|
|
|
|
|
def _domain_equal(a: str | None, b: str | None) -> bool:
    """Return True when *a* and *b* name the same, non-empty DNS domain.

    Each value is normalised by trimming surrounding whitespace, lower-casing
    and dropping trailing dots, so ``"Example.COM."`` equals ``"example.com"``.

    Args:
        a: First domain, or None when it is missing.
        b: Second domain, or None when it is missing.

    Returns:
        True only if both normalised values are non-empty and identical.
    """
    left, right = (
        (value or "").strip().lower().rstrip(".") for value in (a, b)
    )
    # A missing or blank domain carries no identity: without this guard
    # None == "" would count as a match and could attribute a record with
    # no authenticated domain to a sender whose config has a blank entry.
    return bool(left) and left == right
|
|
|
|
|
|
def _ip_matches(source_ip: str, sender: KnownSenderConfig) -> bool:
    """Tell whether *source_ip* lies inside any network in the sender's allowlist.

    Args:
        source_ip: Textual IPv4 or IPv6 address to test.
        sender: Object whose ``ip_allowlist`` is iterated for CIDR strings.

    Returns:
        True if the address belongs to at least one allowlisted network.
        False if *source_ip* cannot be parsed, the allowlist is empty, or no
        entry contains the address.
    """
    try:
        address = ip_address(source_ip)
    except ValueError:
        # An unparseable source address can never match anything.
        return False

    def _covers(entry) -> bool:
        """Return True if the CIDR *entry* parses and contains the address."""
        try:
            # strict=False accepts entries with host bits set, e.g. "192.168.1.5/24".
            return address in ip_network(entry, strict=False)
        except ValueError:
            # A malformed allowlist entry is skipped rather than treated as fatal.
            return False

    return any(_covers(entry) for entry in sender.ip_allowlist)
|
|
|
|
|
|
def classify_record(settings: Settings, domain: str, record: ParsedRecord) -> SenderMatch:
    """Attribute a parsed record to the first matching known sender of *domain*.

    Senders are taken from ``settings.known_senders[domain]`` and tried in
    order. For each sender:

    1. If the record's source IP falls inside the sender's IP allowlist, the
       sender matches.
    2. Otherwise, if the sender has a non-empty allowlist, it is skipped
       without consulting DKIM/SPF.
    3. Otherwise, the sender matches on the first auth result whose type is
       ``"dkim"`` or ``"spf"`` and whose domain equals one of the sender's
       ``dkim_domains`` / ``spf_domains`` respectively.

    Args:
        settings: Configuration exposing the ``known_senders`` mapping.
        domain: Key used to look up the candidate senders.
        record: Parsed record providing ``source_ip`` and ``auth_results``.

    Returns:
        ``SenderMatch(sender.id, sender.name, True)`` for the first match, or
        ``SenderMatch(None, None, False)`` when no sender matches or the
        domain has no configured senders.
    """
    for candidate in settings.known_senders.get(domain, []):
        if _ip_matches(record.source_ip, candidate):
            return SenderMatch(candidate.id, candidate.name, True)

        # A sender pinned to specific networks is only ever matched by IP,
        # so the DKIM/SPF fallback applies to allowlist-free senders only.
        if not candidate.ip_allowlist:
            # NOTE(review): only the auth type and domain are compared here;
            # the pass/fail outcome of the auth result is not consulted.
            # Confirm that is intended.
            for result in record.auth_results:
                if result.auth_type == "dkim":
                    expected_domains = candidate.dkim_domains
                elif result.auth_type == "spf":
                    expected_domains = candidate.spf_domains
                else:
                    continue
                if any(_domain_equal(result.domain, item) for item in expected_domains):
                    return SenderMatch(candidate.id, candidate.name, True)

    return SenderMatch(None, None, False)
|