jecio/record-run-event-via-spark-container.sh

68 lines
1.7 KiB
Bash
Raw Permalink Normal View History

#!/usr/bin/env bash
#
# Append one row to the run-events Iceberg table by running spark-sql inside
# a Spark container via `docker exec`.
#
# Usage:
#   <script> <run_id> <event_type> <event_at_utc> [detail_json_b64]
#
# Arguments:
#   1 run_id           required
#   2 event_type       required
#   3 event_at_utc     required; stored in a STRING column
#   4 detail_json_b64  optional; base64-encoded detail payload. When omitted,
#                      an empty string is stored.
#
# Environment (all optional):
#   SPARK_CONTAINER_NAME  container to exec into (default: spark)
#   SPARK_PROPS           --properties-file path, resolved inside the container
#   SPARK_PACKAGES        comma-separated Maven coordinates for --packages
#   RUN_EVENTS_TABLE      target table (default: lake.db1.run_events). It is
#                         interpolated into the SQL unescaped, so it must be
#                         a trusted identifier.
#   AWS_REGION, AWS_DEFAULT_REGION
#                         forwarded into the container (default: us-east-1)
set -euo pipefail

RUN_ID="${1:-}"
EVENT_TYPE="${2:-}"
EVENT_AT_UTC="${3:-}"
DETAIL_JSON_B64="${4:-}"

# Only the first three arguments are mandatory; the detail payload may be
# omitted, which is why it is shown in [brackets] in the usage line.
if [[ -z "$RUN_ID" || -z "$EVENT_TYPE" || -z "$EVENT_AT_UTC" ]]; then
  echo "usage: $0 <run_id> <event_type> <event_at_utc> [detail_json_b64]" >&2
  exit 1
fi

CONTAINER_NAME="${SPARK_CONTAINER_NAME:-spark}"
SPARK_PROPS="${SPARK_PROPS:-/opt/lakehouse/spark-conf/lakehouse-spark-defaults.conf}"
PACKAGES="${SPARK_PACKAGES:-org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.104.5}"
RUN_EVENTS_TABLE="${RUN_EVENTS_TABLE:-lake.db1.run_events}"
# decode_b64 ENCODED
#   Write the base64 decoding of ENCODED to stdout.
#   - An empty ENCODED writes nothing and returns 0.
#   - Otherwise the exit status of `base64 -d` is returned, which is nonzero
#     e.g. on malformed input, so callers can detect the failure.
decode_b64() {
  local encoded="$1"
  if [[ -n "$encoded" ]]; then
    printf '%s' "$encoded" | base64 -d
  fi
}
# escape_sql
#   Filter stdin to stdout so the text can be placed between single quotes in
#   a Spark SQL string literal.
#
#   Spark SQL uses backslash as the escape character inside string literals,
#   so the escaping is done in two ordered steps:
#     1. Double every backslash. This must come first so the backslashes
#        added in step 2 are not themselves doubled.
#     2. Write every single quote as \'.
#
#   Doubling quotes ('') is not an escape in Spark SQL: it closes the literal
#   and opens an adjacent one, which Spark concatenates, dropping the quote.
#   An unescaped backslash in the input could also escape the closing quote.
#
#   NOTE(review): this assumes the default
#   spark.sql.parser.escapedStringLiterals=false; confirm the properties file
#   does not enable it.
escape_sql() {
  sed -e 's/\\/\\\\/g' -e "s/'/\\\\'/g"
}
# Decode the optional detail payload and escape every value for embedding in
# a single-quoted SQL string literal. A decode failure aborts with a clear
# message instead of only the bare `base64` diagnostic.
DETAIL_JSON="$(decode_b64 "$DETAIL_JSON_B64" | escape_sql)" || {
  echo "error: failed to decode detail_json_b64 as base64" >&2
  exit 1
}
RUN_ID_ESC="$(printf '%s' "$RUN_ID" | escape_sql)"
EVENT_TYPE_ESC="$(printf '%s' "$EVENT_TYPE" | escape_sql)"
EVENT_AT_ESC="$(printf '%s' "$EVENT_AT_UTC" | escape_sql)"

# Time this row is recorded, in UTC (ISO-8601). It is computed here rather
# than copied from event_at_utc so the two columns carry distinct meaning.
# The format contains no quotes or backslashes, so it needs no escaping.
INGESTED_AT_UTC="$(date -u +%Y-%m-%dT%H:%M:%SZ)"

# Create the table on first use, then append one row. Values are positional
# and follow the column order declared in the CREATE TABLE statement.
SQL="
CREATE TABLE IF NOT EXISTS ${RUN_EVENTS_TABLE} (
  run_id STRING,
  event_type STRING,
  event_at_utc STRING,
  detail_json STRING,
  ingested_at_utc STRING
) USING iceberg;
INSERT INTO ${RUN_EVENTS_TABLE} VALUES (
  '${RUN_ID_ESC}',
  '${EVENT_TYPE_ESC}',
  '${EVENT_AT_ESC}',
  '${DETAIL_JSON}',
  '${INGESTED_AT_UTC}'
);
"

# Run both statements with spark-sql inside the container, forwarding the
# AWS region settings.
docker exec \
  -e AWS_REGION="${AWS_REGION:-us-east-1}" \
  -e AWS_DEFAULT_REGION="${AWS_DEFAULT_REGION:-us-east-1}" \
  "$CONTAINER_NAME" \
  /opt/spark/bin/spark-sql \
  --properties-file "$SPARK_PROPS" \
  --packages "$PACKAGES" \
  -e "$SQL"