146 lines
2.9 KiB
Bash
Executable file
146 lines
2.9 KiB
Bash
Executable file
#!/usr/bin/env bash
|
|
set -euo pipefail
|
|
|
|
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
|
|
# shellcheck source=/dev/null
|
|
source "${ROOT_DIR}/src/app_v2.sh"
|
|
|
|
usage() {
|
|
cat <<USAGE
|
|
usage: $0 [--once]
|
|
|
|
options:
|
|
--once Process a single /v2/graph/changes poll and exit.
|
|
|
|
env:
|
|
CHANGES_EVENT_HANDLER Optional command string; executed per event with EVENT_JSON set.
|
|
CHANGES_LOOP_SLEEP_SEC Sleep between polls (default: 1).
|
|
USAGE
|
|
}
|
|
|
|
require_jq() {
|
|
if ! command -v jq >/dev/null 2>&1; then
|
|
echo "changes_consumer.sh: jq is required" >&2
|
|
exit 2
|
|
fi
|
|
}
|
|
|
|
read_cursor() {
|
|
if [[ -f "${CURSOR_FILE}" ]]; then
|
|
cat "${CURSOR_FILE}"
|
|
fi
|
|
}
|
|
|
|
write_cursor() {
|
|
local next="$1"
|
|
printf '%s' "${next}" > "${CURSOR_FILE}"
|
|
}
|
|
|
|
clear_cursor() {
|
|
rm -f "${CURSOR_FILE}"
|
|
}
|
|
|
|
handle_event() {
|
|
local event_json="$1"
|
|
if [[ -n "${CHANGES_EVENT_HANDLER:-}" ]]; then
|
|
EVENT_JSON="${event_json}" bash -lc "${CHANGES_EVENT_HANDLER}"
|
|
return $?
|
|
fi
|
|
|
|
local kind ref
|
|
kind="$(printf '%s' "${event_json}" | jq -r '.kind // "unknown"')"
|
|
ref="$(printf '%s' "${event_json}" | jq -r '.edge_ref // .node_ref // "n/a"')"
|
|
echo "apply event kind=${kind} ref=${ref}"
|
|
}
|
|
|
|
poll_once() {
|
|
local cursor="$1"
|
|
local path="/v2/graph/changes?limit=${SYNC_LIMIT}&wait_ms=${SYNC_WAIT_MS}"
|
|
if [[ -n "${cursor}" ]]; then
|
|
path+="&since_cursor=${cursor}"
|
|
fi
|
|
|
|
if ! amduat_api_call GET "${path}"; then
|
|
if [[ "${AMDUAT_LAST_STATUS}" == "410" ]]; then
|
|
echo "changes cursor expired; rebootstrap required" >&2
|
|
clear_cursor
|
|
printf '\n'
|
|
return 10
|
|
fi
|
|
echo "changes request failed: HTTP ${AMDUAT_LAST_STATUS}" >&2
|
|
return 1
|
|
fi
|
|
|
|
local ok=1
|
|
local event_json
|
|
while IFS= read -r event_json; do
|
|
if ! handle_event "${event_json}"; then
|
|
ok=0
|
|
break
|
|
fi
|
|
done < <(printf '%s' "${AMDUAT_LAST_BODY}" | jq -c '.events[]?')
|
|
|
|
if [[ "${ok}" != "1" ]]; then
|
|
echo "event handler failed; cursor not advanced" >&2
|
|
return 2
|
|
fi
|
|
|
|
local next_cursor
|
|
next_cursor="$(printf '%s' "${AMDUAT_LAST_BODY}" | jq -r '.next_cursor // empty')"
|
|
if [[ -n "${next_cursor}" ]]; then
|
|
write_cursor "${next_cursor}"
|
|
fi
|
|
|
|
printf '%s\n' "${AMDUAT_LAST_BODY}"
|
|
return 0
|
|
}
|
|
|
|
main() {
|
|
local once=0
|
|
if [[ $# -gt 1 ]]; then
|
|
usage >&2
|
|
return 2
|
|
fi
|
|
if [[ $# -eq 1 ]]; then
|
|
case "$1" in
|
|
--once) once=1 ;;
|
|
-h|--help)
|
|
usage
|
|
return 0
|
|
;;
|
|
*)
|
|
usage >&2
|
|
return 2
|
|
;;
|
|
esac
|
|
fi
|
|
|
|
require_jq
|
|
app_init
|
|
|
|
local loop_sleep="${CHANGES_LOOP_SLEEP_SEC:-1}"
|
|
while true; do
|
|
local cursor
|
|
cursor="$(read_cursor)"
|
|
|
|
poll_once "${cursor}" || {
|
|
status=$?
|
|
if [[ "${status}" != "10" && "${once}" == "1" ]]; then
|
|
return "${status}"
|
|
fi
|
|
if [[ "${once}" == "1" ]]; then
|
|
return 0
|
|
fi
|
|
sleep "${loop_sleep}"
|
|
continue
|
|
}
|
|
|
|
if [[ "${once}" == "1" ]]; then
|
|
return 0
|
|
fi
|
|
sleep "${loop_sleep}"
|
|
done
|
|
}
|
|
|
|
main "$@"
|