amduat-api/scripts/changes_consumer.sh

146 lines
2.9 KiB
Bash
Raw Normal View History

#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
# shellcheck source=/dev/null
source "${ROOT_DIR}/src/app_v2.sh"
usage() {
cat <<USAGE
usage: $0 [--once]
options:
--once Process a single /v2/graph/changes poll and exit.
env:
CHANGES_EVENT_HANDLER Optional command string; executed per event with EVENT_JSON set.
CHANGES_LOOP_SLEEP_SEC Sleep between polls (default: 1).
USAGE
}
require_jq() {
if ! command -v jq >/dev/null 2>&1; then
echo "changes_consumer.sh: jq is required" >&2
exit 2
fi
}
read_cursor() {
if [[ -f "${CURSOR_FILE}" ]]; then
cat "${CURSOR_FILE}"
fi
}
write_cursor() {
local next="$1"
printf '%s' "${next}" > "${CURSOR_FILE}"
}
clear_cursor() {
rm -f "${CURSOR_FILE}"
}
handle_event() {
local event_json="$1"
if [[ -n "${CHANGES_EVENT_HANDLER:-}" ]]; then
EVENT_JSON="${event_json}" bash -lc "${CHANGES_EVENT_HANDLER}"
return $?
fi
local kind ref
kind="$(printf '%s' "${event_json}" | jq -r '.kind // "unknown"')"
ref="$(printf '%s' "${event_json}" | jq -r '.edge_ref // .node_ref // "n/a"')"
echo "apply event kind=${kind} ref=${ref}"
}
poll_once() {
local cursor="$1"
local path="/v2/graph/changes?limit=${SYNC_LIMIT}&wait_ms=${SYNC_WAIT_MS}"
if [[ -n "${cursor}" ]]; then
path+="&since_cursor=${cursor}"
fi
if ! amduat_api_call GET "${path}"; then
if [[ "${AMDUAT_LAST_STATUS}" == "410" ]]; then
echo "changes cursor expired; rebootstrap required" >&2
clear_cursor
printf '\n'
return 10
fi
echo "changes request failed: HTTP ${AMDUAT_LAST_STATUS}" >&2
return 1
fi
local ok=1
local event_json
while IFS= read -r event_json; do
if ! handle_event "${event_json}"; then
ok=0
break
fi
done < <(printf '%s' "${AMDUAT_LAST_BODY}" | jq -c '.events[]?')
if [[ "${ok}" != "1" ]]; then
echo "event handler failed; cursor not advanced" >&2
return 2
fi
local next_cursor
next_cursor="$(printf '%s' "${AMDUAT_LAST_BODY}" | jq -r '.next_cursor // empty')"
if [[ -n "${next_cursor}" ]]; then
write_cursor "${next_cursor}"
fi
printf '%s\n' "${AMDUAT_LAST_BODY}"
return 0
}
main() {
local once=0
if [[ $# -gt 1 ]]; then
usage >&2
return 2
fi
if [[ $# -eq 1 ]]; then
case "$1" in
--once) once=1 ;;
-h|--help)
usage
return 0
;;
*)
usage >&2
return 2
;;
esac
fi
require_jq
app_init
local loop_sleep="${CHANGES_LOOP_SLEEP_SEC:-1}"
while true; do
local cursor
cursor="$(read_cursor)"
poll_once "${cursor}" || {
status=$?
if [[ "${status}" != "10" && "${once}" == "1" ]]; then
return "${status}"
fi
if [[ "${once}" == "1" ]]; then
return 0
fi
sleep "${loop_sleep}"
continue
}
if [[ "${once}" == "1" ]]; then
return 0
fi
sleep "${loop_sleep}"
done
}
main "$@"