# jecio/query_assistant_actions.py
#
# (Header captured from the repository viewer: 46 lines, 1.4 KiB, Python.)

import argparse
import datetime
import json
import os
from decimal import Decimal

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Columns that can be filtered by exact match. argparse maps "--task-type" to
# the attribute "task_type", so each name is both the args attribute and the
# table column. Order matches the order the filters are applied in.
_FILTER_COLUMNS = ("status", "task_type", "release_name", "step_id", "action_type")

# Upper bound on rows returned, regardless of --limit.
_MAX_LIMIT = 500


def _json_default(value):
    """Encode values produced by ``Row.asDict`` that ``json`` cannot handle.

    PySpark returns timestamp/date columns as ``datetime``/``date`` objects and
    decimal columns as ``Decimal``. The stock JSON encoder rejects all of them
    with ``TypeError``.

    Raises:
        TypeError: for any other unsupported type, matching ``json``'s own
            behavior so unexpected types are not silently stringified.
    """
    if isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
        return value.isoformat()
    if isinstance(value, Decimal):
        # str() keeps the exact decimal digits; float() could lose precision.
        return str(value)
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def main() -> None:
    """Print rows of the assistant-actions table as a JSON array on stdout.

    Command-line options:
        --table: table to read (default: $ACTION_TABLE or
            "lake.db1.assistant_actions").
        --status, --task-type, --release-name, --step-id, --action-type:
            optional exact-match filters, ANDed together. An empty value
            (the default) means "do not filter on this column".
        --limit: maximum rows to return, clamped to the range [1, 500].

    Rows are sorted by ``created_at_utc`` descending with nulls last.
    Presumably that is newest first; the column type is not visible here.
    """
    p = argparse.ArgumentParser(description="Query assistant actions")
    p.add_argument("--table", default=os.getenv("ACTION_TABLE", "lake.db1.assistant_actions"))
    p.add_argument("--status", default="")
    p.add_argument("--task-type", default="")
    p.add_argument("--release-name", default="")
    p.add_argument("--step-id", default="")
    p.add_argument("--action-type", default="")
    p.add_argument("--limit", type=int, default=50)
    args = p.parse_args()

    spark = SparkSession.builder.appName("query-assistant-actions").getOrCreate()
    df = spark.table(args.table)
    for column in _FILTER_COLUMNS:
        value = getattr(args, column)
        if value:  # empty string => no filter on this column
            df = df.where(F.col(column) == value)

    # Clamp rather than reject out-of-range limits.
    limit = max(1, min(args.limit, _MAX_LIMIT))
    rows = (
        df.orderBy(F.col("created_at_utc").desc_nulls_last())
        .limit(limit)
        .collect()
    )
    out = [r.asDict(recursive=True) for r in rows]
    # default= handles timestamp/date/decimal values that would otherwise
    # make json.dumps raise TypeError.
    print(json.dumps(out, ensure_ascii=False, default=_json_default))
# Entry point: run the query CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()