# jecio/services/imap_ingest.py
#
# 194 lines
# 7.2 KiB
# Python
# Raw Permalink Normal View History

import email
import hashlib
import imaplib
import os
from datetime import timezone
from email import policy
from email.utils import parseaddr, parsedate_to_datetime
from typing import Any, Dict, List, Optional
def clean_header_id(v: Optional[str]) -> str:
    """Normalize a message-id style header value.

    Outer whitespace is trimmed, then any leading/trailing ``<`` / ``>``
    characters, then whitespace once more. A missing or empty value yields
    an empty string.
    """
    if v:
        without_brackets = v.strip().strip("<>")
        return without_brackets.strip()
    return ""
def normalize_thread_id(msg_id: str, refs: str, in_reply_to: str, subject: str, sender: str) -> str:
    """Derive a ``thread:``-prefixed identifier for a message.

    The key is chosen in this order:

    1. the last whitespace-separated entry of ``refs`` (cleaned),
    2. the cleaned ``in_reply_to`` value,
    3. a truncated SHA-256 of ``"<subject>|<sender>"`` (both lower-cased and
       stripped), or of ``msg_id`` when subject and sender are both empty.

    Args:
        msg_id: Message identifier used as the last-resort hash seed.
        refs: Raw ``References`` header value (may be empty).
        in_reply_to: Raw ``In-Reply-To`` header value (may be empty).
        subject: Message subject.
        sender: Sender address.

    Returns:
        A string starting with ``"thread:"``.
    """
    # NOTE(review): the *last* reference is the direct parent rather than the
    # thread root, so replies at different depths get different ids --
    # confirm this is intended before changing it.
    # Split first and index afterwards: a whitespace-only ``refs`` is truthy
    # but splits to an empty list, which would make ``[-1]`` raise IndexError.
    ref_tokens = (refs or "").split()
    refs_clean = clean_header_id(ref_tokens[-1] if ref_tokens else "")
    if refs_clean:
        return f"thread:{refs_clean}"

    in_reply_clean = clean_header_id(in_reply_to)
    if in_reply_clean:
        return f"thread:{in_reply_clean}"

    # Tolerate None for the text fields instead of failing on ``.strip()``.
    subject_key = (subject or "").strip().lower()
    sender_key = (sender or "").strip().lower()
    seed = f"{subject_key}|{sender_key}"
    if not seed.strip("|"):
        seed = msg_id or ""
    return "thread:" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24]
def extract_body_text(msg: email.message.Message) -> str:
    """Return the best available body text of ``msg``, stripped.

    For a multipart message the first non-empty, non-attachment
    ``text/plain`` part wins; if there is none, the first non-empty,
    non-attachment ``text/html`` part is returned (as raw HTML). A
    non-multipart message returns its own decoded content.

    Parts that are empty or cannot be decoded are skipped rather than ending
    the search, so a blank ``text/plain`` alternative no longer hides a
    usable HTML body.

    Args:
        msg: A parsed message. Objects exposing ``get_content`` are read
            through it; others are read through ``get_payload(decode=True)``.

    Returns:
        The stripped text, or ``""`` when nothing usable is found. This
        function never raises.
    """
    try:
        if not msg.is_multipart():
            return _decode_part_text(msg)

        parts = list(msg.walk())
        # Two passes: plain text is preferred, HTML is the fallback.
        for wanted_type in ("text/plain", "text/html"):
            for part in parts:
                if (part.get_content_type() or "").lower() != wanted_type:
                    continue
                if _is_attachment_part(part):
                    continue
                text = _decode_part_text(part)
                if text:
                    return text
        return ""
    except Exception:
        # Deliberate best-effort: body extraction must not abort ingestion.
        return ""


def _is_attachment_part(part: email.message.Message) -> bool:
    """Tell whether the part's Content-Disposition header mentions ``attachment``."""
    return "attachment" in str(part.get("Content-Disposition") or "").lower()


def _decode_part_text(part: email.message.Message) -> str:
    """Decode a single part to stripped text, returning ``""`` on any failure."""
    try:
        get_content = getattr(part, "get_content", None)
        if get_content is not None:
            content = get_content()
        else:
            # Message objects without get_content: decode the transfer
            # encoding manually.
            content = part.get_payload(decode=True)
        if isinstance(content, (bytes, bytearray)):
            charset = part.get_content_charset() or "utf-8"
            content = bytes(content).decode(charset, errors="replace")
        if isinstance(content, str):
            return content.strip()
        return ""
    except Exception:
        # Unknown charsets, malformed encodings, etc.: treat the part as empty
        # so the caller can try the next candidate.
        return ""
