194 lines
7.2 KiB
Python
194 lines
7.2 KiB
Python
|
|
import email
|
||
|
|
import hashlib
|
||
|
|
import imaplib
|
||
|
|
import os
|
||
|
|
from datetime import timezone
|
||
|
|
from email import policy
|
||
|
|
from email.utils import parseaddr, parsedate_to_datetime
|
||
|
|
from typing import Any, Dict, List, Optional
|
||
|
|
|
||
|
|
|
||
|
|
def clean_header_id(v: Optional[str]) -> str:
    """Normalize a message-id style header value.

    Surrounding whitespace is removed, then any leading/trailing ``<``/``>``
    characters, then whitespace again.

    Args:
        v: Raw header value; may be ``None``.

    Returns:
        The bare id, or ``""`` when *v* is ``None`` or empty.
    """
    return v.strip().strip("<>").strip() if v else ""
|
||
|
|
|
||
|
|
|
||
|
|
def _header_message_ids(value: Optional[str]) -> List[str]:
    """Return the message ids listed in a References/In-Reply-To header, in order.

    Ids are normally written as ``<id>``. Such bracketed ids are extracted even
    when folded, adjacent (``<a><b>``) or followed by comments. A non-conforming
    header with no bracketed ids falls back to its whitespace-separated tokens.
    """
    import re

    if not value:
        return []
    ids = [m.strip() for m in re.findall(r"<([^<>]*)>", value)]
    ids = [i for i in ids if i]
    if ids:
        return ids
    return [tok for tok in (t.strip("<>").strip() for t in value.split()) if tok]


def normalize_thread_id(msg_id: str, refs: str, in_reply_to: str, subject: str, sender: str) -> str:
    """Derive a stable conversation id for one message.

    Resolution order:
      1. the FIRST id in *refs* (the References header). Per RFC 5322 §3.6.4,
         References holds the parent's references followed by the parent's own
         id, so the first entry is the conversation root;
      2. the first id in *in_reply_to*;
      3. a 24-hex-char SHA-256 prefix of ``"<subject>|<sender>"`` (both stripped
         and lower-cased), or of *msg_id* when subject and sender are both blank.

    Keying on the root, rather than the last (immediate-parent) reference,
    keeps every reply in a chain A <- B <- C on ``thread:A``. The previous
    last-reference logic put C on ``thread:B``.

    NOTE: a root message with no References/In-Reply-To still takes the
    subject/sender hash path, so it does not share its replies' id.

    Args:
        msg_id: Cleaned Message-ID. Used only as the last-resort hash seed.
        refs: Raw References header (may be empty).
        in_reply_to: Raw In-Reply-To header (may be empty).
        subject: Message subject.
        sender: Sender address.

    Returns:
        ``"thread:<root-id>"`` or ``"thread:<hash>"``.
    """
    root_refs = _header_message_ids(refs)
    if root_refs:
        return f"thread:{root_refs[0]}"

    parents = _header_message_ids(in_reply_to)
    if parents:
        return f"thread:{parents[0]}"

    seed = f"{subject.strip().lower()}|{sender.strip().lower()}"
    if not seed.strip("|"):
        # Both subject and sender blank: fall back to the message's own id.
        seed = msg_id
    return "thread:" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24]
|
||
|
|
|
||
|
|
|
||
|
|
def _decode_part_text(part: email.message.Message) -> Optional[str]:
    """Return *part*'s payload decoded to ``str``, or ``None`` if it has no str/bytes payload.

    ``get_content()`` is tried first. If it raises, the transfer-decoded raw
    payload is used instead. Typical causes are ``LookupError`` for a charset
    Python has no codec for (e.g. ``unknown-8bit``) and ``AttributeError`` for
    a message not parsed with a modern policy. The raw payload is decoded with
    the declared charset when a codec exists, otherwise UTF-8. Undecodable
    bytes are replaced in both cases.
    """
    try:
        content = part.get_content()
    except Exception:
        # Deliberate best-effort fallback: salvage the bytes instead of
        # losing the part entirely.
        content = part.get_payload(decode=True)
    if isinstance(content, str):
        return content
    if isinstance(content, (bytes, bytearray)):
        data = bytes(content)
        charset = part.get_content_charset() or "utf-8"
        try:
            return data.decode(charset, errors="replace")
        except LookupError:
            return data.decode("utf-8", errors="replace")
    return None


