jecio/write_assistant_feedback.py
2026-02-14 21:10:26 +01:00

104 lines
3.3 KiB
Python

import argparse
import base64
import json
from pyspark.sql import SparkSession, types as T
def d(s: str) -> str:
    """Decode a base64-encoded ASCII string into UTF-8 text.

    Falsy input (empty string) is passed through as "" so that optional
    CLI arguments with a "" default decode to an empty field.
    """
    if s:
        raw = s.encode("ascii")
        return base64.b64decode(raw).decode("utf-8")
    return ""
def main() -> None:
    """CLI entry point: append one assistant-feedback row to an Iceberg table.

    Reads all field values from command-line flags (free-text fields arrive
    base64-encoded so they survive shell quoting), creates the target table
    if it does not exist, and appends a single row via the DataFrame v2
    writer. Exits by printing a confirmation line.
    """
    p = argparse.ArgumentParser(description="Write assistant feedback row via Spark DataFrame")
    p.add_argument("--table", required=True)
    p.add_argument("--feedback-id", required=True)
    p.add_argument("--created-at-utc", required=True)
    p.add_argument("--outcome", required=True)
    p.add_argument("--task-type", required=True)
    p.add_argument("--release-name", default="")
    p.add_argument("--confidence", type=float, default=0.0)
    p.add_argument("--needs-review", default="true")
    p.add_argument("--goal-b64", default="")
    p.add_argument("--draft-b64", default="")
    p.add_argument("--final-b64", default="")
    p.add_argument("--sources-b64", default="")
    p.add_argument("--notes-b64", default="")
    args = p.parse_args()

    # Only the exact string "true" (case-insensitive) enables review; any
    # other value — including typos — is treated as False.
    needs_review = str(args.needs_review).lower() == "true"

    goal = d(args.goal_b64)
    draft_text = d(args.draft_b64)
    final_text = d(args.final_b64)
    sources_json = d(args.sources_b64)
    notes = d(args.notes_b64)
    if not sources_json:
        sources_json = "[]"
    # Validate JSON shape but keep raw string in table.
    try:
        json.loads(sources_json)
    except ValueError:  # json.JSONDecodeError subclasses ValueError; don't swallow unrelated bugs
        sources_json = "[]"

    spark = SparkSession.builder.appName("write-assistant-feedback").getOrCreate()
    try:
        # NOTE(review): the table name is interpolated directly into DDL/DML.
        # Spark SQL cannot parameterize identifiers, so callers must supply a
        # trusted table name — confirm this script is never fed user input.
        spark.sql(
            f"""
        CREATE TABLE IF NOT EXISTS {args.table} (
        feedback_id STRING,
        created_at_utc STRING,
        outcome STRING,
        task_type STRING,
        release_name STRING,
        confidence DOUBLE,
        needs_review BOOLEAN,
        goal STRING,
        draft_text STRING,
        final_text STRING,
        sources_json STRING,
        notes STRING
        ) USING iceberg
        """
        )
        # Explicit schema: required identity/outcome fields are non-nullable,
        # free-text payload fields are nullable.
        schema = T.StructType(
            [
                T.StructField("feedback_id", T.StringType(), False),
                T.StructField("created_at_utc", T.StringType(), False),
                T.StructField("outcome", T.StringType(), False),
                T.StructField("task_type", T.StringType(), False),
                T.StructField("release_name", T.StringType(), True),
                T.StructField("confidence", T.DoubleType(), True),
                T.StructField("needs_review", T.BooleanType(), False),
                T.StructField("goal", T.StringType(), True),
                T.StructField("draft_text", T.StringType(), True),
                T.StructField("final_text", T.StringType(), True),
                T.StructField("sources_json", T.StringType(), True),
                T.StructField("notes", T.StringType(), True),
            ]
        )
        row = [
            (
                args.feedback_id,
                args.created_at_utc,
                args.outcome,
                args.task_type,
                args.release_name or "",
                float(args.confidence),
                needs_review,
                goal,
                draft_text,
                final_text,
                sources_json,
                notes,
            )
        ]
        df = spark.createDataFrame(row, schema=schema)
        df.writeTo(args.table).append()
        print(f"[DONE] Recorded assistant feedback {args.feedback_id} into {args.table}")
    finally:
        # Release the session (and its cluster resources) even when the
        # write fails — previously the session was never stopped.
        spark.stop()
# Standard script entry guard: run only when executed directly, not on import.
if __name__ == "__main__":
    main()