from __future__ import annotations import email import imaplib import logging import socket from dataclasses import dataclass from datetime import date from email.message import Message from typing import Any from app.config import InboxConfig logger = logging.getLogger(__name__) class IMAPError(Exception): pass @dataclass class IMAPMessage: uid: str raw: bytes seen: bool message: Message def _resolved_addresses(host: str, port: int) -> list[tuple[Any, tuple]]: return [(family, sockaddr) for family, _, _, _, sockaddr in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)] class _ResolvedIMAP4(imaplib.IMAP4): def __init__(self, host: str, port: int, addresses: list[tuple[Any, tuple]]): self._resolved_addresses = addresses super().__init__(host, port) def _create_socket(self, timeout): last_error: Exception | None = None for family, sockaddr in self._resolved_addresses: try: if timeout is not None: return socket.create_connection(sockaddr, timeout) return socket.create_connection(sockaddr) except OSError as exc: last_error = exc logger.warning("IMAP connect failed for %s via %s: %s", self.host, sockaddr[0], exc) raise IMAPError(f"Could not connect to IMAP host {self.host}: {last_error}") class _ResolvedIMAP4SSL(imaplib.IMAP4_SSL): def __init__(self, host: str, port: int, addresses: list[tuple[Any, tuple]]): self._resolved_addresses = addresses super().__init__(host, port) def _create_socket(self, timeout): last_error: Exception | None = None for family, sockaddr in self._resolved_addresses: try: if timeout is not None: sock = socket.create_connection(sockaddr, timeout) else: sock = socket.create_connection(sockaddr) return self.ssl_context.wrap_socket(sock, server_hostname=self.host) except OSError as exc: last_error = exc logger.warning("IMAP SSL connect failed for %s via %s: %s", self.host, sockaddr[0], exc) raise IMAPError(f"Could not connect to IMAP host {self.host}: {last_error}") class IMAPClient: def __init__(self, inbox: InboxConfig): self.inbox = inbox self.conn: imaplib.IMAP4 | imaplib.IMAP4_SSL | None = None def __enter__(self) -> "IMAPClient": if not self.inbox.username or not self.inbox.password: raise IMAPError(f"Missing IMAP credentials for inbox {self.inbox.id}") try: addresses = _resolved_addresses(self.inbox.imap_host, self.inbox.imap_port) except socket.gaierror as exc: raise IMAPError(f"Could not resolve IMAP host {self.inbox.imap_host!r} for inbox {self.inbox.id}: {exc}") from exc logger.info("IMAP host resolved for inbox %s: %r -> %s", self.inbox.id, self.inbox.imap_host, [item[1][0] for item in addresses]) cls = _ResolvedIMAP4SSL if self.inbox.imap_ssl else _ResolvedIMAP4 self.conn = cls(self.inbox.imap_host, self.inbox.imap_port, addresses) typ, _ = self.conn.login(self.inbox.username, self.inbox.password) if typ != "OK": raise IMAPError(f"IMAP login failed for {self.inbox.id}") logger.info("IMAP connection succeeded for inbox %s", self.inbox.id) return self def __exit__(self, exc_type, exc, tb) -> None: if self.conn is None: return try: self.conn.logout() except Exception: pass def select_folder(self, folder: str) -> None: assert self.conn is not None typ, data = self.conn.select(f'"{folder}"', readonly=False) if typ != "OK": raise IMAPError(f"IMAP folder does not exist or cannot be selected: {folder}") def search_uids(self, *, unread_only: bool, since: date | None = None, before: date | None = None, limit: int | None = None) -> list[str]: assert self.conn is not None criteria: list[str] = ["UNSEEN" if unread_only else "ALL"] if since: criteria.extend(["SINCE", since.strftime("%d-%b-%Y")]) if before: criteria.extend(["BEFORE", before.strftime("%d-%b-%Y")]) typ, data = self.conn.uid("SEARCH", None, *criteria) if typ != "OK": raise IMAPError("IMAP UID search failed") uids = data[0].decode().split() if data and data[0] else [] return uids[:limit] if limit else uids def fetch_message(self, uid: str) -> IMAPMessage: assert self.conn is not None typ, data = self.conn.uid("FETCH", uid, "(RFC822 FLAGS)") if typ != "OK" or not data: raise IMAPError(f"Could not fetch UID {uid}") raw = b"" flags = b"" for item in data: if isinstance(item, tuple): flags += item[0] raw = item[1] return IMAPMessage(uid=uid, raw=raw, seen=b"\\Seen" in flags, message=email.message_from_bytes(raw)) def mark_seen(self, uid: str) -> None: assert self.conn is not None self.conn.uid("STORE", uid, "+FLAGS", "(\\Seen)") def move(self, uid: str, folder: str) -> None: assert self.conn is not None typ, _ = self.conn.uid("COPY", uid, f'"{folder}"') if typ == "OK": self.conn.uid("STORE", uid, "+FLAGS", "(\\Deleted)") self.conn.expunge()