Canonicalize v2 changes consumer flow and tests

This commit is contained in:
Carl Niklas Rydberg 2026-02-07 21:20:16 +01:00
parent 281f775ebb
commit d3bb7a0783
9 changed files with 322 additions and 48 deletions

View file

@ -38,10 +38,10 @@ unhealthy, it automatically falls back to `fs` (configurable via
./scripts/ingest_example.sh
```
5. Run sample changes sync loop:
5. Run sample changes consumer (recommended):
```sh
./scripts/sync_loop.sh
./scripts/v2_app.sh consume-changes
```
## v2 Vertical Slice CLI
@ -52,6 +52,7 @@ Use the integrated v2 app flow wrapper:
./scripts/v2_app.sh startup-check
./scripts/v2_app.sh ingest '{"idempotency_key":"k1","mode":"continue_on_error","nodes":[{"name":"doc-1"}]}'
./scripts/v2_app.sh sync-once
./scripts/v2_app.sh consume-changes --once
./scripts/v2_app.sh retrieve 'doc-1' 'ms.within_domain'
./scripts/v2_app.sh tombstone '<edge_ref>'
```
@ -62,6 +63,13 @@ Run integration coverage (requires running `amduatd` + `jq`):
./tests/integration_v2.sh
```
Run local cursor/handler semantics checks (no daemon required):
```sh
./tests/changes_consumer_410.sh
./tests/changes_consumer_handler.sh
```
Run a fast end-to-end smoke (startup + ingest + sync + retrieve + tombstone):
```sh
@ -72,4 +80,5 @@ Run a fast end-to-end smoke (startup + ingest + sync + retrieve + tombstone):
- This scaffold assumes local Unix-socket access to `amduatd`.
- Graph cursors are opaque and must be persisted exactly as returned.
- `./scripts/sync_loop.sh` and `sync-loop` CLI are deprecated compatibility aliases.
- Keep `contracts/amduatd-api-contract.v2.json` in sync with upstream when you pull updates.

View file

@ -150,6 +150,13 @@ For integration tests/examples:
- `scripts/test_graph_queries.sh`
- `scripts/test_graph_contract.sh`
- `scripts/changes_consumer.sh` (durable changes loop with event handler hook)
- `tests/changes_consumer_410.sh` (forced cursor-expiry path)
- `tests/changes_consumer_handler.sh` (cursor advances only on handler success)
Canonical CLI entrypoint for ongoing sync:
- `./scripts/v2_app.sh consume-changes`
## 10) Copy/Paste Integration Skeleton
@ -196,14 +203,42 @@ while true; do
path="/v2/graph/changes?limit=200&wait_ms=15000"
fi
resp="$(curl --unix-socket "${SOCK}" -sS "${BASE}${path}" -H "X-Amduat-Space: ${SPACE}")" || break
# Capture response body and status for explicit 410 handling.
raw="$(curl --unix-socket "${SOCK}" -sS -w '\n%{http_code}' "${BASE}${path}" -H "X-Amduat-Space: ${SPACE}")" || break
code="$(printf '%s\n' "${raw}" | tail -n1)"
resp="$(printf '%s\n' "${raw}" | sed '$d')"
# TODO: parse and process resp.events[] in your app.
next="$(printf '%s\n' "${resp}" | sed -n 's/.*"next_cursor":"\([^"]*\)".*/\1/p')"
[ -n "${next}" ] && cursor="${next}"
if [ "${code}" = "410" ]; then
echo "changes cursor expired; rebootstrap required" >&2
cursor="" # Option: switch to a since_as_of bootstrap strategy here.
sleep 1
continue
fi
[ "${code}" = "200" ] || { echo "changes failed: HTTP ${code}" >&2; sleep 1; continue; }
# Process each event. Replace this with app-specific handler logic.
ok=1
while IFS= read -r ev; do
kind="$(printf '%s' "${ev}" | jq -r '.kind // "unknown"')"
ref="$(printf '%s' "${ev}" | jq -r '.edge_ref // .node_ref // "n/a"')"
echo "apply event kind=${kind} ref=${ref}"
# handle_event "${ev}" || { ok=0; break; }
done < <(printf '%s' "${resp}" | jq -c '.events[]?')
# Advance cursor only after all events in this batch are handled successfully.
if [ "${ok}" = "1" ]; then
next="$(printf '%s' "${resp}" | jq -r '.next_cursor // empty')"
[ -n "${next}" ] && cursor="${next}"
fi
done
```
Failure semantics for the loop above:
- Keep `cursor` in durable storage (file/db) and load it at process startup.
- Update stored cursor only after successful event processing for that response.
- On `410 Gone`, your stored cursor is outside replay retention; reset and rebootstrap with `since_as_of` or a full sync.
Agent retrieval call:
```sh

145
scripts/changes_consumer.sh Executable file
View file

@ -0,0 +1,145 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
# shellcheck source=/dev/null
source "${ROOT_DIR}/src/app_v2.sh"
usage() {
cat <<USAGE
usage: $0 [--once]
options:
--once Process a single /v2/graph/changes poll and exit.
env:
CHANGES_EVENT_HANDLER Optional command string; executed per event with EVENT_JSON set.
CHANGES_LOOP_SLEEP_SEC Sleep between polls (default: 1).
USAGE
}
require_jq() {
if ! command -v jq >/dev/null 2>&1; then
echo "changes_consumer.sh: jq is required" >&2
exit 2
fi
}
read_cursor() {
if [[ -f "${CURSOR_FILE}" ]]; then
cat "${CURSOR_FILE}"
fi
}
write_cursor() {
local next="$1"
printf '%s' "${next}" > "${CURSOR_FILE}"
}
clear_cursor() {
rm -f "${CURSOR_FILE}"
}
handle_event() {
local event_json="$1"
if [[ -n "${CHANGES_EVENT_HANDLER:-}" ]]; then
EVENT_JSON="${event_json}" bash -lc "${CHANGES_EVENT_HANDLER}"
return $?
fi
local kind ref
kind="$(printf '%s' "${event_json}" | jq -r '.kind // "unknown"')"
ref="$(printf '%s' "${event_json}" | jq -r '.edge_ref // .node_ref // "n/a"')"
echo "apply event kind=${kind} ref=${ref}"
}
poll_once() {
local cursor="$1"
local path="/v2/graph/changes?limit=${SYNC_LIMIT}&wait_ms=${SYNC_WAIT_MS}"
if [[ -n "${cursor}" ]]; then
path+="&since_cursor=${cursor}"
fi
if ! amduat_api_call GET "${path}"; then
if [[ "${AMDUAT_LAST_STATUS}" == "410" ]]; then
echo "changes cursor expired; rebootstrap required" >&2
clear_cursor
printf '\n'
return 10
fi
echo "changes request failed: HTTP ${AMDUAT_LAST_STATUS}" >&2
return 1
fi
local ok=1
local event_json
while IFS= read -r event_json; do
if ! handle_event "${event_json}"; then
ok=0
break
fi
done < <(printf '%s' "${AMDUAT_LAST_BODY}" | jq -c '.events[]?')
if [[ "${ok}" != "1" ]]; then
echo "event handler failed; cursor not advanced" >&2
return 2
fi
local next_cursor
next_cursor="$(printf '%s' "${AMDUAT_LAST_BODY}" | jq -r '.next_cursor // empty')"
if [[ -n "${next_cursor}" ]]; then
write_cursor "${next_cursor}"
fi
printf '%s\n' "${AMDUAT_LAST_BODY}"
return 0
}
main() {
local once=0
if [[ $# -gt 1 ]]; then
usage >&2
return 2
fi
if [[ $# -eq 1 ]]; then
case "$1" in
--once) once=1 ;;
-h|--help)
usage
return 0
;;
*)
usage >&2
return 2
;;
esac
fi
require_jq
app_init
local loop_sleep="${CHANGES_LOOP_SLEEP_SEC:-1}"
while true; do
local cursor
cursor="$(read_cursor)"
poll_once "${cursor}" || {
status=$?
if [[ "${status}" != "10" && "${once}" == "1" ]]; then
return "${status}"
fi
if [[ "${once}" == "1" ]]; then
return 0
fi
sleep "${loop_sleep}"
continue
}
if [[ "${once}" == "1" ]]; then
return 0
fi
sleep "${loop_sleep}"
done
}
main "$@"

View file

@ -2,39 +2,5 @@
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
ENV_FILE="${ROOT_DIR}/config/env.local"
if [[ ! -f "${ENV_FILE}" ]]; then
ENV_FILE="${ROOT_DIR}/config/env.example"
fi
# shellcheck source=/dev/null
source "${ENV_FILE}"
# shellcheck source=/dev/null
source "${ROOT_DIR}/src/client.sh"
limit="${SYNC_LIMIT:-200}"
wait_ms="${SYNC_WAIT_MS:-15000}"
cursor_file="${ROOT_DIR}/.cursor"
cursor=""
if [[ -f "${cursor_file}" ]]; then
cursor="$(cat "${cursor_file}")"
fi
while true; do
if [[ -n "${cursor}" ]]; then
path="/v2/graph/changes?since_cursor=${cursor}&limit=${limit}&wait_ms=${wait_ms}"
else
path="/v2/graph/changes?limit=${limit}&wait_ms=${wait_ms}"
fi
resp="$(api_get "${path}")"
echo "${resp}"
next="$(printf '%s\n' "${resp}" | sed -n 's/.*"next_cursor":"\([^"]*\)".*/\1/p')"
if [[ -n "${next}" ]]; then
cursor="${next}"
printf '%s' "${cursor}" > "${cursor_file}"
fi
sleep 1
done
echo "sync_loop.sh is deprecated; forwarding to scripts/changes_consumer.sh" >&2
exec "${ROOT_DIR}/scripts/changes_consumer.sh" "$@"

View file

@ -13,7 +13,8 @@ commands:
startup-check
ingest PAYLOAD_JSON
sync-once
sync-loop
consume-changes [--once]
sync-loop (deprecated alias for consume-changes)
retrieve ROOTS_CSV [GOAL_PREDICATES_CSV]
tombstone EDGE_REF
USAGE
@ -43,8 +44,12 @@ case "${cmd}" in
sync-once)
app_sync_once
;;
consume-changes)
"${ROOT_DIR}/scripts/changes_consumer.sh" "$@"
;;
sync-loop)
app_sync_loop
echo "sync-loop is deprecated; use consume-changes" >&2
"${ROOT_DIR}/scripts/changes_consumer.sh" "$@"
;;
retrieve)
if [[ $# -lt 1 || $# -gt 2 ]]; then

View file

@ -53,10 +53,10 @@ app_sync_once() {
}
app_sync_loop() {
while true; do
app_sync_once
sleep 1
done
local root_dir
root_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
echo "app_sync_loop is deprecated; use scripts/changes_consumer.sh" >&2
"${root_dir}/scripts/changes_consumer.sh"
}
app_retrieve_with_fallback() {

43
tests/changes_consumer_410.sh Executable file
View file

@ -0,0 +1,43 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
fail() {
echo "changes_consumer_410.sh: FAIL: $1" >&2
exit 1
}
tmp_dir="$(mktemp -d /tmp/changes-consumer-410.XXXXXX)"
cleanup() {
rm -rf "${tmp_dir}"
}
trap cleanup EXIT
cursor_file="${tmp_dir}/cursor"
printf '%s' "g1_expired_cursor" > "${cursor_file}"
mkdir -p "${tmp_dir}/bin"
cat > "${tmp_dir}/bin/curl" <<'MOCK'
#!/usr/bin/env bash
set -euo pipefail
printf '%s\n%s' '{"error":"cursor expired"}' '410'
MOCK
chmod +x "${tmp_dir}/bin/curl"
out_file="${tmp_dir}/out.log"
err_file="${tmp_dir}/err.log"
PATH="${tmp_dir}/bin:${PATH}" \
CURSOR_FILE="${cursor_file}" \
SYNC_LIMIT=10 \
SYNC_WAIT_MS=1 \
SOCK="${tmp_dir}/fake.sock" \
BASE="http://localhost" \
SPACE="app1" \
"${ROOT_DIR}/scripts/changes_consumer.sh" --once >"${out_file}" 2>"${err_file}" || fail "consumer returned non-zero for forced 410 path"
[[ ! -f "${cursor_file}" ]] || fail "cursor file should be cleared on 410"
grep -q "cursor expired" "${err_file}" || fail "expected expired cursor message in stderr"
echo "changes_consumer_410.sh: PASS"

View file

@ -0,0 +1,67 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
fail() {
echo "changes_consumer_handler.sh: FAIL: $1" >&2
exit 1
}
tmp_dir="$(mktemp -d /tmp/changes-consumer-handler.XXXXXX)"
cleanup() {
rm -rf "${tmp_dir}"
}
trap cleanup EXIT
mkdir -p "${tmp_dir}/bin"
cat > "${tmp_dir}/bin/curl" <<'MOCK'
#!/usr/bin/env bash
set -euo pipefail
printf '%s\n%s' '{"events":[{"kind":"edge_upsert","edge_ref":"e1"}],"next_cursor":"g1_next"}' '200'
MOCK
chmod +x "${tmp_dir}/bin/curl"
out_file="${tmp_dir}/out.log"
err_file="${tmp_dir}/err.log"
# Case 1: handler succeeds -> cursor advances.
cursor_success="${tmp_dir}/cursor.success"
printf '%s' "g1_start" > "${cursor_success}"
PATH="${tmp_dir}/bin:${PATH}" \
CURSOR_FILE="${cursor_success}" \
SYNC_LIMIT=10 \
SYNC_WAIT_MS=1 \
SOCK="${tmp_dir}/fake.sock" \
BASE="http://localhost" \
SPACE="app1" \
CHANGES_EVENT_HANDLER='printf "%s\n" "${EVENT_JSON}" >/dev/null' \
"${ROOT_DIR}/scripts/changes_consumer.sh" --once >"${out_file}" 2>"${err_file}" || fail "consumer failed on handler success case"
[[ -f "${cursor_success}" ]] || fail "cursor file missing after handler success case"
[[ "$(cat "${cursor_success}")" == "g1_next" ]] || fail "cursor did not advance after handler success"
# Case 2: handler fails -> cursor does not advance.
cursor_fail="${tmp_dir}/cursor.fail"
printf '%s' "g1_start" > "${cursor_fail}"
set +e
PATH="${tmp_dir}/bin:${PATH}" \
CURSOR_FILE="${cursor_fail}" \
SYNC_LIMIT=10 \
SYNC_WAIT_MS=1 \
SOCK="${tmp_dir}/fake.sock" \
BASE="http://localhost" \
SPACE="app1" \
CHANGES_EVENT_HANDLER='exit 1' \
"${ROOT_DIR}/scripts/changes_consumer.sh" --once >"${out_file}" 2>"${err_file}"
rc=$?
set -e
[[ "${rc}" -ne 0 ]] || fail "expected non-zero exit when handler fails"
[[ -f "${cursor_fail}" ]] || fail "cursor file missing after handler failure case"
[[ "$(cat "${cursor_fail}")" == "g1_start" ]] || fail "cursor advanced despite handler failure"
grep -q "cursor not advanced" "${err_file}" || fail "missing handler-failure cursor warning"
echo "changes_consumer_handler.sh: PASS"

View file

@ -21,6 +21,10 @@ assert_contains() {
fi
}
# Include deterministic changes-consumer cursor semantics checks
# in the regular integration entrypoint.
"${ROOT_DIR}/tests/changes_consumer_handler.sh"
app_init
require_jq
if [[ ! -S "${SOCK}" ]]; then