"""Query the latest IMAP UID checkpoint from the messages table.

Scans rows written by the ``email-imap`` ingestion channel, matches the
host / mailbox / username stored in the ``metadata_json`` column, and prints
the highest ``imap_uid`` observed (or ``null`` if none) as a one-line JSON
object on stdout, e.g. ``{"max_uid": 12345}``.
"""

import argparse
import json
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F


def _parse_args() -> argparse.Namespace:
    """Parse CLI arguments; --table falls back to $MESSAGES_TABLE."""
    p = argparse.ArgumentParser(
        description="Query latest IMAP UID checkpoint from messages table"
    )
    p.add_argument(
        "--table", default=os.getenv("MESSAGES_TABLE", "lake.db1.messages")
    )
    p.add_argument("--host", required=True)
    p.add_argument("--mailbox", required=True)
    p.add_argument("--username", required=True)
    return p.parse_args()


def _query_max_uid(spark: SparkSession, args: argparse.Namespace):
    """Return the highest imap_uid matching the CLI filters, or None.

    Rows are matched on channel == "email-imap" and on host/mailbox extracted
    from metadata_json. Username matches exactly, or is accepted when the
    stored value is NULL or empty (legacy rows without a username).
    Non-numeric imap_uid values cast to NULL and are ignored by max().
    """
    md = F.col("metadata_json")
    uid_col = F.get_json_object(md, "$.imap_uid")
    host_col = F.get_json_object(md, "$.host")
    mailbox_col = F.get_json_object(md, "$.mailbox")
    username_col = F.get_json_object(md, "$.username")

    filtered = (
        spark.table(args.table)
        .where(F.col("channel") == "email-imap")
        .where(host_col == args.host)
        .where(mailbox_col == args.mailbox)
        .where(
            (username_col == args.username)
            | username_col.isNull()
            | (username_col == "")
        )
        .where(uid_col.isNotNull())
    )

    # Single-row aggregate: first() is sufficient (no need to collect()).
    row = filtered.select(F.max(uid_col.cast("long")).alias("max_uid")).first()
    if row is not None and row["max_uid"] is not None:
        return int(row["max_uid"])
    return None


def main() -> None:
    """CLI entry point: query the checkpoint and print it as JSON."""
    args = _parse_args()
    spark = SparkSession.builder.appName("query-imap-checkpoint").getOrCreate()
    try:
        max_uid = _query_max_uid(spark, args)
    finally:
        # Original code leaked the session; always release cluster resources.
        spark.stop()
    print(json.dumps({"max_uid": max_uid}, ensure_ascii=False))


if __name__ == "__main__":
    main()