"""Write an assistant proposal set into a Spark table via a DataFrame.

Each proposal in the base64-encoded JSON proposal list becomes one row; the
set-level fields (set id, timestamp, objective, release name, summary, signals)
are repeated on every row. When no usable proposals are supplied, a single
placeholder row (proposal_id "P0") is written so the set is still recorded.
The target table is created as an Iceberg table if it does not exist yet.
"""

import argparse
import base64
import json
from typing import Any, List, Tuple

from pyspark.sql import SparkSession, types as T

# Text values accepted as "true" when a proposal encodes auto_apply_safe as a string.
_TRUE_STRINGS = frozenset({"true", "1", "yes", "y"})


def d(s: str) -> str:
    """Decode a base64-encoded UTF-8 string; an empty input decodes to ""."""
    if not s:
        return ""
    return base64.b64decode(s.encode("ascii")).decode("utf-8")


def _load_json(text: str, expected_type: type, fallback: Any) -> Any:
    """Parse *text* as JSON; return *fallback* if it is malformed or not an *expected_type*."""
    try:
        value = json.loads(text)
    except (ValueError, RecursionError):
        # ValueError covers json.JSONDecodeError; RecursionError covers
        # pathologically deep nesting. Bad input degrades to the fallback.
        return fallback
    return value if isinstance(value, expected_type) else fallback


def _coerce_bool(value: Any) -> bool:
    """Interpret a JSON flag strictly, treating anything not clearly true as False.

    Plain ``bool()`` would turn the string "false" (or any non-empty string or
    container) into True; because this flag marks a proposal as safe to apply
    automatically, ambiguous values must fail closed.
    """
    if isinstance(value, bool):
        return value
    if isinstance(value, (int, float)):
        return value == 1
    if isinstance(value, str):
        return value.strip().lower() in _TRUE_STRINGS
    return False


def _build_rows(
    proposals: List[Any],
    proposal_set_id: str,
    created_at_utc: str,
    objective: str,
    release_name: str,
    summary: str,
    signals_json: str,
) -> List[Tuple[Any, ...]]:
    """Convert parsed proposals into row tuples ordered like ``_proposal_schema()``.

    Non-dict entries are skipped. If nothing usable remains, a single
    placeholder row is returned so the proposal set is still recorded.
    """
    # Set-level columns shared by every row of this proposal set.
    base = (proposal_set_id, created_at_utc, objective, release_name, summary, signals_json)

    rows: List[Tuple[Any, ...]] = []
    for idx, p_obj in enumerate(proposals):
        if not isinstance(p_obj, dict):
            continue
        files = p_obj.get("files")
        tests = p_obj.get("tests")
        files = files if isinstance(files, list) else []
        tests = tests if isinstance(tests, list) else []
        rows.append(
            base
            + (
                # Fall back to a positional id (1-based) when the proposal has none.
                str(p_obj.get("proposal_id") or f"P{idx + 1}"),
                str(p_obj.get("title") or ""),
                str(p_obj.get("problem") or ""),
                str(p_obj.get("change") or ""),
                json.dumps(files, ensure_ascii=False),
                str(p_obj.get("risk") or "medium"),
                json.dumps(tests, ensure_ascii=False),
                _coerce_bool(p_obj.get("auto_apply_safe", False)),
            )
        )

    if not rows:
        rows.append(
            base + ("P0", "No proposals", "No proposals returned", "", "[]", "low", "[]", False)
        )
    return rows


def _proposal_schema() -> T.StructType:
    """Return the DataFrame schema; column order matches the row tuples and the DDL in main()."""
    return T.StructType(
        [
            T.StructField("proposal_set_id", T.StringType(), False),
            T.StructField("created_at_utc", T.StringType(), False),
            T.StructField("objective", T.StringType(), True),
            T.StructField("release_name", T.StringType(), True),
            T.StructField("summary", T.StringType(), True),
            T.StructField("signals_json", T.StringType(), True),
            T.StructField("proposal_id", T.StringType(), False),
            T.StructField("title", T.StringType(), True),
            T.StructField("problem", T.StringType(), True),
            T.StructField("change_text", T.StringType(), True),
            T.StructField("files_json", T.StringType(), True),
            T.StructField("risk", T.StringType(), True),
            T.StructField("tests_json", T.StringType(), True),
            T.StructField("auto_apply_safe", T.BooleanType(), False),
        ]
    )


def main() -> None:
    """Parse CLI arguments, ensure the target table exists, and append the proposal rows."""
    p = argparse.ArgumentParser(description="Write assistant proposal set rows via Spark DataFrame")
    p.add_argument("--table", required=True)
    p.add_argument("--proposal-set-id", required=True)
    p.add_argument("--created-at-utc", required=True)
    p.add_argument("--objective-b64", default="")
    p.add_argument("--release-name", default="")
    p.add_argument("--summary-b64", default="")
    p.add_argument("--signals-b64", default="")
    p.add_argument("--proposals-b64", default="")
    args = p.parse_args()

    objective = d(args.objective_b64)
    summary = d(args.summary_b64)

    signals_obj = _load_json(d(args.signals_b64) or "{}", dict, {})
    # Re-serialize with sorted keys so the stored signals text is normalized.
    signals_json = json.dumps(signals_obj, ensure_ascii=False, sort_keys=True)
    proposals_obj = _load_json(d(args.proposals_b64) or "[]", list, [])

    rows = _build_rows(
        proposals_obj,
        args.proposal_set_id,
        args.created_at_utc,
        objective,
        args.release_name or "",
        summary,
        signals_json,
    )

    spark = SparkSession.builder.appName("write-assistant-proposals").getOrCreate()
    # NOTE(review): args.table is interpolated into SQL as an identifier (it
    # cannot be bound as a parameter); it is assumed to be trusted operator input.
    spark.sql(
        f"""
        CREATE TABLE IF NOT EXISTS {args.table} (
            proposal_set_id STRING,
            created_at_utc STRING,
            objective STRING,
            release_name STRING,
            summary STRING,
            signals_json STRING,
            proposal_id STRING,
            title STRING,
            problem STRING,
            change_text STRING,
            files_json STRING,
            risk STRING,
            tests_json STRING,
            auto_apply_safe BOOLEAN
        ) USING iceberg
        """
    )

    df = spark.createDataFrame(rows, schema=_proposal_schema())
    df.writeTo(args.table).append()
    print(f"[DONE] Recorded proposal set {args.proposal_set_id} with {len(rows)} row(s) into {args.table}")


if __name__ == "__main__":
    main()