def extract_body_text(msg: email.message.Message) -> str:
    """Return the body of *msg* as text, best effort. Never raises.

    Multipart messages:
      1. the first ``text/plain`` part that is non-empty after stripping and
         whose Content-Disposition is not ``attachment``;
      2. failing that, the first such ``text/html`` part, returned as raw HTML
         (tags are NOT stripped).
    Single-part messages: the message's own decoded payload, whatever its type.

    Each part is decoded independently (see ``_decode_part_text``), so one
    part with an unknown charset no longer turns the whole body into ``""``.

    Returns:
        The stripped body text, or ``""`` when nothing usable is found.
    """
    try:
        if not msg.is_multipart():
            text = _decode_part_text(msg)
            return text.strip() if text is not None else ""

        for wanted_type in ("text/plain", "text/html"):
            for part in msg.walk():
                # get_content_type() is always lower-case and never empty.
                if part.get_content_type() != wanted_type:
                    continue
                # Compare the disposition *value*. A substring test on the raw
                # header would also skip e.g. `inline; filename="attachment.txt"`.
                if part.get_content_disposition() == "attachment":
                    continue
                text = _decode_part_text(part)
                if text is not None and text.strip():
                    return text.strip()
        return ""
    except Exception:
        # Best effort: a malformed message yields "" rather than failing the caller.
        return ""
|
||
|
|
|
||
|
|
|
||
|
|
def _imap_search_uids(
    client: imaplib.IMAP4,
    effective_search_criteria: str,
    since_uid: Optional[int],
) -> List[str]:
    """Run a UID SEARCH on the selected mailbox and return the matching UIDs.

    With *since_uid* set, this searches ``UID <since_uid+1>:*`` and ignores
    *effective_search_criteria*. Otherwise *effective_search_criteria* is sent
    verbatim. Non-numeric tokens are dropped. The result is sorted ascending by
    numeric value, because RFC 3501 does not require SEARCH results to be
    ordered and the caller slices assuming oldest-to-newest order.

    Raises:
        RuntimeError: the server answered the SEARCH with a non-OK status.
    """
    lower_bound = int(since_uid) if since_uid is not None else None
    if lower_bound is not None:
        criteria = f"UID {lower_bound + 1}:*"
        status, search_data = client.uid("search", None, "UID", f"{lower_bound + 1}:*")
    else:
        criteria = effective_search_criteria
        status, search_data = client.uid("search", None, effective_search_criteria)
    if status != "OK":
        raise RuntimeError(f"IMAP search failed: {criteria}")

    first = search_data[0] if search_data else None
    if not isinstance(first, (bytes, bytearray)):
        first = str(first or "").encode("ascii", errors="replace")
    tokens = bytes(first).decode("ascii", errors="replace").split()
    uids = sorted({int(tok) for tok in tokens if tok.isdigit()})
    if lower_bound is not None:
        # "N:*" always matches the highest UID in the mailbox, even when that
        # UID is below N (RFC 3501, UID command), so re-apply the bound here.
        uids = [u for u in uids if u > lower_bound]
    return [str(u) for u in uids]


def _imap_fetch_raw(client: imaplib.IMAP4, uid: str) -> Optional[bytes]:
    """Fetch the full RFC 822 source of one message by UID.

    Returns ``None`` when the FETCH is not OK or carries no non-empty literal.
    The mailbox is expected to be opened read-only (EXAMINE), as
    ``fetch_imap_messages_blocking`` does, so this FETCH leaves flags such as
    ``\\Seen`` unchanged.
    """
    status, msg_data = client.uid("fetch", uid, "(RFC822)")
    if status != "OK" or not msg_data:
        return None
    for item in msg_data:
        # imaplib returns literals as (response-header, data) tuples,
        # interleaved with plain bytes items such as b")".
        if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], (bytes, bytearray)):
            return bytes(item[1]) or None
    return None


def _date_header_to_utc_iso(date_raw: str) -> Optional[str]:
    """Convert an RFC 5322 Date header to a UTC ISO-8601 string; ``None`` if absent or unparseable."""
    if not date_raw:
        return None
    try:
        dt = parsedate_to_datetime(date_raw)
        if dt.tzinfo is None:
            # parsedate_to_datetime returns a naive datetime for a "-0000"
            # zone; treat it as UTC.
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc).isoformat()
    except (TypeError, ValueError, IndexError, OverflowError):
        return None


