# jecio/ingest-message-via-spark-container.sh
#!/usr/bin/env bash
# Insert one message row into an Iceberg table by invoking spark-sql inside
# an already-running Spark container. Body and metadata arrive base64-encoded
# so they can carry newlines/quotes across the argv boundary safely.
set -euo pipefail

TABLE="${1:-lake.db1.messages}"
THREAD_ID="${2:-}"
MESSAGE_ID="${3:-}"
SENDER="${4:-}"
CHANNEL="${5:-}"
SENT_AT="${6:-}"
BODY_B64="${7:-}"
METADATA_B64="${8:-}"

# thread_id/message_id/sender/channel/body are mandatory; sent_at and
# metadata are optional and default later.
for required_arg in "$THREAD_ID" "$MESSAGE_ID" "$SENDER" "$CHANNEL" "$BODY_B64"; do
  if [[ -z "$required_arg" ]]; then
    echo "Usage: $0 <table> <thread_id> <message_id> <sender> <channel> <sent_at_or_empty> <body_b64> <metadata_json_b64>" >&2
    exit 1
  fi
done

CONTAINER_NAME="${SPARK_CONTAINER_NAME:-spark}"
SPARK_PROPS="${SPARK_PROPS:-/opt/lakehouse/spark-conf/lakehouse-spark-defaults.conf}"
PACKAGES="${SPARK_PACKAGES:-org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.104.5}"

# Decode payloads; metadata defaults to an empty JSON object when absent.
BODY="$(base64 -d <<<"$BODY_B64")"
METADATA_JSON="{}"
if [[ -n "$METADATA_B64" ]]; then
  METADATA_JSON="$(base64 -d <<<"$METADATA_B64")"
fi
# Escape a value for interpolation into a single-quoted Spark SQL string
# literal. Two transforms, in order:
#   1. double backslashes — Spark's SQL parser treats backslash as an escape
#      character inside string literals by default
#      (spark.sql.parser.escapedStringLiterals=false), so a lone '\' would be
#      mangled or rejected;
#   2. double single quotes (standard SQL quoting).
# Note: command substitution at call sites strips trailing newlines from the
# escaped value; callers pass message bodies where that is acceptable.
sql_escape() {
  printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e "s/'/''/g"
}
# The table name is a SQL identifier, not a string literal, so sql_escape
# cannot protect it. Restrict it to dotted alphanumeric/underscore parts to
# close the one injection path the quoting scheme does not cover.
if [[ ! "$TABLE" =~ ^[A-Za-z0-9_]+(\.[A-Za-z0-9_]+)*$ ]]; then
  echo "Invalid table name: $TABLE" >&2
  exit 1
fi

# Escape every value destined for a single-quoted SQL string literal.
THREAD_ID_ESC="$(sql_escape "$THREAD_ID")"
MESSAGE_ID_ESC="$(sql_escape "$MESSAGE_ID")"
SENDER_ESC="$(sql_escape "$SENDER")"
CHANNEL_ESC="$(sql_escape "$CHANNEL")"
BODY_ESC="$(sql_escape "$BODY")"
METADATA_ESC="$(sql_escape "$METADATA_JSON")"

# Use the caller-supplied timestamp when given; otherwise let Spark stamp
# the row server-side.
if [[ -n "$SENT_AT" ]]; then
  SENT_AT_EXPR="TIMESTAMP '$(sql_escape "$SENT_AT")'"
else
  SENT_AT_EXPR="current_timestamp()"
fi

SQL="INSERT INTO ${TABLE} (thread_id, message_id, sender, channel, sent_at, body, metadata_json) VALUES ('${THREAD_ID_ESC}', '${MESSAGE_ID_ESC}', '${SENDER_ESC}', '${CHANNEL_ESC}', ${SENT_AT_EXPR}, '${BODY_ESC}', '${METADATA_ESC}')"
# Execute the INSERT via spark-sql inside the running container; AWS region
# variables are forwarded for the Iceberg AWS bundle. Building the argv as an
# array keeps the quoting of SQL/paths intact.
spark_cmd=(
  docker exec
  -e AWS_REGION="${AWS_REGION:-us-east-1}"
  -e AWS_DEFAULT_REGION="${AWS_DEFAULT_REGION:-us-east-1}"
  "$CONTAINER_NAME"
  /opt/spark/bin/spark-sql
  --properties-file "$SPARK_PROPS"
  --packages "$PACKAGES"
  -e "$SQL"
)
"${spark_cmd[@]}"

echo "[DONE] Inserted message_id=${MESSAGE_ID} thread_id=${THREAD_ID} into ${TABLE}"