From c6dad7427a23ff6d9b1c0ecba97dc1fbafac24f6 Mon Sep 17 00:00:00 2001 From: Carl Niklas Rydberg Date: Sat, 14 Feb 2026 21:54:36 +0100 Subject: [PATCH] feat(self-improve): persist proposal sets to Iceberg with history and apply-by-id --- app.py | 279 +++++++++++++++++- ...assistant-proposals-via-spark-container.sh | 33 +++ query_assistant_proposals.py | 36 +++ ...assistant-proposals-via-spark-container.sh | 48 +++ ui/assets/app.js | 122 +++++--- ui/index.html | 1 + write_assistant_proposals.py | 143 +++++++++ 7 files changed, 621 insertions(+), 41 deletions(-) create mode 100755 query-assistant-proposals-via-spark-container.sh create mode 100644 query_assistant_proposals.py create mode 100755 record-assistant-proposals-via-spark-container.sh create mode 100644 write_assistant_proposals.py diff --git a/app.py b/app.py index cbd2f01..d869be3 100644 --- a/app.py +++ b/app.py @@ -76,6 +76,14 @@ ASSISTANT_ACTIONS_QUERY_REMOTE_SCRIPT = os.getenv( "ASSISTANT_ACTIONS_QUERY_REMOTE_SCRIPT", "./query-assistant-actions-via-spark-container.sh", ) +ASSISTANT_PROPOSALS_REMOTE_SCRIPT = os.getenv( + "ASSISTANT_PROPOSALS_REMOTE_SCRIPT", + "./record-assistant-proposals-via-spark-container.sh", +) +ASSISTANT_PROPOSALS_QUERY_REMOTE_SCRIPT = os.getenv( + "ASSISTANT_PROPOSALS_QUERY_REMOTE_SCRIPT", + "./query-assistant-proposals-via-spark-container.sh", +) IMAP_CHECKPOINT_REMOTE_SCRIPT = os.getenv( "IMAP_CHECKPOINT_REMOTE_SCRIPT", "./query-imap-checkpoint-via-spark-container.sh", @@ -316,6 +324,8 @@ class AssistantSelfImproveProposal(BaseModel): class AssistantSelfImproveResponse(BaseModel): objective: str release_name: Optional[str] = None + proposal_set_id: str + created_at_utc: str summary: str proposals: List[AssistantSelfImproveProposal] signals: Dict[str, Any] @@ -326,7 +336,9 @@ class AssistantSelfImproveResponse(BaseModel): class AssistantSelfImproveApplyPayload(BaseModel): objective: Optional[str] = None release_name: Optional[str] = None - proposal: AssistantSelfImproveProposal + proposal_set_id: Optional[str] = None + proposal_id: Optional[str] = None + proposal: Optional[AssistantSelfImproveProposal] = None dry_run: bool = False @@ -1941,6 +1953,109 @@ async def run_remote_query_assistant_actions( } +async def run_remote_record_assistant_proposals( + proposal_set_id: str, + created_at_utc: str, + objective: str, + release_name: Optional[str], + summary: str, + signals: Dict[str, Any], + proposals: List[Dict[str, Any]], +) -> Dict[str, Any]: + parts = [ + ASSISTANT_PROPOSALS_REMOTE_SCRIPT, + proposal_set_id, + created_at_utc, + _b64(objective or ""), + release_name or "", + _b64(summary or ""), + _b64(json.dumps(signals or {}, ensure_ascii=False)), + _b64(json.dumps(proposals or [], ensure_ascii=False)), + ] + command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" + ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] + proc = await asyncio.create_subprocess_exec( + *ssh_args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + raise HTTPException(status_code=504, detail="Assistant proposals logging timed out") + + out = stdout.decode("utf-8", errors="replace") + err = stderr.decode("utf-8", errors="replace") + result = { + "host": PROJECTOR_SSH_HOST, + "remote_dir": PROJECTOR_REMOTE_DIR, + "exit_code": proc.returncode, + "stdout_tail": 
_tail(out), + "stderr_tail": _tail(err), + } + if proc.returncode != 0: + raise HTTPException(status_code=502, detail=result) + return result + + +async def run_remote_query_assistant_proposals( + release_name: Optional[str], + proposal_set_id: Optional[str], + limit: int, +) -> Dict[str, Any]: + parts = [ + ASSISTANT_PROPOSALS_QUERY_REMOTE_SCRIPT, + release_name or "", + proposal_set_id or "", + str(limit), + ] + command = f"cd {shlex.quote(PROJECTOR_REMOTE_DIR)} && {' '.join(shlex.quote(p) for p in parts)}" + ssh_args = [PROJECTOR_SSH_BIN, *shlex.split(PROJECTOR_SSH_OPTS), PROJECTOR_SSH_HOST, command] + proc = await asyncio.create_subprocess_exec( + *ssh_args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=PROJECTOR_TIMEOUT_SEC) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + raise HTTPException(status_code=504, detail="Assistant proposals query timed out") + + out = stdout.decode("utf-8", errors="replace") + err = stderr.decode("utf-8", errors="replace") + if proc.returncode != 0: + raise HTTPException( + status_code=502, + detail={ + "host": PROJECTOR_SSH_HOST, + "remote_dir": PROJECTOR_REMOTE_DIR, + "exit_code": proc.returncode, + "stdout_tail": _tail(out), + "stderr_tail": _tail(err), + }, + ) + try: + rows = _extract_json_array_from_text(out) + except Exception as e: + raise HTTPException( + status_code=502, + detail={ + "message": f"Unable to parse proposals query output: {e}", + "stdout_tail": _tail(out), + "stderr_tail": _tail(err), + }, + ) + return { + "host": PROJECTOR_SSH_HOST, + "remote_dir": PROJECTOR_REMOTE_DIR, + "rows": rows, + } + + async def run_remote_record_run( run_id: str, run_type: str, @@ -2567,6 +2682,26 @@ async def assistant_self_improve( if not proposals: proposals = fallback_self_improve_proposals()[: payload.max_proposals] + created_at_utc = now_iso() + proposal_set_id = str(uuid.uuid4()) + signals = { + "feedback_rows": len(feedback_rows), + "blocked_action_rows": len(action_rows), + } + proposal_dicts = [p.model_dump() for p in proposals[: payload.max_proposals]] + try: + await run_remote_record_assistant_proposals( + proposal_set_id=proposal_set_id, + created_at_utc=created_at_utc, + objective=payload.objective, + release_name=payload.release_name, + summary=summary, + signals=signals, + proposals=proposal_dicts, + ) + except Exception as e: + print(f"[WARN] self-improve proposal persistence failed: {e}") + apply_blocked = payload.apply apply_block_reason: Optional[str] = None if payload.apply: @@ -2575,17 +2710,98 @@ async def assistant_self_improve( return AssistantSelfImproveResponse( objective=payload.objective, release_name=payload.release_name, + proposal_set_id=proposal_set_id, + created_at_utc=created_at_utc, summary=summary, proposals=proposals[: payload.max_proposals], - signals={ - "feedback_rows": len(feedback_rows), - "blocked_action_rows": len(action_rows), - }, + signals=signals, apply_blocked=apply_blocked, apply_block_reason=apply_block_reason, ) +@app.get("/assistant/self-improve/history") +async def assistant_self_improve_history( + release_name: Optional[str] = None, + proposal_set_id: Optional[str] = None, + limit: int = 200, + x_admin_api_key: Optional[str] = Header(default=None), +): + check_admin_api_key(x_admin_api_key) + bounded_limit = max(1, min(limit, 2000)) + result = await run_remote_query_assistant_proposals( + release_name=release_name, + proposal_set_id=proposal_set_id, + limit=bounded_limit, + ) + 
rows = result["rows"] + + grouped: Dict[str, Dict[str, Any]] = {} + for r in rows: + set_id = str(r.get("proposal_set_id") or "") + if not set_id: + continue + g = grouped.get(set_id) + if g is None: + signals_json = r.get("signals_json") or "{}" + try: + signals = json.loads(signals_json) + if not isinstance(signals, dict): + signals = {} + except Exception: + signals = {} + g = { + "proposal_set_id": set_id, + "created_at_utc": r.get("created_at_utc"), + "objective": r.get("objective"), + "release_name": r.get("release_name"), + "summary": r.get("summary"), + "signals": signals, + "proposals": [], + } + grouped[set_id] = g + + files_json = r.get("files_json") or "[]" + tests_json = r.get("tests_json") or "[]" + try: + files = json.loads(files_json) + if not isinstance(files, list): + files = [] + except Exception: + files = [] + try: + tests = json.loads(tests_json) + if not isinstance(tests, list): + tests = [] + except Exception: + tests = [] + + g["proposals"].append( + { + "proposal_id": r.get("proposal_id"), + "title": r.get("title"), + "problem": r.get("problem"), + "change": r.get("change_text"), + "files": files, + "risk": r.get("risk") or "medium", + "tests": tests, + "auto_apply_safe": bool(r.get("auto_apply_safe", False)), + } + ) + + sets = list(grouped.values()) + sets.sort(key=lambda x: str(x.get("created_at_utc") or ""), reverse=True) + return { + "count": len(sets), + "filters": { + "release_name": release_name, + "proposal_set_id": proposal_set_id, + "limit": bounded_limit, + }, + "rows": sets, + } + + @app.post("/assistant/self-improve/apply", response_model=AssistantSelfImproveApplyResponse) async def assistant_self_improve_apply( payload: AssistantSelfImproveApplyPayload, @@ -2593,7 +2809,58 @@ async def assistant_self_improve_apply( ): check_admin_api_key(x_admin_api_key) repo_dir = Path(__file__).resolve().parent - proposal = payload.proposal + proposal: Optional[AssistantSelfImproveProposal] = payload.proposal + if proposal is None: + if not (payload.proposal_set_id and payload.proposal_id): + raise HTTPException( + status_code=400, + detail="Provide either proposal object or both proposal_set_id and proposal_id.", + ) + hist = await run_remote_query_assistant_proposals( + release_name=payload.release_name, + proposal_set_id=payload.proposal_set_id, + limit=500, + ) + matched: Optional[AssistantSelfImproveProposal] = None + for r in hist.get("rows", []): + if str(r.get("proposal_id") or "") != payload.proposal_id: + continue + files_json = r.get("files_json") or "[]" + tests_json = r.get("tests_json") or "[]" + try: + files = json.loads(files_json) + if not isinstance(files, list): + files = [] + except Exception: + files = [] + try: + tests = json.loads(tests_json) + if not isinstance(tests, list): + tests = [] + except Exception: + tests = [] + risk_raw = str(r.get("risk") or "medium").strip().lower() + risk: Literal["low", "medium", "high"] = "medium" + if risk_raw in ("low", "medium", "high"): + risk = risk_raw # type: ignore[assignment] + matched = AssistantSelfImproveProposal( + proposal_id=str(r.get("proposal_id") or payload.proposal_id), + title=str(r.get("title") or ""), + problem=str(r.get("problem") or ""), + change=str(r.get("change_text") or ""), + files=[str(x) for x in files if str(x).strip()], + risk=risk, + tests=[str(x) for x in tests if str(x).strip()], + auto_apply_safe=bool(r.get("auto_apply_safe", False)), + ) + break + if matched is None: + raise HTTPException( + status_code=404, + detail=f"Proposal not found for 
set_id={payload.proposal_set_id} proposal_id={payload.proposal_id}", + ) + proposal = matched + assert proposal is not None branch_slug = _slugify(f"{proposal.proposal_id}-{proposal.title}") branch_name = f"assistant/{branch_slug}" diff --git a/query-assistant-proposals-via-spark-container.sh b/query-assistant-proposals-via-spark-container.sh new file mode 100755 index 0000000..4d5ef58 --- /dev/null +++ b/query-assistant-proposals-via-spark-container.sh @@ -0,0 +1,33 @@ +#!/usr/bin/env bash +set -euo pipefail + +RELEASE_NAME="${1:-}" +PROPOSAL_SET_ID="${2:-}" +LIMIT="${3:-200}" +PROPOSAL_TABLE="${PROPOSAL_TABLE:-lake.db1.assistant_proposals}" + +CONTAINER_NAME="${SPARK_CONTAINER_NAME:-spark}" +SPARK_PROPS="${SPARK_PROPS:-/opt/lakehouse/spark-conf/lakehouse-spark-defaults.conf}" +PACKAGES="${SPARK_PACKAGES:-org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.104.5}" +SCRIPT_LOCAL="${SCRIPT_LOCAL:-./query_assistant_proposals.py}" +SCRIPT_REMOTE="/tmp/query_assistant_proposals.py" + +if [[ ! -f "$SCRIPT_LOCAL" ]]; then + echo "query_assistant_proposals.py not found at: $SCRIPT_LOCAL" >&2 + exit 1 +fi + +docker cp "$SCRIPT_LOCAL" "$CONTAINER_NAME":"$SCRIPT_REMOTE" + +docker exec \ + -e AWS_REGION="${AWS_REGION:-us-east-1}" \ + -e AWS_DEFAULT_REGION="${AWS_DEFAULT_REGION:-us-east-1}" \ + "$CONTAINER_NAME" \ + /opt/spark/bin/spark-submit \ + --properties-file "$SPARK_PROPS" \ + --packages "$PACKAGES" \ + "$SCRIPT_REMOTE" \ + --table "$PROPOSAL_TABLE" \ + --release-name "$RELEASE_NAME" \ + --proposal-set-id "$PROPOSAL_SET_ID" \ + --limit "$LIMIT" diff --git a/query_assistant_proposals.py b/query_assistant_proposals.py new file mode 100644 index 0000000..277220e --- /dev/null +++ b/query_assistant_proposals.py @@ -0,0 +1,36 @@ +import argparse +import json +import os + +from pyspark.sql import SparkSession +from pyspark.sql import functions as F + + +def main() -> None: + p = argparse.ArgumentParser(description="Query assistant proposal rows") + p.add_argument("--table", default=os.getenv("PROPOSAL_TABLE", "lake.db1.assistant_proposals")) + p.add_argument("--release-name", default="") + p.add_argument("--proposal-set-id", default="") + p.add_argument("--limit", type=int, default=200) + args = p.parse_args() + + spark = SparkSession.builder.appName("query-assistant-proposals").getOrCreate() + df = spark.table(args.table) + + if args.release_name: + df = df.where(F.col("release_name") == args.release_name) + if args.proposal_set_id: + df = df.where(F.col("proposal_set_id") == args.proposal_set_id) + + rows = ( + df.orderBy(F.col("created_at_utc").desc_nulls_last(), F.col("proposal_id").asc_nulls_last()) + .limit(max(1, min(args.limit, 2000))) + .collect() + ) + + out = [r.asDict(recursive=True) for r in rows] + print(json.dumps(out, ensure_ascii=False)) + + +if __name__ == "__main__": + main() diff --git a/record-assistant-proposals-via-spark-container.sh b/record-assistant-proposals-via-spark-container.sh new file mode 100755 index 0000000..a96b658 --- /dev/null +++ b/record-assistant-proposals-via-spark-container.sh @@ -0,0 +1,48 @@ +#!/usr/bin/env bash +set -euo pipefail + +PROPOSAL_TABLE="${PROPOSAL_TABLE:-lake.db1.assistant_proposals}" +PROPOSAL_SET_ID="${1:-}" +CREATED_AT_UTC="${2:-}" +OBJECTIVE_B64="${3:-}" +RELEASE_NAME="${4:-}" +SUMMARY_B64="${5:-}" +SIGNALS_B64="${6:-}" +PROPOSALS_B64="${7:-}" + +if [[ -z "$PROPOSAL_SET_ID" || -z "$CREATED_AT_UTC" ]]; then + echo 
"Usage: $0 " >&2 + exit 1 +fi + +CONTAINER_NAME="${SPARK_CONTAINER_NAME:-spark}" +SPARK_PROPS="${SPARK_PROPS:-/opt/lakehouse/spark-conf/lakehouse-spark-defaults.conf}" +PACKAGES="${SPARK_PACKAGES:-org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.104.5}" +SCRIPT_LOCAL="${SCRIPT_LOCAL:-./write_assistant_proposals.py}" +SCRIPT_REMOTE="/tmp/write_assistant_proposals.py" + +if [[ ! -f "$SCRIPT_LOCAL" ]]; then + echo "write_assistant_proposals.py not found at: $SCRIPT_LOCAL" >&2 + exit 1 +fi + +docker cp "$SCRIPT_LOCAL" "$CONTAINER_NAME":"$SCRIPT_REMOTE" + +docker exec \ + -e AWS_REGION="${AWS_REGION:-us-east-1}" \ + -e AWS_DEFAULT_REGION="${AWS_DEFAULT_REGION:-us-east-1}" \ + "$CONTAINER_NAME" \ + /opt/spark/bin/spark-submit \ + --properties-file "$SPARK_PROPS" \ + --packages "$PACKAGES" \ + "$SCRIPT_REMOTE" \ + --table "$PROPOSAL_TABLE" \ + --proposal-set-id "$PROPOSAL_SET_ID" \ + --created-at-utc "$CREATED_AT_UTC" \ + --objective-b64 "$OBJECTIVE_B64" \ + --release-name "$RELEASE_NAME" \ + --summary-b64 "$SUMMARY_B64" \ + --signals-b64 "$SIGNALS_B64" \ + --proposals-b64 "$PROPOSALS_B64" + +echo "[DONE] Recorded proposal set ${PROPOSAL_SET_ID} into ${PROPOSAL_TABLE}" diff --git a/ui/assets/app.js b/ui/assets/app.js index 61e8792..04fa883 100644 --- a/ui/assets/app.js +++ b/ui/assets/app.js @@ -237,42 +237,92 @@ async function runSelfImprove() { include_blocked_actions: true, apply: false, }); - window.selfImproveLast = data.proposals || []; - summary.textContent = `${data.summary || ""}\n\nsignals: feedback=${data.signals?.feedback_rows ?? 0}, blocked_actions=${data.signals?.blocked_action_rows ?? 0}`; - renderRows(list, data.proposals || [], (p, idx) => ` -
-      <div><b>${p.proposal_id}</b>: ${p.title}</div>
-      <div class="small">${p.problem}</div>
-      <div class="small">risk=${p.risk} | auto_apply_safe=${p.auto_apply_safe}</div>
-      <div class="small">files=${(p.files || []).join(", ") || "-"}</div>
-      <div class="small">tests=${(p.tests || []).join(" | ") || "-"}</div>
-      <pre>${p.change || ""}</pre>
-      <button class="apply-proposal" data-proposal-id="${p.proposal_id}">Apply</button>
- `); - document.querySelectorAll(".apply-proposal").forEach((btn) => { - btn.addEventListener("click", async () => { - const proposalId = btn.getAttribute("data-proposal-id"); - const found = (window.selfImproveLast || []).find((x) => x.proposal_id === proposalId); - if (!found) return; - const dryRun = document.getElementById("improveDryRun").checked; - summary.textContent = `Applying ${proposalId}...`; - try { - const applied = await apiPost("/assistant/self-improve/apply", { - objective, - release_name: cfg.releaseName || null, - proposal: found, - dry_run: dryRun, - }); - summary.textContent = - `apply result: applied=${applied.applied} dry_run=${applied.dry_run}\n` + - `branch=${applied.branch_name}\n` + - `proposal_file=${applied.proposal_file}\n` + - `commit=${applied.commit || "-"}\n` + - `${applied.detail || ""}`; - } catch (e) { - summary.textContent = `Apply error: ${String(e)}`; - } - }); + summary.textContent = + `${data.summary || ""}\n\nproposal_set_id=${data.proposal_set_id}\n` + + `signals: feedback=${data.signals?.feedback_rows ?? 0}, blocked_actions=${data.signals?.blocked_action_rows ?? 0}`; + renderProposalSets( + [{ proposal_set_id: data.proposal_set_id, created_at_utc: data.created_at_utc, objective, release_name: cfg.releaseName || "", summary: data.summary, signals: data.signals || {}, proposals: data.proposals || [] }], + list, + summary, + cfg.releaseName || null + ); + } catch (e) { + summary.textContent = `Error: ${String(e)}`; + } +} + +function renderProposalSets(sets, listEl, summaryEl, releaseName) { + listEl.innerHTML = ""; + if (!sets || sets.length === 0) { + listEl.innerHTML = '
<div class="small">No proposal sets.</div>';
+    return;
+  }
+  sets.forEach((setObj) => {
+    const wrap = document.createElement("div");
+    wrap.className = "row";
+    const proposals = setObj.proposals || [];
+    const proposalsHtml = proposals.map((p) => `
+      <div class="proposal">
+        <div><b>${p.proposal_id}</b>: ${p.title}</div>
+        <div class="small">${p.problem || ""}</div>
+        <div class="small">risk=${p.risk} | auto_apply_safe=${p.auto_apply_safe}</div>
+        <div class="small">files=${(p.files || []).join(", ") || "-"}</div>
+        <div class="small">tests=${(p.tests || []).join(" | ") || "-"}</div>
+        <pre>${p.change || ""}</pre>
+        <button class="apply-proposal" data-proposal-set-id="${setObj.proposal_set_id}" data-proposal-id="${p.proposal_id}">Apply</button>
+      </div>
+    `).join("");
+    wrap.innerHTML = `
+      <div><b>Set ${setObj.proposal_set_id}</b></div>
+      <div class="small">${setObj.created_at_utc || ""} | release=${setObj.release_name || ""}</div>
+      <div class="small">${setObj.summary || ""}</div>
+ ${proposalsHtml} + `; + listEl.appendChild(wrap); + }); + + document.querySelectorAll(".apply-proposal").forEach((btn) => { + btn.addEventListener("click", async () => { + const proposalSetId = btn.getAttribute("data-proposal-set-id"); + const proposalId = btn.getAttribute("data-proposal-id"); + if (!proposalSetId || !proposalId) return; + const dryRun = document.getElementById("improveDryRun").checked; + summaryEl.textContent = `Applying ${proposalId} from set ${proposalSetId}...`; + try { + const applied = await apiPost("/assistant/self-improve/apply", { + objective: document.getElementById("improveObjective").value.trim() || null, + release_name: releaseName || null, + proposal_set_id: proposalSetId, + proposal_id: proposalId, + dry_run: dryRun, + }); + summaryEl.textContent = + `apply result: applied=${applied.applied} dry_run=${applied.dry_run}\n` + + `branch=${applied.branch_name}\n` + + `proposal_file=${applied.proposal_file}\n` + + `commit=${applied.commit || "-"}\n` + + `${applied.detail || ""}`; + } catch (e) { + summaryEl.textContent = `Apply error: ${String(e)}`; + } }); + }); +} + +async function loadImproveHistory() { + const cfg = getConfig(); + const summary = document.getElementById("improveSummary"); + const list = document.getElementById("improveProposals"); + summary.textContent = "Loading proposal history..."; + try { + const data = await apiGet("/assistant/self-improve/history", { + release_name: cfg.releaseName || null, + limit: 200, + }); + summary.textContent = `history sets=${data.count || 0}`; + renderProposalSets(data.rows || [], list, summary, cfg.releaseName || null); } catch (e) { summary.textContent = `Error: ${String(e)}`; } @@ -286,6 +336,8 @@ document.getElementById("makeDraft").addEventListener("click", makeDraft); document.getElementById("saveLearn").addEventListener("click", saveLearn); document.getElementById("sendChat").addEventListener("click", sendChat); document.getElementById("runImprove").addEventListener("click", runSelfImprove); +document.getElementById("loadImproveHistory").addEventListener("click", loadImproveHistory); loadConfig(); loadMeta(); +loadImproveHistory(); diff --git a/ui/index.html b/ui/index.html index 330030e..d0cdd45 100644 --- a/ui/index.html +++ b/ui/index.html @@ -86,6 +86,7 @@ +
+        <button id="loadImproveHistory">Load history</button>

diff --git a/write_assistant_proposals.py b/write_assistant_proposals.py
new file mode 100644
index 0000000..fc84636
--- /dev/null
+++ b/write_assistant_proposals.py
@@ -0,0 +1,153 @@
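+# Writes one Iceberg row per proposal; invoked via spark-submit by record-assistant-proposals-via-spark-container.sh.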
+import argparse
+import base64
+import json
+
+from pyspark.sql import SparkSession, types as T
+
+
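+# Decode a base64-encoded CLI argument back to UTF-8 text; empty input passes through unchanged.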
+def d(s: str) -> str:
+    if not s:
+        return ""
+    return base64.b64decode(s.encode("ascii")).decode("utf-8")
+
+
+def main() -> None:
+    p = argparse.ArgumentParser(description="Write assistant proposal set rows via Spark DataFrame")
+    p.add_argument("--table", required=True)
+    p.add_argument("--proposal-set-id", required=True)
+    p.add_argument("--created-at-utc", required=True)
+    p.add_argument("--objective-b64", default="")
+    p.add_argument("--release-name", default="")
+    p.add_argument("--summary-b64", default="")
+    p.add_argument("--signals-b64", default="")
+    p.add_argument("--proposals-b64", default="")
+    args = p.parse_args()
+
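+    # Free-text fields arrive base64-encoded so they survive shell/SSH quoting; decode them first.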
+    objective = d(args.objective_b64)
+    summary = d(args.summary_b64)
+    signals_json = d(args.signals_b64) or "{}"
+    proposals_json = d(args.proposals_b64) or "[]"
+
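+    # Normalize signals to canonical JSON; malformed input degrades to {} instead of failing the write.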
+    try:
+        signals_obj = json.loads(signals_json)
+        if not isinstance(signals_obj, dict):
+            signals_obj = {}
+    except Exception:
+        signals_obj = {}
+    signals_json = json.dumps(signals_obj, ensure_ascii=False, sort_keys=True)
+
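+    # Parse the proposals payload defensively; anything that is not a JSON list becomes [].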
+    try:
+        proposals_obj = json.loads(proposals_json)
+        if not isinstance(proposals_obj, list):
+            proposals_obj = []
+    except Exception:
+        proposals_obj = []
+
+    spark = SparkSession.builder.appName("write-assistant-proposals").getOrCreate()
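+    # Idempotent DDL: create the Iceberg table on first run; subsequent runs are no-ops.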
+    spark.sql(
+        f"""
+        CREATE TABLE IF NOT EXISTS {args.table} (
+          proposal_set_id STRING,
+          created_at_utc STRING,
+          objective STRING,
+          release_name STRING,
+          summary STRING,
+          signals_json STRING,
+          proposal_id STRING,
+          title STRING,
+          problem STRING,
+          change_text STRING,
+          files_json STRING,
+          risk STRING,
+          tests_json STRING,
+          auto_apply_safe BOOLEAN
+        ) USING iceberg
+        """
+    )
+
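+    # Explicit schema keeps column order and nullability stable regardless of input dict ordering.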
+    schema = T.StructType(
+        [
+            T.StructField("proposal_set_id", T.StringType(), False),
+            T.StructField("created_at_utc", T.StringType(), False),
+            T.StructField("objective", T.StringType(), True),
+            T.StructField("release_name", T.StringType(), True),
+            T.StructField("summary", T.StringType(), True),
+            T.StructField("signals_json", T.StringType(), True),
+            T.StructField("proposal_id", T.StringType(), False),
+            T.StructField("title", T.StringType(), True),
+            T.StructField("problem", T.StringType(), True),
+            T.StructField("change_text", T.StringType(), True),
+            T.StructField("files_json", T.StringType(), True),
+            T.StructField("risk", T.StringType(), True),
+            T.StructField("tests_json", T.StringType(), True),
+            T.StructField("auto_apply_safe", T.BooleanType(), False),
+        ]
+    )
+
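+    # Flatten each proposal dict into a tuple in schema order, tolerating missing keys.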
+    rows = []
+    for idx, p_obj in enumerate(proposals_obj):
+        if not isinstance(p_obj, dict):
+            continue
+        files = p_obj.get("files")
+        tests = p_obj.get("tests")
+        if not isinstance(files, list):
+            files = []
+        if not isinstance(tests, list):
+            tests = []
+        proposal_id = str(p_obj.get("proposal_id") or f"P{idx+1}")
+        rows.append(
+            (
+                args.proposal_set_id,
+                args.created_at_utc,
+                objective,
+                args.release_name or "",
+                summary,
+                signals_json,
+                proposal_id,
+                str(p_obj.get("title") or ""),
+                str(p_obj.get("problem") or ""),
+                str(p_obj.get("change") or ""),
+                json.dumps(files, ensure_ascii=False),
+                str(p_obj.get("risk") or "medium"),
+                json.dumps(tests, ensure_ascii=False),
+                bool(p_obj.get("auto_apply_safe", False)),
+            )
+        )
+
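+    # Write a sentinel row when the set is empty so the proposal set still shows up in history queries.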
+    if not rows:
+        rows.append(
+            (
+                args.proposal_set_id,
+                args.created_at_utc,
+                objective,
+                args.release_name or "",
+                summary,
+                signals_json,
+                "P0",
+                "No proposals",
+                "No proposals returned",
+                "",
+                "[]",
+                "low",
+                "[]",
+                False,
+            )
+        )
+
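+    # Append through the DataFrame writeTo API so the Iceberg commit is atomic.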
+    df = spark.createDataFrame(rows, schema=schema)
+    df.writeTo(args.table).append()
+    print(f"[DONE] Recorded proposal set {args.proposal_set_id} with {len(rows)} row(s) into {args.table}")
+
+
+if __name__ == "__main__":
+    main()