107 lines
3.4 KiB
Python
107 lines
3.4 KiB
Python
|
|
import argparse
|
||
|
|
import json
|
||
|
|
import base64
|
||
|
|
|
||
|
|
from pyspark.sql import SparkSession, types as T
|
||
|
|
|
||
|
|
|
||
|
|
def d(s: str) -> str:
    """Decode a base64-encoded UTF-8 string.

    Args:
        s: ASCII base64 text. Any falsy value (empty string, ``None``) is
            treated as "nothing supplied".

    Returns:
        The decoded text, or ``""`` when ``s`` is falsy.
    """
    if s:
        raw_bytes = base64.b64decode(s.encode("ascii"))
        return raw_bytes.decode("utf-8")
    return ""
|
||
|
|
|
||
|
|
|
||
|
|
def _is_safe_table_identifier(name: str) -> bool:
    """Return True if *name* is a dot-separated Spark SQL identifier.

    Each part must be either a regular identifier (ASCII letters, digits,
    underscores) or a backtick-quoted identifier in which a literal backtick
    is written as two backticks.
    """
    import re  # local import: this block must not rely on file-level imports it cannot see

    part = r"(?:[A-Za-z0-9_]+|`(?:[^`]|``)+`)"
    return re.fullmatch(rf"{part}(?:\.{part})*", name) is not None


def _parse_args() -> argparse.Namespace:
    """Parse and validate the command-line arguments for one action row.

    Exits through ``ArgumentParser.error`` (status 2) when ``--table`` is not
    a well-formed identifier, so a bad name is rejected before Spark starts.
    """
    p = argparse.ArgumentParser(description="Write assistant action row via Spark DataFrame")
    p.add_argument("--table", required=True)
    p.add_argument("--action-id", required=True)
    p.add_argument("--created-at-utc", required=True)
    p.add_argument("--task-type", required=True)
    p.add_argument("--release-name", default="")
    p.add_argument("--objective-b64", default="")
    p.add_argument("--step-id", required=True)
    p.add_argument("--step-title-b64", default="")
    p.add_argument("--action-type", required=True)
    p.add_argument("--requires-approval", default="false")
    p.add_argument("--approved", default="false")
    p.add_argument("--status", required=True)
    p.add_argument("--output-b64", default="")
    p.add_argument("--error-b64", default="")
    args = p.parse_args()

    # The table name is spliced into a DDL statement below and identifiers
    # cannot be passed as bound parameters, so reject anything that is not a
    # plain (optionally backtick-quoted) multipart identifier.
    if not _is_safe_table_identifier(args.table):
        p.error(f"--table is not a valid table identifier: {args.table!r}")
    return args


def _is_true(value: object) -> bool:
    """Interpret a CLI flag value: only the text "true" (any case) is True."""
    return str(value).lower() == "true"


def _normalise_output_json(raw: str) -> str:
    """Return *raw* if it parses as JSON, otherwise the empty object ``"{}"``.

    An empty payload maps to ``"{}"`` silently. A non-empty payload that is
    not valid JSON is also replaced with ``"{}"``, but a warning is written
    to stderr so the substitution is visible to whoever runs the job.
    """
    import sys  # local import: see note in _is_safe_table_identifier

    if not raw:
        return "{}"
    try:
        json.loads(raw)
    except (ValueError, RecursionError) as exc:
        # json.JSONDecodeError is a ValueError; RecursionError covers
        # pathologically nested input. Keep the best-effort behaviour.
        print(
            f"[WARN] --output-b64 did not decode to valid JSON ({exc}); storing {{}} instead",
            file=sys.stderr,
        )
        return "{}"
    return raw


def main() -> None:
    """Append one assistant-action row to the Iceberg table named by ``--table``.

    Steps:
      1. Parse CLI arguments and validate the table identifier.
      2. Decode the ``*-b64`` arguments and normalise the output payload so
         the ``output_json`` column always holds parseable JSON.
      3. Create the table if it does not exist, then append the row.

    Raises:
        SystemExit: on argument errors (raised by argparse).
        Exception: decoding errors from ``d`` and any Spark error propagate.
    """
    args = _parse_args()

    # Decode everything before starting Spark so malformed input fails fast.
    record = (
        args.action_id,
        args.created_at_utc,
        args.task_type,
        args.release_name or "",
        d(args.objective_b64),
        args.step_id,
        d(args.step_title_b64),
        args.action_type,
        _is_true(args.requires_approval),
        _is_true(args.approved),
        args.status,
        _normalise_output_json(d(args.output_b64)),
        d(args.error_b64),
    )

    # NOTE(review): the session is intentionally not stopped here, because
    # getOrCreate() may hand back a session owned by the caller.
    spark = SparkSession.builder.appName("write-assistant-action").getOrCreate()
    spark.sql(
        f"""
        CREATE TABLE IF NOT EXISTS {args.table} (
            action_id STRING,
            created_at_utc STRING,
            task_type STRING,
            release_name STRING,
            objective STRING,
            step_id STRING,
            step_title STRING,
            action_type STRING,
            requires_approval BOOLEAN,
            approved BOOLEAN,
            status STRING,
            output_json STRING,
            error_text STRING
        ) USING iceberg
        """
    )

    # Field order must match both the tuple above and the DDL column order.
    # The DataFrame schema marks the identifying fields as non-nullable; the
    # DDL itself declares no NOT NULL constraints.
    schema = T.StructType(
        [
            T.StructField("action_id", T.StringType(), False),
            T.StructField("created_at_utc", T.StringType(), False),
            T.StructField("task_type", T.StringType(), False),
            T.StructField("release_name", T.StringType(), True),
            T.StructField("objective", T.StringType(), True),
            T.StructField("step_id", T.StringType(), False),
            T.StructField("step_title", T.StringType(), True),
            T.StructField("action_type", T.StringType(), False),
            T.StructField("requires_approval", T.BooleanType(), False),
            T.StructField("approved", T.BooleanType(), False),
            T.StructField("status", T.StringType(), False),
            T.StructField("output_json", T.StringType(), True),
            T.StructField("error_text", T.StringType(), True),
        ]
    )
    df = spark.createDataFrame([record], schema=schema)
    df.writeTo(args.table).append()
    print(f"[DONE] Recorded assistant action {args.action_id} into {args.table}")
|
||
|
|
|
||
|
|
|
||
|
|
# Script entry point: write the row only when this file is executed directly,
# so importing the module has no side effects.
if __name__ == "__main__":
    main()
|