#!/usr/bin/env bash
set -euo pipefail
# Record a pipeline run as a row in an Iceberg table by running spark-sql
# inside an already-running Spark container.
#
# Positional arguments:
#   1  run_id
#   2  run_type
#   3  status
#   4  started_at_utc
#   5  finished_at_utc  (or empty)
#   6  actor
#   7  input_json_b64   (base64-encoded JSON, or empty)
#   8  output_json_b64  (base64-encoded JSON, or empty)
#   9  error_text_b64   (base64-encoded text, or empty)
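#
# Example invocation (hypothetical values; the caller base64-encodes the
# JSON and error payloads):
#   ./record-run-via-spark-container.sh \
#     run-20240101-001 ingest SUCCEEDED \
#     2024-01-01T00:00:00Z 2024-01-01T00:05:00Z airflow \
#     "$(printf '%s' '{"source":"s3://bucket/in"}' | base64)" \
#     "$(printf '%s' '{"rows_written":1000}' | base64)" \
#     ""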
RUN_ID="${1:-}"
RUN_TYPE="${2:-}"
STATUS="${3:-}"
STARTED_AT_UTC="${4:-}"
FINISHED_AT_UTC="${5:-}"
ACTOR="${6:-}"
INPUT_JSON_B64="${7:-}"
OUTPUT_JSON_B64="${8:-}"
ERROR_TEXT_B64="${9:-}"
if [[ -z "$RUN_ID" || -z "$RUN_TYPE" || -z "$STATUS" || -z "$STARTED_AT_UTC" ]]; then
echo "usage: $0 <run_id> <run_type> <status> <started_at_utc> <finished_at_utc> <actor> <input_json_b64> <output_json_b64> <error_text_b64>" >&2
exit 1
fi
CONTAINER_NAME="${SPARK_CONTAINER_NAME:-spark}"
SPARK_PROPS="${SPARK_PROPS:-/opt/lakehouse/spark-conf/lakehouse-spark-defaults.conf}"
PACKAGES="${SPARK_PACKAGES:-org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.104.5}"
RUNS_TABLE="${RUNS_TABLE:-lake.db1.runs}"
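# Each of the defaults above can be overridden from the environment, e.g.
# (hypothetical container and table names):
#   SPARK_CONTAINER_NAME=spark-master RUNS_TABLE=lake.ops.runs \
#     ./record-run-via-spark-container.sh ...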
# Decode a base64-encoded argument; an empty argument yields empty output.
decode_b64() {
  local s="$1"
  if [[ -z "$s" ]]; then
    return 0
  fi
  printf '%s' "$s" | base64 -d
}
# Escape a value for embedding in a single-quoted Spark SQL string literal:
# double any backslashes (Spark's parser treats backslash as an escape
# character in string literals by default) and double embedded single quotes.
escape_sql() {
  sed -e 's/\\/\\\\/g' -e "s/'/''/g"
}
INPUT_JSON="$(decode_b64 "$INPUT_JSON_B64" | escape_sql)"
OUTPUT_JSON="$(decode_b64 "$OUTPUT_JSON_B64" | escape_sql)"
ERROR_TEXT="$(decode_b64 "$ERROR_TEXT_B64" | escape_sql)"
RUN_ID_ESC="$(printf '%s' "$RUN_ID" | escape_sql)"
RUN_TYPE_ESC="$(printf '%s' "$RUN_TYPE" | escape_sql)"
STATUS_ESC="$(printf '%s' "$STATUS" | escape_sql)"
STARTED_ESC="$(printf '%s' "$STARTED_AT_UTC" | escape_sql)"
FINISHED_ESC="$(printf '%s' "$FINISHED_AT_UTC" | escape_sql)"
ACTOR_ESC="$(printf '%s' "$ACTOR" | escape_sql)"
SQL="
CREATE TABLE IF NOT EXISTS ${RUNS_TABLE} (
run_id STRING,
run_type STRING,
status STRING,
started_at_utc STRING,
finished_at_utc STRING,
actor STRING,
input_json STRING,
output_json STRING,
error_text STRING,
ingested_at_utc STRING
) USING iceberg;
INSERT INTO ${RUNS_TABLE} VALUES (
'${RUN_ID_ESC}',
'${RUN_TYPE_ESC}',
'${STATUS_ESC}',
'${STARTED_ESC}',
'${FINISHED_ESC}',
'${ACTOR_ESC}',
'${INPUT_JSON}',
'${OUTPUT_JSON}',
'${ERROR_TEXT}',
'${STARTED_ESC}'
);
"
# Execute both statements via spark-sql inside the container; --packages pulls
# the Iceberg runtime, AWS bundle, and Nessie extension jars.
docker exec \
  -e AWS_REGION="${AWS_REGION:-us-east-1}" \
  -e AWS_DEFAULT_REGION="${AWS_DEFAULT_REGION:-us-east-1}" \
  "$CONTAINER_NAME" \
  /opt/spark/bin/spark-sql \
  --properties-file "$SPARK_PROPS" \
  --packages "$PACKAGES" \
  -e "$SQL"