"""Append a single assistant-action record to an Iceberg table using Spark.

Invoked as a CLI. All free-text fields arrive base64-encoded so arbitrary
content (newlines, quotes, unicode) survives shell argument passing intact.
"""

import argparse
import base64
import json

from pyspark.sql import SparkSession, types as T


def d(s: str) -> str:
    """Decode a base64-encoded UTF-8 string; falsy input yields ""."""
    if not s:
        return ""
    return base64.b64decode(s.encode("ascii")).decode("utf-8")


def _parse_args() -> argparse.Namespace:
    """Build and evaluate the CLI parser (flag names/defaults are the contract)."""
    p = argparse.ArgumentParser(description="Write assistant action row via Spark DataFrame")
    p.add_argument("--table", required=True)
    p.add_argument("--action-id", required=True)
    p.add_argument("--created-at-utc", required=True)
    p.add_argument("--task-type", required=True)
    p.add_argument("--release-name", default="")
    p.add_argument("--objective-b64", default="")
    p.add_argument("--step-id", required=True)
    p.add_argument("--step-title-b64", default="")
    p.add_argument("--action-type", required=True)
    p.add_argument("--requires-approval", default="false")
    p.add_argument("--approved", default="false")
    p.add_argument("--status", required=True)
    p.add_argument("--output-b64", default="")
    p.add_argument("--error-b64", default="")
    return p.parse_args()


def _flag(value) -> bool:
    """Interpret a CLI string as a boolean: only case-insensitive "true" is True."""
    return str(value).lower() == "true"


def _safe_json(text: str) -> str:
    """Return *text* when it is valid JSON, otherwise the empty object "{}".

    Best-effort by design: a malformed or empty payload must not abort the
    write, so any parse failure falls back to "{}".
    """
    if not text:
        return "{}"
    try:
        json.loads(text)
    except Exception:
        return "{}"
    return text


def _ensure_table(spark: SparkSession, table: str) -> None:
    """Create the target Iceberg table if it does not already exist.

    NOTE(review): the table name is interpolated directly into the DDL; this
    is tolerable only because it comes from a trusted operator CLI flag, not
    untrusted input — do not expose this path to external callers.
    """
    spark.sql(
        f"""
        CREATE TABLE IF NOT EXISTS {table} (
            action_id STRING,
            created_at_utc STRING,
            task_type STRING,
            release_name STRING,
            objective STRING,
            step_id STRING,
            step_title STRING,
            action_type STRING,
            requires_approval BOOLEAN,
            approved BOOLEAN,
            status STRING,
            output_json STRING,
            error_text STRING
        ) USING iceberg
        """
    )


def _row_schema() -> T.StructType:
    """Explicit DataFrame schema mirroring the DDL (incl. nullability)."""
    return T.StructType(
        [
            T.StructField("action_id", T.StringType(), False),
            T.StructField("created_at_utc", T.StringType(), False),
            T.StructField("task_type", T.StringType(), False),
            T.StructField("release_name", T.StringType(), True),
            T.StructField("objective", T.StringType(), True),
            T.StructField("step_id", T.StringType(), False),
            T.StructField("step_title", T.StringType(), True),
            T.StructField("action_type", T.StringType(), False),
            T.StructField("requires_approval", T.BooleanType(), False),
            T.StructField("approved", T.BooleanType(), False),
            T.StructField("status", T.StringType(), False),
            T.StructField("output_json", T.StringType(), True),
            T.StructField("error_text", T.StringType(), True),
        ]
    )


def main() -> None:
    """CLI entry point: decode args, ensure the table exists, append one row."""
    args = _parse_args()

    # Tuple order must match _row_schema() field order exactly.
    row = (
        args.action_id,
        args.created_at_utc,
        args.task_type,
        args.release_name or "",
        d(args.objective_b64),
        args.step_id,
        d(args.step_title_b64),
        args.action_type,
        _flag(args.requires_approval),
        _flag(args.approved),
        args.status,
        _safe_json(d(args.output_b64)),
        d(args.error_b64),
    )

    spark = SparkSession.builder.appName("write-assistant-action").getOrCreate()
    try:
        _ensure_table(spark, args.table)
        df = spark.createDataFrame([row], schema=_row_schema())
        df.writeTo(args.table).append()
        print(f"[DONE] Recorded assistant action {args.action_id} into {args.table}")
    finally:
        # Fix: the original never stopped the session, leaving the Spark
        # context and its JVM resources alive after the script finished.
        spark.stop()


if __name__ == "__main__":
    main()