def fetch_imap_messages_blocking(
    payload: Any,
    effective_search_criteria: str,
    since_uid: Optional[int],
) -> List[Dict[str, Any]]:
    """Fetch messages from an IMAP mailbox and convert them to ingest records.

    This call blocks on network I/O. The mailbox is selected read-only and
    the connection is always logged out before returning.

    Args:
        payload: Object exposing ``host``, ``port``, ``use_ssl``,
            ``username``, ``mailbox``, ``max_messages`` and ``channel``
            attributes, plus an optional ``password``. When the password is
            missing or empty, the ``IMAP_PASSWORD`` environment variable is
            used instead.
        effective_search_criteria: IMAP search criteria, used only when
            ``since_uid`` is ``None``.
        since_uid: When given, only messages with a UID strictly greater
            than this value are considered, and the first ``max_messages``
            of the returned UIDs are taken. When ``None``, the last
            ``max_messages`` search results are taken.

    Returns:
        One dict per fetched message with the keys ``thread_id``,
        ``message_id``, ``sender``, ``channel``, ``sent_at``, ``body`` and
        ``metadata``. Messages whose fetch fails or yields no payload are
        skipped. An empty list is returned when nothing matches or when
        ``max_messages`` is not positive.

    Raises:
        ValueError: If no password is available.
        RuntimeError: If login, mailbox selection or the search returns a
            non-OK status.

    Exceptions raised by ``imaplib`` itself are not caught here.
    """
    password = getattr(payload, "password", None) or os.getenv("IMAP_PASSWORD", "")
    if not password:
        raise ValueError("IMAP password missing: provide payload.password or set IMAP_PASSWORD")

    host = str(payload.host)
    port = int(payload.port)
    use_ssl = bool(payload.use_ssl)
    username = str(payload.username)
    mailbox = str(payload.mailbox)
    max_messages = int(payload.max_messages)
    channel = str(payload.channel)

    client = imaplib.IMAP4_SSL(host, port) if use_ssl else imaplib.IMAP4(host, port)
    try:
        status, _ = client.login(username, password)
        if status != "OK":
            raise RuntimeError("IMAP login failed")

        status, _ = client.select(mailbox, readonly=True)
        if status != "OK":
            raise RuntimeError(f"IMAP select mailbox failed: {mailbox}")

        is_uid_window = since_uid is not None
        if is_uid_window:
            cursor = int(since_uid)
            uid_range = f"{cursor + 1}:*"
            criteria_used = f"UID {uid_range}"
            status, search_data = client.uid("search", None, "UID", uid_range)
        else:
            cursor = 0  # unused outside the UID-window branch
            criteria_used = effective_search_criteria
            status, search_data = client.uid("search", None, effective_search_criteria)
        if status != "OK":
            # Report the criteria that were actually sent to the server.
            raise RuntimeError(f"IMAP search failed: {criteria_used}")

        # imaplib may hand back [None] when the server sent no untagged
        # SEARCH line; treat that the same as an empty result.
        uid_bytes = (search_data[0] if search_data else None) or b""
        uid_list = uid_bytes.decode("utf-8", errors="replace").split()

        if is_uid_window:
            # A "N:*" range can still return a UID below N, so re-check each
            # value against the cursor and drop anything non-numeric.
            newer: List[str] = []
            for candidate in uid_list:
                try:
                    if int(candidate) > cursor:
                        newer.append(candidate)
                except ValueError:
                    continue
            uid_list = newer

        # A non-positive limit must select nothing: ``uid_list[-0:]`` would
        # otherwise return every UID.
        if not uid_list or max_messages <= 0:
            return []

        if is_uid_window:
            selected_uids = uid_list[:max_messages]
        else:
            selected_uids = uid_list[-max_messages:]

        out: List[Dict[str, Any]] = []
        for uid in selected_uids:
            status, msg_data = client.uid("fetch", uid, "(RFC822)")
            if status != "OK" or not msg_data:
                continue
            raw_bytes = _first_rfc822_payload(msg_data)
            if not raw_bytes:
                continue
            out.append(
                _imap_record_from_raw(
                    uid,
                    raw_bytes,
                    mailbox=mailbox,
                    host=host,
                    username=username,
                    channel=channel,
                )
            )
        return out
    finally:
        try:
            client.logout()
        except Exception:
            # Best-effort cleanup; never mask the primary result or error.
            pass


def _first_rfc822_payload(msg_data: Any) -> Optional[bytes]:
    """Return the first bytes payload found in a FETCH response, or None."""
    for item in msg_data:
        if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], (bytes, bytearray)):
            return bytes(item[1])
    return None


def _imap_record_from_raw(
    uid: str,
    raw_bytes: bytes,
    *,
    mailbox: str,
    host: str,
    username: str,
    channel: str,
) -> Dict[str, Any]:
    """Parse one raw RFC822 message into the ingest record dict."""
    msg = email.message_from_bytes(raw_bytes, policy=policy.default)

    def header(name: str) -> str:
        return str(msg.get(name) or "").strip()

    subject = header("Subject")
    from_raw = header("From")
    to_raw = header("To")
    date_raw = header("Date")
    # Header lookup is case-insensitive, so this also matches "Message-Id".
    msg_id_raw = header("Message-ID")
    refs_raw = header("References")
    in_reply_raw = header("In-Reply-To")

    sender_email = parseaddr(from_raw)[1] or from_raw or "unknown"

    msg_id_clean = clean_header_id(msg_id_raw)
    if not msg_id_clean:
        # No Message-ID header: synthesize an id from the UID and headers.
        seed = f"{uid}|{subject}|{sender_email}|{date_raw}"
        msg_id_clean = "imap-" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24]

    thread_id = normalize_thread_id(
        msg_id=msg_id_clean,
        refs=refs_raw,
        in_reply_to=in_reply_raw,
        subject=subject,
        sender=sender_email,
    )

    sent_at_iso = None
    if date_raw:
        try:
            dt = parsedate_to_datetime(date_raw)
            if dt.tzinfo is None:
                # Naive timestamps are treated as UTC.
                dt = dt.replace(tzinfo=timezone.utc)
            sent_at_iso = dt.astimezone(timezone.utc).isoformat()
        except (TypeError, ValueError, OverflowError):
            sent_at_iso = None

    body = extract_body_text(msg)
    if not body:
        body = f"(no body) {subject}".strip()

    return {
        "thread_id": thread_id,
        "message_id": msg_id_clean,
        "sender": sender_email,
        "channel": channel,
        "sent_at": sent_at_iso,
        "body": body,
        "metadata": {
            "subject": subject,
            "from": from_raw,
            "to": to_raw,
            "date": date_raw,
            "imap_uid": uid,
            "mailbox": mailbox,
            "host": host,
            "username": username,
        },
    }