From d3bb7a0783cba6bee4032d5f8ffb0e25e115f5d3 Mon Sep 17 00:00:00 2001 From: Carl Niklas Rydberg Date: Sat, 7 Feb 2026 21:20:16 +0100 Subject: [PATCH] Canonicalize v2 changes consumer flow and tests --- README.md | 13 ++- docs/v2-app-developer-guide.md | 43 ++++++++- scripts/changes_consumer.sh | 145 ++++++++++++++++++++++++++++++ scripts/sync_loop.sh | 38 +------- scripts/v2_app.sh | 9 +- src/app_v2.sh | 8 +- tests/changes_consumer_410.sh | 43 +++++++++ tests/changes_consumer_handler.sh | 67 ++++++++++++++ tests/integration_v2.sh | 4 + 9 files changed, 322 insertions(+), 48 deletions(-) create mode 100755 scripts/changes_consumer.sh create mode 100755 tests/changes_consumer_410.sh create mode 100755 tests/changes_consumer_handler.sh diff --git a/README.md b/README.md index 81c110f..e91e010 100644 --- a/README.md +++ b/README.md @@ -38,10 +38,10 @@ unhealthy, it automatically falls back to `fs` (configurable via ./scripts/ingest_example.sh ``` -5. Run sample changes sync loop: +5. Run sample changes consumer (recommended): ```sh -./scripts/sync_loop.sh +./scripts/v2_app.sh consume-changes ``` ## v2 Vertical Slice CLI @@ -52,6 +52,7 @@ Use the integrated v2 app flow wrapper: ./scripts/v2_app.sh startup-check ./scripts/v2_app.sh ingest '{"idempotency_key":"k1","mode":"continue_on_error","nodes":[{"name":"doc-1"}]}' ./scripts/v2_app.sh sync-once +./scripts/v2_app.sh consume-changes --once ./scripts/v2_app.sh retrieve 'doc-1' 'ms.within_domain' ./scripts/v2_app.sh tombstone '' ``` @@ -62,6 +63,13 @@ Run integration coverage (requires running `amduatd` + `jq`): ./tests/integration_v2.sh ``` +Run local cursor/handler semantics checks (no daemon required): + +```sh +./tests/changes_consumer_410.sh +./tests/changes_consumer_handler.sh +``` + Run a fast end-to-end smoke (startup + ingest + sync + retrieve + tombstone): ```sh @@ -72,4 +80,5 @@ Run a fast end-to-end smoke (startup + ingest + sync + retrieve + tombstone): - This scaffold assumes local Unix-socket access to `amduatd`. - Graph cursors are opaque and must be persisted exactly as returned. +- `./scripts/sync_loop.sh` and `sync-loop` CLI are deprecated compatibility aliases. - Keep `contracts/amduatd-api-contract.v2.json` in sync with upstream when you pull updates. diff --git a/docs/v2-app-developer-guide.md b/docs/v2-app-developer-guide.md index 40afb12..89dc9a6 100644 --- a/docs/v2-app-developer-guide.md +++ b/docs/v2-app-developer-guide.md @@ -150,6 +150,13 @@ For integration tests/examples: - `scripts/test_graph_queries.sh` - `scripts/test_graph_contract.sh` +- `scripts/changes_consumer.sh` (durable changes loop with event handler hook) +- `tests/changes_consumer_410.sh` (forced cursor-expiry path) +- `tests/changes_consumer_handler.sh` (cursor advances only on handler success) + +Canonical CLI entrypoint for ongoing sync: + +- `./scripts/v2_app.sh consume-changes` ## 10) Copy/Paste Integration Skeleton @@ -196,14 +203,42 @@ while true; do path="/v2/graph/changes?limit=200&wait_ms=15000" fi - resp="$(curl --unix-socket "${SOCK}" -sS "${BASE}${path}" -H "X-Amduat-Space: ${SPACE}")" || break + # Capture response body and status for explicit 410 handling. + raw="$(curl --unix-socket "${SOCK}" -sS -w '\n%{http_code}' "${BASE}${path}" -H "X-Amduat-Space: ${SPACE}")" || break + code="$(printf '%s\n' "${raw}" | tail -n1)" + resp="$(printf '%s\n' "${raw}" | sed '$d')" - # TODO: parse and process resp.events[] in your app. - next="$(printf '%s\n' "${resp}" | sed -n 's/.*"next_cursor":"\([^"]*\)".*/\1/p')" - [ -n "${next}" ] && cursor="${next}" + if [ "${code}" = "410" ]; then + echo "changes cursor expired; rebootstrap required" >&2 + cursor="" # Option: switch to a since_as_of bootstrap strategy here. + sleep 1 + continue + fi + [ "${code}" = "200" ] || { echo "changes failed: HTTP ${code}" >&2; sleep 1; continue; } + + # Process each event. Replace this with app-specific handler logic. + ok=1 + while IFS= read -r ev; do + kind="$(printf '%s' "${ev}" | jq -r '.kind // "unknown"')" + ref="$(printf '%s' "${ev}" | jq -r '.edge_ref // .node_ref // "n/a"')" + echo "apply event kind=${kind} ref=${ref}" + # handle_event "${ev}" || { ok=0; break; } + done < <(printf '%s' "${resp}" | jq -c '.events[]?') + + # Advance cursor only after all events in this batch are handled successfully. + if [ "${ok}" = "1" ]; then + next="$(printf '%s' "${resp}" | jq -r '.next_cursor // empty')" + [ -n "${next}" ] && cursor="${next}" + fi done ``` +Failure semantics for the loop above: + +- Keep `cursor` in durable storage (file/db) and load it at process startup. +- Update stored cursor only after successful event processing for that response. +- On `410 Gone`, your stored cursor is outside replay retention; reset and rebootstrap with `since_as_of` or a full sync. + Agent retrieval call: ```sh diff --git a/scripts/changes_consumer.sh b/scripts/changes_consumer.sh new file mode 100755 index 0000000..2361d88 --- /dev/null +++ b/scripts/changes_consumer.sh @@ -0,0 +1,145 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +# shellcheck source=/dev/null +source "${ROOT_DIR}/src/app_v2.sh" + +usage() { + cat </dev/null 2>&1; then + echo "changes_consumer.sh: jq is required" >&2 + exit 2 + fi +} + +read_cursor() { + if [[ -f "${CURSOR_FILE}" ]]; then + cat "${CURSOR_FILE}" + fi +} + +write_cursor() { + local next="$1" + printf '%s' "${next}" > "${CURSOR_FILE}" +} + +clear_cursor() { + rm -f "${CURSOR_FILE}" +} + +handle_event() { + local event_json="$1" + if [[ -n "${CHANGES_EVENT_HANDLER:-}" ]]; then + EVENT_JSON="${event_json}" bash -lc "${CHANGES_EVENT_HANDLER}" + return $? + fi + + local kind ref + kind="$(printf '%s' "${event_json}" | jq -r '.kind // "unknown"')" + ref="$(printf '%s' "${event_json}" | jq -r '.edge_ref // .node_ref // "n/a"')" + echo "apply event kind=${kind} ref=${ref}" +} + +poll_once() { + local cursor="$1" + local path="/v2/graph/changes?limit=${SYNC_LIMIT}&wait_ms=${SYNC_WAIT_MS}" + if [[ -n "${cursor}" ]]; then + path+="&since_cursor=${cursor}" + fi + + if ! amduat_api_call GET "${path}"; then + if [[ "${AMDUAT_LAST_STATUS}" == "410" ]]; then + echo "changes cursor expired; rebootstrap required" >&2 + clear_cursor + printf '\n' + return 10 + fi + echo "changes request failed: HTTP ${AMDUAT_LAST_STATUS}" >&2 + return 1 + fi + + local ok=1 + local event_json + while IFS= read -r event_json; do + if ! handle_event "${event_json}"; then + ok=0 + break + fi + done < <(printf '%s' "${AMDUAT_LAST_BODY}" | jq -c '.events[]?') + + if [[ "${ok}" != "1" ]]; then + echo "event handler failed; cursor not advanced" >&2 + return 2 + fi + + local next_cursor + next_cursor="$(printf '%s' "${AMDUAT_LAST_BODY}" | jq -r '.next_cursor // empty')" + if [[ -n "${next_cursor}" ]]; then + write_cursor "${next_cursor}" + fi + + printf '%s\n' "${AMDUAT_LAST_BODY}" + return 0 +} + +main() { + local once=0 + if [[ $# -gt 1 ]]; then + usage >&2 + return 2 + fi + if [[ $# -eq 1 ]]; then + case "$1" in + --once) once=1 ;; + -h|--help) + usage + return 0 + ;; + *) + usage >&2 + return 2 + ;; + esac + fi + + require_jq + app_init + + local loop_sleep="${CHANGES_LOOP_SLEEP_SEC:-1}" + while true; do + local cursor + cursor="$(read_cursor)" + + poll_once "${cursor}" || { + status=$? + if [[ "${status}" != "10" && "${once}" == "1" ]]; then + return "${status}" + fi + if [[ "${once}" == "1" ]]; then + return 0 + fi + sleep "${loop_sleep}" + continue + } + + if [[ "${once}" == "1" ]]; then + return 0 + fi + sleep "${loop_sleep}" + done +} + +main "$@" diff --git a/scripts/sync_loop.sh b/scripts/sync_loop.sh index dc6cb62..63e1737 100755 --- a/scripts/sync_loop.sh +++ b/scripts/sync_loop.sh @@ -2,39 +2,5 @@ set -euo pipefail ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" -ENV_FILE="${ROOT_DIR}/config/env.local" -if [[ ! -f "${ENV_FILE}" ]]; then - ENV_FILE="${ROOT_DIR}/config/env.example" -fi -# shellcheck source=/dev/null -source "${ENV_FILE}" -# shellcheck source=/dev/null -source "${ROOT_DIR}/src/client.sh" - -limit="${SYNC_LIMIT:-200}" -wait_ms="${SYNC_WAIT_MS:-15000}" -cursor_file="${ROOT_DIR}/.cursor" -cursor="" - -if [[ -f "${cursor_file}" ]]; then - cursor="$(cat "${cursor_file}")" -fi - -while true; do - if [[ -n "${cursor}" ]]; then - path="/v2/graph/changes?since_cursor=${cursor}&limit=${limit}&wait_ms=${wait_ms}" - else - path="/v2/graph/changes?limit=${limit}&wait_ms=${wait_ms}" - fi - - resp="$(api_get "${path}")" - echo "${resp}" - - next="$(printf '%s\n' "${resp}" | sed -n 's/.*"next_cursor":"\([^"]*\)".*/\1/p')" - if [[ -n "${next}" ]]; then - cursor="${next}" - printf '%s' "${cursor}" > "${cursor_file}" - fi - - sleep 1 - done +echo "sync_loop.sh is deprecated; forwarding to scripts/changes_consumer.sh" >&2 +exec "${ROOT_DIR}/scripts/changes_consumer.sh" "$@" diff --git a/scripts/v2_app.sh b/scripts/v2_app.sh index 28cf138..a3f2bc9 100755 --- a/scripts/v2_app.sh +++ b/scripts/v2_app.sh @@ -13,7 +13,8 @@ commands: startup-check ingest PAYLOAD_JSON sync-once - sync-loop + consume-changes [--once] + sync-loop (deprecated alias for consume-changes) retrieve ROOTS_CSV [GOAL_PREDICATES_CSV] tombstone EDGE_REF USAGE @@ -43,8 +44,12 @@ case "${cmd}" in sync-once) app_sync_once ;; + consume-changes) + "${ROOT_DIR}/scripts/changes_consumer.sh" "$@" + ;; sync-loop) - app_sync_loop + echo "sync-loop is deprecated; use consume-changes" >&2 + "${ROOT_DIR}/scripts/changes_consumer.sh" "$@" ;; retrieve) if [[ $# -lt 1 || $# -gt 2 ]]; then diff --git a/src/app_v2.sh b/src/app_v2.sh index 165243a..e35fc41 100755 --- a/src/app_v2.sh +++ b/src/app_v2.sh @@ -53,10 +53,10 @@ app_sync_once() { } app_sync_loop() { - while true; do - app_sync_once - sleep 1 - done + local root_dir + root_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" + echo "app_sync_loop is deprecated; use scripts/changes_consumer.sh" >&2 + "${root_dir}/scripts/changes_consumer.sh" } app_retrieve_with_fallback() { diff --git a/tests/changes_consumer_410.sh b/tests/changes_consumer_410.sh new file mode 100755 index 0000000..a7b71e0 --- /dev/null +++ b/tests/changes_consumer_410.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" + +fail() { + echo "changes_consumer_410.sh: FAIL: $1" >&2 + exit 1 +} + +tmp_dir="$(mktemp -d /tmp/changes-consumer-410.XXXXXX)" +cleanup() { + rm -rf "${tmp_dir}" +} +trap cleanup EXIT + +cursor_file="${tmp_dir}/cursor" +printf '%s' "g1_expired_cursor" > "${cursor_file}" + +mkdir -p "${tmp_dir}/bin" +cat > "${tmp_dir}/bin/curl" <<'MOCK' +#!/usr/bin/env bash +set -euo pipefail +printf '%s\n%s' '{"error":"cursor expired"}' '410' +MOCK +chmod +x "${tmp_dir}/bin/curl" + +out_file="${tmp_dir}/out.log" +err_file="${tmp_dir}/err.log" + +PATH="${tmp_dir}/bin:${PATH}" \ +CURSOR_FILE="${cursor_file}" \ +SYNC_LIMIT=10 \ +SYNC_WAIT_MS=1 \ +SOCK="${tmp_dir}/fake.sock" \ +BASE="http://localhost" \ +SPACE="app1" \ +"${ROOT_DIR}/scripts/changes_consumer.sh" --once >"${out_file}" 2>"${err_file}" || fail "consumer returned non-zero for forced 410 path" + +[[ ! -f "${cursor_file}" ]] || fail "cursor file should be cleared on 410" +grep -q "cursor expired" "${err_file}" || fail "expected expired cursor message in stderr" + +echo "changes_consumer_410.sh: PASS" diff --git a/tests/changes_consumer_handler.sh b/tests/changes_consumer_handler.sh new file mode 100755 index 0000000..b5399b1 --- /dev/null +++ b/tests/changes_consumer_handler.sh @@ -0,0 +1,67 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" + +fail() { + echo "changes_consumer_handler.sh: FAIL: $1" >&2 + exit 1 +} + +tmp_dir="$(mktemp -d /tmp/changes-consumer-handler.XXXXXX)" +cleanup() { + rm -rf "${tmp_dir}" +} +trap cleanup EXIT + +mkdir -p "${tmp_dir}/bin" +cat > "${tmp_dir}/bin/curl" <<'MOCK' +#!/usr/bin/env bash +set -euo pipefail +printf '%s\n%s' '{"events":[{"kind":"edge_upsert","edge_ref":"e1"}],"next_cursor":"g1_next"}' '200' +MOCK +chmod +x "${tmp_dir}/bin/curl" + +out_file="${tmp_dir}/out.log" +err_file="${tmp_dir}/err.log" + +# Case 1: handler succeeds -> cursor advances. +cursor_success="${tmp_dir}/cursor.success" +printf '%s' "g1_start" > "${cursor_success}" + +PATH="${tmp_dir}/bin:${PATH}" \ +CURSOR_FILE="${cursor_success}" \ +SYNC_LIMIT=10 \ +SYNC_WAIT_MS=1 \ +SOCK="${tmp_dir}/fake.sock" \ +BASE="http://localhost" \ +SPACE="app1" \ +CHANGES_EVENT_HANDLER='printf "%s\n" "${EVENT_JSON}" >/dev/null' \ +"${ROOT_DIR}/scripts/changes_consumer.sh" --once >"${out_file}" 2>"${err_file}" || fail "consumer failed on handler success case" + +[[ -f "${cursor_success}" ]] || fail "cursor file missing after handler success case" +[[ "$(cat "${cursor_success}")" == "g1_next" ]] || fail "cursor did not advance after handler success" + +# Case 2: handler fails -> cursor does not advance. +cursor_fail="${tmp_dir}/cursor.fail" +printf '%s' "g1_start" > "${cursor_fail}" + +set +e +PATH="${tmp_dir}/bin:${PATH}" \ +CURSOR_FILE="${cursor_fail}" \ +SYNC_LIMIT=10 \ +SYNC_WAIT_MS=1 \ +SOCK="${tmp_dir}/fake.sock" \ +BASE="http://localhost" \ +SPACE="app1" \ +CHANGES_EVENT_HANDLER='exit 1' \ +"${ROOT_DIR}/scripts/changes_consumer.sh" --once >"${out_file}" 2>"${err_file}" +rc=$? +set -e + +[[ "${rc}" -ne 0 ]] || fail "expected non-zero exit when handler fails" +[[ -f "${cursor_fail}" ]] || fail "cursor file missing after handler failure case" +[[ "$(cat "${cursor_fail}")" == "g1_start" ]] || fail "cursor advanced despite handler failure" +grep -q "cursor not advanced" "${err_file}" || fail "missing handler-failure cursor warning" + +echo "changes_consumer_handler.sh: PASS" diff --git a/tests/integration_v2.sh b/tests/integration_v2.sh index c14902c..c262f12 100755 --- a/tests/integration_v2.sh +++ b/tests/integration_v2.sh @@ -21,6 +21,10 @@ assert_contains() { fi } +# Include deterministic changes-consumer cursor semantics checks +# in the regular integration entrypoint. +"${ROOT_DIR}/tests/changes_consumer_handler.sh" + app_init require_jq if [[ ! -S "${SOCK}" ]]; then