def _imap_message_to_record(
    uid: str,
    raw_bytes: bytes,
    *,
    channel: str,
    mailbox: str,
    host: str,
    username: str,
) -> Dict[str, Any]:
    """Parse one raw message into the normalized record emitted by the fetcher."""
    msg = email.message_from_bytes(raw_bytes, policy=policy.default)

    subject = str(msg.get("Subject") or "").strip()
    from_raw = str(msg.get("From") or "").strip()
    to_raw = str(msg.get("To") or "").strip()
    date_raw = str(msg.get("Date") or "").strip()
    # Header lookup is case-insensitive, so this also matches "Message-Id".
    msg_id_raw = str(msg.get("Message-ID") or "").strip()
    refs_raw = str(msg.get("References") or "").strip()
    in_reply_raw = str(msg.get("In-Reply-To") or "").strip()

    sender_email = parseaddr(from_raw)[1] or from_raw or "unknown"

    msg_id_clean = clean_header_id(msg_id_raw)
    if not msg_id_clean:
        # No Message-ID: derive a deterministic id from the UID and headers.
        seed = f"{uid}|{subject}|{sender_email}|{date_raw}"
        msg_id_clean = "imap-" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24]

    thread_id = normalize_thread_id(
        msg_id=msg_id_clean,
        refs=refs_raw,
        in_reply_to=in_reply_raw,
        subject=subject,
        sender=sender_email,
    )

    body = extract_body_text(msg) or f"(no body) {subject}".strip()

    return {
        "thread_id": thread_id,
        "message_id": msg_id_clean,
        "sender": sender_email,
        "channel": channel,
        "sent_at": _date_header_to_utc_iso(date_raw),
        "body": body,
        "metadata": {
            "subject": subject,
            "from": from_raw,
            "to": to_raw,
            "date": date_raw,
            "imap_uid": uid,
            "mailbox": mailbox,
            "host": host,
            "username": username,
        },
    }


def fetch_imap_messages_blocking(
    payload: Any,
    effective_search_criteria: str,
    since_uid: Optional[int],
) -> List[Dict[str, Any]]:
    """Fetch messages from an IMAP mailbox and normalize them into records (blocking I/O).

    Args:
        payload: Configuration object. Attributes read: ``host``, ``port``,
            ``use_ssl``, ``username``, ``mailbox``, ``max_messages``, ``channel``,
            and optionally ``password`` (falls back to ``$IMAP_PASSWORD``).
        effective_search_criteria: Raw IMAP SEARCH criteria (e.g. ``"ALL"``).
            Ignored when *since_uid* is given.
        since_uid: If set, fetch only messages with UID > since_uid, taking the
            oldest ``max_messages`` of them. If ``None``, run
            *effective_search_criteria* and take the newest ``max_messages`` hits.

    Returns:
        One dict per fetched message with keys ``thread_id``, ``message_id``,
        ``sender``, ``channel``, ``sent_at`` (UTC ISO-8601 or ``None``), ``body``
        and ``metadata``. The list is empty when nothing matches or when
        ``max_messages <= 0``; in the latter case no connection is opened.

    Raises:
        ValueError: no password available.
        RuntimeError: LOGIN, SELECT or SEARCH returned a non-OK status.
        Exceptions from :mod:`imaplib` or the socket layer propagate unchanged.
    """
    password = getattr(payload, "password", None) or os.getenv("IMAP_PASSWORD", "")
    if not password:
        raise ValueError("IMAP password missing: provide payload.password or set IMAP_PASSWORD")

    host = str(getattr(payload, "host"))
    port = int(getattr(payload, "port"))
    use_ssl = bool(getattr(payload, "use_ssl"))
    username = str(getattr(payload, "username"))
    mailbox = str(getattr(payload, "mailbox"))
    max_messages = int(getattr(payload, "max_messages"))
    channel = str(getattr(payload, "channel"))

    if max_messages <= 0:
        # Nothing requested. This also guards the `uids[-max_messages:]` slice
        # below: with 0 it would be `uids[-0:]`, i.e. every UID.
        return []

    # NOTE(review): with use_ssl=False the password travels in clear text;
    # consider STARTTLS if plaintext ports are used outside local testing.
    client = imaplib.IMAP4_SSL(host, port) if use_ssl else imaplib.IMAP4(host, port)
    try:
        status, _ = client.login(username, password)
        if status != "OK":
            raise RuntimeError("IMAP login failed")
        # readonly=True issues EXAMINE, so fetching does not alter message flags.
        status, _ = client.select(mailbox, readonly=True)
        if status != "OK":
            raise RuntimeError(f"IMAP select mailbox failed: {mailbox}")

        uids = _imap_search_uids(client, effective_search_criteria, since_uid)
        if since_uid is not None:
            # Incremental window: oldest new UIDs first, presumably so a caller
            # advancing its watermark to the last returned UID skips nothing.
            selected = uids[:max_messages]
        else:
            # Plain search: keep the most recent matches.
            selected = uids[-max_messages:]

        out: List[Dict[str, Any]] = []
        for uid in selected:
            raw_bytes = _imap_fetch_raw(client, uid)
            if raw_bytes is None:
                continue
            out.append(
                _imap_message_to_record(
                    uid,
                    raw_bytes,
                    channel=channel,
                    mailbox=mailbox,
                    host=host,
                    username=username,
                )
            )
        return out
    finally:
        try:
            client.logout()
        except (imaplib.IMAP4.error, OSError):
            # Best-effort cleanup; a failed LOGOUT must not replace the result
            # or the exception already in flight.
            pass
|
||
|
|
|