57 lines
1.9 KiB
Bash
Executable file
57 lines
1.9 KiB
Bash
Executable file
#!/usr/bin/env bash
|
|
set -euo pipefail
|
|
|
|
TABLE="${1:-lake.db1.messages}"
|
|
THREAD_ID="${2:-}"
|
|
MESSAGE_ID="${3:-}"
|
|
SENDER="${4:-}"
|
|
CHANNEL="${5:-}"
|
|
SENT_AT="${6:-}"
|
|
BODY_B64="${7:-}"
|
|
METADATA_B64="${8:-}"
|
|
|
|
if [[ -z "$THREAD_ID" || -z "$MESSAGE_ID" || -z "$SENDER" || -z "$CHANNEL" || -z "$BODY_B64" ]]; then
|
|
echo "Usage: $0 <table> <thread_id> <message_id> <sender> <channel> <sent_at_or_empty> <body_b64> <metadata_json_b64>" >&2
|
|
exit 1
|
|
fi
|
|
|
|
CONTAINER_NAME="${SPARK_CONTAINER_NAME:-spark}"
|
|
SPARK_PROPS="${SPARK_PROPS:-/opt/lakehouse/spark-conf/lakehouse-spark-defaults.conf}"
|
|
PACKAGES="${SPARK_PACKAGES:-org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.1,org.apache.iceberg:iceberg-aws-bundle:1.10.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.104.5}"
|
|
|
|
BODY="$(printf '%s' "$BODY_B64" | base64 -d)"
|
|
METADATA_JSON="{}"
|
|
if [[ -n "$METADATA_B64" ]]; then
|
|
METADATA_JSON="$(printf '%s' "$METADATA_B64" | base64 -d)"
|
|
fi
|
|
|
|
sql_escape() {
|
|
printf "%s" "$1" | sed "s/'/''/g"
|
|
}
|
|
|
|
THREAD_ID_ESC="$(sql_escape "$THREAD_ID")"
|
|
MESSAGE_ID_ESC="$(sql_escape "$MESSAGE_ID")"
|
|
SENDER_ESC="$(sql_escape "$SENDER")"
|
|
CHANNEL_ESC="$(sql_escape "$CHANNEL")"
|
|
BODY_ESC="$(sql_escape "$BODY")"
|
|
METADATA_ESC="$(sql_escape "$METADATA_JSON")"
|
|
|
|
if [[ -n "$SENT_AT" ]]; then
|
|
SENT_AT_EXPR="TIMESTAMP '$(sql_escape "$SENT_AT")'"
|
|
else
|
|
SENT_AT_EXPR="current_timestamp()"
|
|
fi
|
|
|
|
SQL="INSERT INTO ${TABLE} (thread_id, message_id, sender, channel, sent_at, body, metadata_json) VALUES ('${THREAD_ID_ESC}', '${MESSAGE_ID_ESC}', '${SENDER_ESC}', '${CHANNEL_ESC}', ${SENT_AT_EXPR}, '${BODY_ESC}', '${METADATA_ESC}')"
|
|
|
|
docker exec \
|
|
-e AWS_REGION="${AWS_REGION:-us-east-1}" \
|
|
-e AWS_DEFAULT_REGION="${AWS_DEFAULT_REGION:-us-east-1}" \
|
|
"$CONTAINER_NAME" \
|
|
/opt/spark/bin/spark-sql \
|
|
--properties-file "$SPARK_PROPS" \
|
|
--packages "$PACKAGES" \
|
|
-e "$SQL"
|
|
|
|
echo "[DONE] Inserted message_id=${MESSAGE_ID} thread_id=${THREAD_ID} into ${TABLE}"
|