Add federation cursors, pull APIs, and smoke test

This commit is contained in:
Carl Niklas Rydberg 2026-01-24 14:03:26 +01:00
parent f3a065c8ab
commit fd43cfaf59
16 changed files with 4985 additions and 53 deletions

View file

@ -27,7 +27,9 @@ target_link_libraries(amduat_federation
set(amduatd_sources src/amduatd.c src/amduatd_http.c src/amduatd_caps.c
src/amduatd_space.c src/amduatd_concepts.c
src/amduatd_store.c src/amduatd_derivation_index.c
src/amduatd_fed.c src/amduatd_space_doctor.c)
src/amduatd_fed.c src/amduatd_fed_cursor.c
src/amduatd_fed_pull_plan.c src/amduatd_fed_pull_apply.c
src/amduatd_space_doctor.c)
if(AMDUATD_ENABLE_UI)
list(APPEND amduatd_sources src/amduatd_ui.c)
endif()
@ -137,6 +139,82 @@ target_link_libraries(amduatd_test_fed_cfg
add_test(NAME amduatd_fed_cfg COMMAND amduatd_test_fed_cfg)
add_executable(amduatd_test_fed_cursor
tests/test_amduatd_fed_cursor.c
src/amduatd_fed_cursor.c
src/amduatd_fed.c
src/amduatd_space.c
src/amduatd_store.c
)
target_include_directories(amduatd_test_fed_cursor
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/vendor/amduat/include
)
target_link_libraries(amduatd_test_fed_cursor
PRIVATE amduat_asl_store_fs amduat_asl_store_index_fs amduat_asl_record
amduat_asl amduat_enc amduat_asl_pointer_fs amduat_util
amduat_hash_asl1
)
add_test(NAME amduatd_fed_cursor COMMAND amduatd_test_fed_cursor)
add_executable(amduatd_test_fed_pull_plan
tests/test_amduatd_fed_pull_plan.c
src/amduatd_fed_pull_plan.c
src/amduatd_fed_cursor.c
src/amduatd_fed.c
src/amduatd_space.c
)
target_include_directories(amduatd_test_fed_pull_plan
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/vendor/amduat/include
)
target_link_libraries(amduatd_test_fed_pull_plan
PRIVATE amduat_asl amduat_asl_record amduat_enc amduat_util amduat_hash_asl1
amduat_asl_pointer_fs
)
add_test(NAME amduatd_fed_pull_plan COMMAND amduatd_test_fed_pull_plan)
add_executable(amduatd_test_fed_pull_apply
tests/test_amduatd_fed_pull_apply.c
src/amduatd_fed_pull_apply.c
src/amduatd_fed_pull_plan.c
src/amduatd_fed_cursor.c
src/amduatd_fed.c
src/amduatd_space.c
src/amduatd_store.c
)
target_include_directories(amduatd_test_fed_pull_apply
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/vendor/amduat/include
)
target_link_libraries(amduatd_test_fed_pull_apply
PRIVATE amduat_asl_store_fs amduat_asl_store_index_fs amduat_asl_record
amduat_asl amduat_enc amduat_asl_pointer_fs amduat_util
amduat_hash_asl1 amduat_fed
)
add_test(NAME amduatd_fed_pull_apply COMMAND amduatd_test_fed_pull_apply)
add_executable(amduatd_http_unix
tests/tools/amduatd_http_unix.c
)
add_test(NAME amduatd_fed_smoke
COMMAND bash ${CMAKE_CURRENT_SOURCE_DIR}/scripts/test_fed_smoke.sh
)
set_tests_properties(amduatd_fed_smoke PROPERTIES SKIP_RETURN_CODE 77)
add_executable(amduatd_test_space_doctor
tests/test_amduatd_space_doctor.c
src/amduatd_space_doctor.c

View file

@ -71,6 +71,64 @@ Flags:
endpoints. If `--space` is configured, unix transport requests will include the
same `X-Amduat-Space` header when contacting peers.
### Federation cursors
Federation cursors track deterministic, auditable sync checkpoints per
`(space, peer)`. Cursor heads are stored as ASL pointers that reference CAS
records (`fed/cursor` schema). A peer key should be a stable identifier for the
remote (for example a federation registry domain id rendered as a decimal
string).
Read the current cursor for a peer:
```sh
curl --unix-socket amduatd.sock \
'http://localhost/v1/fed/cursor?peer=domain-2' \
-H 'X-Amduat-Space: demo'
```
Write a cursor update (CAS-safe; include `expected_ref` to enforce; omitting it
only succeeds when the cursor is absent):
```sh
curl --unix-socket amduatd.sock -X POST \
'http://localhost/v1/fed/cursor?peer=domain-2' \
-H 'Content-Type: application/json' \
-H 'X-Amduat-Space: demo' \
-d '{"last_logseq":123,"last_record_hash":"<ref>","expected_ref":"<ref>"}'
```
Cursor values are intended to drive incremental log/index scanning when that
infrastructure is available; the cursor endpoints themselves do not require the
index backend.
### Federation pull plan
You can ask the daemon to compute a read-only plan of which remote records would
be pulled from a peer given the current cursor:
```sh
curl --unix-socket amduatd.sock \
'http://localhost/v1/fed/pull/plan?peer=2&limit=128' \
-H 'X-Amduat-Space: demo'
```
The plan does not write artifacts, records, or cursors. It is deterministic and
returns only identifiers (logseq/ref), plus the next cursor candidate if the
plan were applied successfully.
Apply a bounded batch of remote records (advances the cursor only after
success):
```sh
curl --unix-socket amduatd.sock -X POST \
'http://localhost/v1/fed/pull?peer=2&limit=128' \
-H 'X-Amduat-Space: demo'
```
`/v1/fed/pull` requires the index backend and will not advance the cursor on
partial failure.
Run the daemon with derivation indexing enabled:
```sh
@ -89,6 +147,18 @@ Dev loop (build + restart):
./scripts/dev-restart.sh
```
## Federation smoke test
Run the end-to-end federation smoke test (starts two local daemons, pulls a
record, and verifies the artifact was replicated):
```sh
./scripts/test_fed_smoke.sh
```
The test requires the index backend and either `curl` with `--unix-socket`
support or the built-in `build/amduatd_http_unix` helper.
Query store meta:
```sh
@ -229,6 +299,10 @@ When the daemon uses the `fs` store backend, index-only checks are reported as
- `GET /v1/space/doctor` → deterministic space health checks
- `GET /v1/ui` → browser UI for authoring/running programs
- `GET /v1/fed/records?domain_id=...&from_logseq=...&limit=...``{domain_id, snapshot_id, log_prefix, next_logseq, records[]}` (published artifacts + tombstones + PER + TGK edges)
- `GET /v1/fed/cursor?peer=...``{peer_key, space_id, last_logseq, last_record_hash, ref}`
- `POST /v1/fed/cursor?peer=...``{ref}` (CAS update; `expected_ref` in body)
- `GET /v1/fed/pull/plan?peer=...&limit=...``{peer, effective_space, cursor, remote_scan, records, next_cursor_candidate, ...}`
- `POST /v1/fed/pull?peer=...&limit=...``{peer, effective_space, cursor_before, plan_summary, applied, cursor_after, errors}`
- `GET /v1/fed/artifacts/{ref}` → raw bytes for federation resolve
- `GET /v1/fed/status``{status, domain_id, registry_ref, last_tick_ms}`
- `POST /v1/artifacts`

View file

@ -901,3 +901,204 @@ amduat_fed_transport_t amduat_fed_transport_unix_ops(
ops.get_artifact = amduat_fed_transport_unix_get_artifact;
return ops;
}
bool amduat_fed_transport_unix_get_records_with_limit(
amduat_fed_transport_unix_t *transport,
uint32_t domain_id,
uint64_t from_logseq,
uint64_t limit,
int *out_status,
amduat_fed_record_t **out_records,
size_t *out_len,
char **out_body) {
char req[2048];
char space_header[AMDUAT_ASL_POINTER_NAME_MAX + 32u];
const char *space_line = "";
int fd;
uint8_t *buf = NULL;
size_t buf_len = 0;
const uint8_t *body = NULL;
size_t body_len = 0;
int status = 0;
if (out_status != NULL) {
*out_status = 0;
}
if (out_records != NULL) {
*out_records = NULL;
}
if (out_len != NULL) {
*out_len = 0;
}
if (out_body != NULL) {
*out_body = NULL;
}
if (transport == NULL || out_status == NULL || out_records == NULL ||
out_len == NULL) {
return false;
}
if (transport->has_space) {
snprintf(space_header,
sizeof(space_header),
"X-Amduat-Space: %s\r\n",
transport->space_id);
space_line = space_header;
}
snprintf(req, sizeof(req),
"GET /v1/fed/records?domain_id=%u&from_logseq=%llu&limit=%llu HTTP/1.1\r\n"
"Host: localhost\r\n"
"%s"
"Connection: close\r\n"
"\r\n",
(unsigned int)domain_id,
(unsigned long long)from_logseq,
(unsigned long long)limit,
space_line);
fd = amduat_fed_transport_unix_connect(transport->socket_path);
if (fd < 0) {
return false;
}
if (!amduat_fed_transport_unix_send_all(fd, req, strlen(req))) {
close(fd);
return false;
}
if (!amduat_fed_transport_unix_read_all(fd, &buf, &buf_len)) {
close(fd);
return false;
}
close(fd);
if (!amduat_fed_transport_unix_split_response(buf,
buf_len,
&body,
&body_len,
&status)) {
free(buf);
return false;
}
*out_status = status;
if (status != 200) {
if (out_body != NULL && body != NULL) {
char *copy = (char *)malloc(body_len + 1u);
if (copy == NULL) {
free(buf);
return false;
}
if (body_len != 0u) {
memcpy(copy, body, body_len);
}
copy[body_len] = '\0';
*out_body = copy;
}
free(buf);
return true;
}
if (!amduat_fed_transport_parse_records((const char *)body,
body_len,
out_records,
out_len)) {
free(buf);
return false;
}
free(buf);
return true;
}
bool amduat_fed_transport_unix_get_artifact_with_status(
amduat_fed_transport_unix_t *transport,
amduat_reference_t ref,
int *out_status,
amduat_octets_t *out_bytes,
char **out_body) {
char *ref_hex = NULL;
char req[2048];
char space_header[AMDUAT_ASL_POINTER_NAME_MAX + 32u];
const char *space_line = "";
int fd;
uint8_t *buf = NULL;
size_t buf_len = 0;
const uint8_t *body = NULL;
size_t body_len = 0;
int status = 0;
if (out_status != NULL) {
*out_status = 0;
}
if (out_bytes != NULL) {
*out_bytes = amduat_octets(NULL, 0u);
}
if (out_body != NULL) {
*out_body = NULL;
}
if (transport == NULL || out_status == NULL || out_bytes == NULL) {
return false;
}
if (!amduat_asl_ref_encode_hex(ref, &ref_hex)) {
return false;
}
if (transport->has_space) {
snprintf(space_header,
sizeof(space_header),
"X-Amduat-Space: %s\r\n",
transport->space_id);
space_line = space_header;
}
snprintf(req, sizeof(req),
"GET /v1/fed/artifacts/%s HTTP/1.1\r\n"
"Host: localhost\r\n"
"%s"
"Connection: close\r\n"
"\r\n",
ref_hex,
space_line);
free(ref_hex);
fd = amduat_fed_transport_unix_connect(transport->socket_path);
if (fd < 0) {
return false;
}
if (!amduat_fed_transport_unix_send_all(fd, req, strlen(req))) {
close(fd);
return false;
}
if (!amduat_fed_transport_unix_read_all(fd, &buf, &buf_len)) {
close(fd);
return false;
}
close(fd);
if (!amduat_fed_transport_unix_split_response(buf,
buf_len,
&body,
&body_len,
&status)) {
free(buf);
return false;
}
*out_status = status;
if (status != 200) {
if (out_body != NULL && body != NULL) {
char *copy = (char *)malloc(body_len + 1u);
if (copy == NULL) {
free(buf);
return false;
}
if (body_len != 0u) {
memcpy(copy, body, body_len);
}
copy[body_len] = '\0';
*out_body = copy;
}
free(buf);
return true;
}
if (!amduat_octets_clone(amduat_octets(body, body_len), out_bytes)) {
free(buf);
return false;
}
free(buf);
return true;
}

View file

@ -25,6 +25,27 @@ bool amduat_fed_transport_unix_set_space(amduat_fed_transport_unix_t *transport,
amduat_fed_transport_t amduat_fed_transport_unix_ops(
amduat_fed_transport_unix_t *transport);
/* Returns true on successful HTTP exchange. Caller frees records via
* amduat_fed_transport_unix_ops(...).free_records and frees out_body with free.
*/
bool amduat_fed_transport_unix_get_records_with_limit(
amduat_fed_transport_unix_t *transport,
uint32_t domain_id,
uint64_t from_logseq,
uint64_t limit,
int *out_status,
amduat_fed_record_t **out_records,
size_t *out_len,
char **out_body);
/* Returns true on successful HTTP exchange. Caller frees out_body with free. */
bool amduat_fed_transport_unix_get_artifact_with_status(
amduat_fed_transport_unix_t *transport,
amduat_reference_t ref,
int *out_status,
amduat_octets_t *out_bytes,
char **out_body);
#ifdef __cplusplus
} /* extern "C" */
#endif

256
scripts/test_fed_smoke.sh Normal file
View file

@ -0,0 +1,256 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
HTTP_HELPER="${ROOT_DIR}/build/amduatd_http_unix"
USE_HTTP_HELPER=0
TMPDIR="${TMPDIR:-/tmp}"
mkdir -p "${TMPDIR}"
if ! command -v grep >/dev/null 2>&1; then
echo "skip: grep not found" >&2
exit 77
fi
if ! command -v awk >/dev/null 2>&1; then
echo "skip: awk not found" >&2
exit 77
fi
if command -v curl >/dev/null 2>&1; then
if curl --help 2>/dev/null | grep -q -- '--unix-socket'; then
USE_HTTP_HELPER=0
else
USE_HTTP_HELPER=1
fi
else
USE_HTTP_HELPER=1
fi
if [[ "${USE_HTTP_HELPER}" -eq 1 && ! -x "${HTTP_HELPER}" ]]; then
echo "skip: curl lacks --unix-socket support and helper missing" >&2
exit 77
fi
AMDUATD_BIN="${ROOT_DIR}/build/amduatd"
ASL_BIN="${ROOT_DIR}/vendor/amduat/build/amduat-asl"
if [[ ! -x "${AMDUATD_BIN}" || ! -x "${ASL_BIN}" ]]; then
echo "missing binaries; build amduatd and amduat-asl first" >&2
exit 1
fi
tmp_root="$(mktemp -d -p "${TMPDIR}" amduatd-fed-smoke-XXXXXX)"
root_a="${tmp_root}/a"
root_b="${tmp_root}/b"
sock_a="${tmp_root}/amduatd-a.sock"
sock_b="${tmp_root}/amduatd-b.sock"
space_id="smoke"
log_a="${tmp_root}/amduatd-a.log"
log_b="${tmp_root}/amduatd-b.log"
cleanup() {
if [[ -n "${pid_a:-}" ]]; then
kill "${pid_a}" >/dev/null 2>&1 || true
fi
if [[ -n "${pid_b:-}" ]]; then
kill "${pid_b}" >/dev/null 2>&1 || true
fi
rm -rf "${tmp_root}"
}
trap cleanup EXIT
mkdir -p "${root_a}" "${root_b}"
"${ASL_BIN}" init --root "${root_a}"
"${ASL_BIN}" init --root "${root_b}"
"${AMDUATD_BIN}" --root "${root_a}" --sock "${sock_a}" \
--store-backend index --space "${space_id}" \
--fed-enable --fed-transport unix \
--fed-unix-sock "${sock_b}" --fed-domain-id 1 \
>"${log_a}" 2>&1 &
pid_a=$!
"${AMDUATD_BIN}" --root "${root_b}" --sock "${sock_b}" \
--store-backend index --space "${space_id}" \
--fed-enable --fed-transport unix \
--fed-unix-sock "${sock_a}" --fed-domain-id 2 \
>"${log_b}" 2>&1 &
pid_b=$!
http_get() {
local sock="$1"
local path="$2"
shift 2
if [[ "${USE_HTTP_HELPER}" -eq 1 ]]; then
"${HTTP_HELPER}" --sock "${sock}" --method GET --path "${path}" "$@"
else
curl --silent --show-error --fail \
--unix-socket "${sock}" \
"$@" \
"http://localhost${path}"
fi
}
http_get_allow() {
local sock="$1"
local path="$2"
shift 2
if [[ "${USE_HTTP_HELPER}" -eq 1 ]]; then
"${HTTP_HELPER}" --sock "${sock}" --method GET --path "${path}" \
--allow-status "$@"
else
curl --silent --show-error \
--unix-socket "${sock}" \
"$@" \
"http://localhost${path}"
fi
}
http_post() {
local sock="$1"
local path="$2"
local data="$3"
shift 3
if [[ "${USE_HTTP_HELPER}" -eq 1 ]]; then
"${HTTP_HELPER}" --sock "${sock}" --method POST --path "${path}" \
--data "${data}" \
"$@"
else
curl --silent --show-error --fail \
--unix-socket "${sock}" \
"$@" \
--data-binary "${data}" \
"http://localhost${path}"
fi
}
wait_for_ready() {
local sock="$1"
local pid="$2"
local log_path="$3"
local i
for i in $(seq 1 100); do
if ! kill -0 "${pid}" >/dev/null 2>&1; then
if [[ -f "${log_path}" ]] && grep -q "bind: Operation not permitted" "${log_path}"; then
echo "skip: bind not permitted for unix socket" >&2
exit 77
fi
if [[ -f "${log_path}" ]]; then
cat "${log_path}" >&2
fi
return 1
fi
if [[ -S "${sock}" ]]; then
if http_get "${sock}" "/v1/meta" >/dev/null 2>&1; then
return 0
fi
fi
sleep 0.1
done
return 1
}
if ! wait_for_ready "${sock_a}" "${pid_a}" "${log_a}"; then
echo "daemon A not ready" >&2
exit 1
fi
if ! wait_for_ready "${sock_b}" "${pid_b}" "${log_b}"; then
echo "daemon B not ready" >&2
exit 1
fi
payload="fed-smoke"
artifact_resp="$(
http_post "${sock_a}" "/v1/artifacts" "${payload}" \
--header "Content-Type: application/octet-stream" \
--header "X-Amduat-Space: ${space_id}"
)" || {
echo "artifact POST failed" >&2
if [[ -f "${log_a}" ]]; then
cat "${log_a}" >&2
fi
exit 1
}
ref="$(
printf '%s' "${artifact_resp}" \
| tr -d '\r\n' \
| awk 'match($0, /"ref":"[^"]+"/) {print substr($0, RSTART+7, RLENGTH-8)}'
)"
if [[ -z "${ref}" ]]; then
echo "failed to parse ref from daemon A" >&2
echo "artifact response: ${artifact_resp}" >&2
if [[ -f "${log_a}" ]]; then
cat "${log_a}" >&2
fi
exit 1
fi
plan_resp="$(
http_get_allow "${sock_b}" "/v1/fed/pull/plan?peer=1&limit=8" \
--header "X-Amduat-Space: ${space_id}"
)" || {
echo "pull plan failed" >&2
if [[ -f "${log_b}" ]]; then
cat "${log_b}" >&2
fi
exit 1
}
if echo "${plan_resp}" | grep -q "\"record_count\":0"; then
echo "pull plan empty" >&2
echo "plan response: ${plan_resp}" >&2
exit 1
fi
pull_resp="$(
http_post "${sock_b}" "/v1/fed/pull?peer=1&limit=8" "" \
--header "X-Amduat-Space: ${space_id}"
)" || {
echo "pull apply failed" >&2
if [[ -f "${log_b}" ]]; then
cat "${log_b}" >&2
fi
exit 1
}
if ! echo "${pull_resp}" | grep -q "\"advanced\":true"; then
echo "pull did not advance cursor" >&2
echo "pull response: ${pull_resp}" >&2
exit 1
fi
cursor_json="$(
http_get_allow "${sock_b}" "/v1/fed/cursor?peer=1" \
--header "X-Amduat-Space: ${space_id}"
)" || {
echo "cursor fetch failed" >&2
if [[ -f "${log_a}" ]]; then
cat "${log_a}" >&2
fi
if [[ -f "${log_b}" ]]; then
cat "${log_b}" >&2
fi
exit 1
}
echo "${cursor_json}" | grep -q "\"last_logseq\":" || {
echo "cursor missing last_logseq" >&2
echo "cursor response: ${cursor_json}" >&2
exit 1
}
payload_b="$(
http_get "${sock_b}" "/v1/artifacts/${ref}" \
--header "X-Amduat-Space: ${space_id}"
)" || {
echo "artifact fetch failed" >&2
if [[ -f "${log_b}" ]]; then
cat "${log_b}" >&2
fi
exit 1
}
if [[ "${payload_b}" != "${payload}" ]]; then
echo "payload mismatch after pull" >&2
exit 1
fi
echo "fed smoke ok"

File diff suppressed because it is too large Load diff

584
src/amduatd_fed_cursor.c Normal file
View file

@ -0,0 +1,584 @@
#include "amduatd_fed_cursor.h"
#include "amduat/asl/record.h"
#include "amduat/enc/asl1_core_codec.h"
#include <stdlib.h>
#include <string.h>
enum {
AMDUATD_FED_CURSOR_MAGIC_LEN = 8,
AMDUATD_FED_CURSOR_VERSION = 1
};
static const uint8_t k_amduatd_fed_cursor_magic[AMDUATD_FED_CURSOR_MAGIC_LEN] = {
'A', 'F', 'C', 'U', 'R', '1', '\0', '\0'
};
enum {
AMDUATD_FED_CURSOR_FLAG_HAS_LOGSEQ = 1u << 0,
AMDUATD_FED_CURSOR_FLAG_HAS_RECORD_REF = 1u << 1,
AMDUATD_FED_CURSOR_FLAG_HAS_SPACE = 1u << 2
};
static void amduatd_fed_cursor_store_u32_le(uint8_t *out, uint32_t value) {
out[0] = (uint8_t)(value & 0xffu);
out[1] = (uint8_t)((value >> 8) & 0xffu);
out[2] = (uint8_t)((value >> 16) & 0xffu);
out[3] = (uint8_t)((value >> 24) & 0xffu);
}
static void amduatd_fed_cursor_store_u64_le(uint8_t *out, uint64_t value) {
out[0] = (uint8_t)(value & 0xffu);
out[1] = (uint8_t)((value >> 8) & 0xffu);
out[2] = (uint8_t)((value >> 16) & 0xffu);
out[3] = (uint8_t)((value >> 24) & 0xffu);
out[4] = (uint8_t)((value >> 32) & 0xffu);
out[5] = (uint8_t)((value >> 40) & 0xffu);
out[6] = (uint8_t)((value >> 48) & 0xffu);
out[7] = (uint8_t)((value >> 56) & 0xffu);
}
static bool amduatd_fed_cursor_read_u32_le(const uint8_t *data,
size_t len,
size_t *offset,
uint32_t *out) {
if (len - *offset < 4u) {
return false;
}
*out = (uint32_t)data[*offset] |
((uint32_t)data[*offset + 1u] << 8) |
((uint32_t)data[*offset + 2u] << 16) |
((uint32_t)data[*offset + 3u] << 24);
*offset += 4u;
return true;
}
static bool amduatd_fed_cursor_read_u64_le(const uint8_t *data,
size_t len,
size_t *offset,
uint64_t *out) {
if (len - *offset < 8u) {
return false;
}
*out = (uint64_t)data[*offset] |
((uint64_t)data[*offset + 1u] << 8) |
((uint64_t)data[*offset + 2u] << 16) |
((uint64_t)data[*offset + 3u] << 24) |
((uint64_t)data[*offset + 4u] << 32) |
((uint64_t)data[*offset + 5u] << 40) |
((uint64_t)data[*offset + 6u] << 48) |
((uint64_t)data[*offset + 7u] << 56);
*offset += 8u;
return true;
}
static bool amduatd_fed_cursor_add_size(size_t *acc, size_t add) {
if (*acc > SIZE_MAX - add) {
return false;
}
*acc += add;
return true;
}
static bool amduatd_fed_cursor_strdup(const char *s, size_t len, char **out) {
char *buf;
if (out == NULL) {
return false;
}
*out = NULL;
if (s == NULL || len == 0u) {
return false;
}
if (len > SIZE_MAX - 1u) {
return false;
}
buf = (char *)malloc(len + 1u);
if (buf == NULL) {
return false;
}
memcpy(buf, s, len);
buf[len] = '\0';
*out = buf;
return true;
}
static bool amduatd_fed_cursor_peer_key_is_valid(const char *peer_key) {
if (peer_key == NULL || peer_key[0] == '\0') {
return false;
}
return amduat_asl_pointer_name_is_valid(peer_key);
}
static bool amduatd_fed_cursor_record_encode(
const amduatd_fed_cursor_record_t *record,
amduat_octets_t *out_payload) {
size_t total = 0u;
uint32_t peer_len = 0u;
uint32_t space_len = 0u;
uint32_t flags = 0u;
amduat_octets_t ref_bytes = amduat_octets(NULL, 0u);
uint8_t *buf = NULL;
size_t offset = 0u;
if (out_payload != NULL) {
*out_payload = amduat_octets(NULL, 0u);
}
if (record == NULL || out_payload == NULL) {
return false;
}
if (record->peer_key == NULL || record->peer_key[0] == '\0') {
return false;
}
peer_len = (uint32_t)strlen(record->peer_key);
if (record->space_id != NULL && record->space_id[0] != '\0') {
space_len = (uint32_t)strlen(record->space_id);
flags |= AMDUATD_FED_CURSOR_FLAG_HAS_SPACE;
}
if (record->has_logseq) {
flags |= AMDUATD_FED_CURSOR_FLAG_HAS_LOGSEQ;
}
if (record->has_record_ref) {
if (record->last_record_ref.digest.data == NULL ||
record->last_record_ref.digest.len == 0u) {
return false;
}
if (!amduat_enc_asl1_core_encode_reference_v1(record->last_record_ref,
&ref_bytes)) {
return false;
}
flags |= AMDUATD_FED_CURSOR_FLAG_HAS_RECORD_REF;
}
if (!amduatd_fed_cursor_add_size(&total, AMDUATD_FED_CURSOR_MAGIC_LEN) ||
!amduatd_fed_cursor_add_size(&total, 4u + 4u + 4u + 4u) ||
!amduatd_fed_cursor_add_size(&total, peer_len) ||
!amduatd_fed_cursor_add_size(&total, space_len) ||
(record->has_logseq && !amduatd_fed_cursor_add_size(&total, 8u)) ||
(record->has_record_ref &&
!amduatd_fed_cursor_add_size(&total, 4u + ref_bytes.len))) {
amduat_octets_free(&ref_bytes);
return false;
}
buf = (uint8_t *)malloc(total);
if (buf == NULL) {
amduat_octets_free(&ref_bytes);
return false;
}
memcpy(buf + offset,
k_amduatd_fed_cursor_magic,
AMDUATD_FED_CURSOR_MAGIC_LEN);
offset += AMDUATD_FED_CURSOR_MAGIC_LEN;
amduatd_fed_cursor_store_u32_le(buf + offset,
(uint32_t)AMDUATD_FED_CURSOR_VERSION);
offset += 4u;
amduatd_fed_cursor_store_u32_le(buf + offset, flags);
offset += 4u;
amduatd_fed_cursor_store_u32_le(buf + offset, peer_len);
offset += 4u;
amduatd_fed_cursor_store_u32_le(buf + offset, space_len);
offset += 4u;
memcpy(buf + offset, record->peer_key, peer_len);
offset += peer_len;
if (space_len != 0u) {
memcpy(buf + offset, record->space_id, space_len);
offset += space_len;
}
if (record->has_logseq) {
amduatd_fed_cursor_store_u64_le(buf + offset, record->last_logseq);
offset += 8u;
}
if (record->has_record_ref) {
amduatd_fed_cursor_store_u32_le(buf + offset, (uint32_t)ref_bytes.len);
offset += 4u;
memcpy(buf + offset, ref_bytes.data, ref_bytes.len);
offset += ref_bytes.len;
}
amduat_octets_free(&ref_bytes);
*out_payload = amduat_octets(buf, total);
return true;
}
static bool amduatd_fed_cursor_record_decode(
amduat_octets_t payload,
amduatd_fed_cursor_record_t *out_record) {
size_t offset = 0u;
uint32_t version = 0u;
uint32_t flags = 0u;
uint32_t peer_len = 0u;
uint32_t space_len = 0u;
bool has_logseq = false;
bool has_record_ref = false;
bool has_space = false;
char *peer_key = NULL;
char *space_id = NULL;
amduat_reference_t record_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
if (out_record == NULL) {
return false;
}
amduatd_fed_cursor_record_init(out_record);
if (payload.len < AMDUATD_FED_CURSOR_MAGIC_LEN + 16u) {
return false;
}
if (memcmp(payload.data,
k_amduatd_fed_cursor_magic,
AMDUATD_FED_CURSOR_MAGIC_LEN) != 0) {
return false;
}
offset = AMDUATD_FED_CURSOR_MAGIC_LEN;
if (!amduatd_fed_cursor_read_u32_le(payload.data,
payload.len,
&offset,
&version) ||
version != AMDUATD_FED_CURSOR_VERSION ||
!amduatd_fed_cursor_read_u32_le(payload.data,
payload.len,
&offset,
&flags) ||
!amduatd_fed_cursor_read_u32_le(payload.data,
payload.len,
&offset,
&peer_len) ||
!amduatd_fed_cursor_read_u32_le(payload.data,
payload.len,
&offset,
&space_len)) {
return false;
}
has_logseq = (flags & AMDUATD_FED_CURSOR_FLAG_HAS_LOGSEQ) != 0u;
has_record_ref = (flags & AMDUATD_FED_CURSOR_FLAG_HAS_RECORD_REF) != 0u;
has_space = (flags & AMDUATD_FED_CURSOR_FLAG_HAS_SPACE) != 0u;
if (peer_len == 0u || payload.len - offset < peer_len) {
return false;
}
if (!amduatd_fed_cursor_strdup((const char *)payload.data + offset,
peer_len,
&peer_key)) {
return false;
}
offset += peer_len;
if ((space_len != 0u) != has_space) {
free(peer_key);
return false;
}
if (space_len != 0u) {
if (payload.len - offset < space_len) {
free(peer_key);
return false;
}
if (!amduatd_fed_cursor_strdup((const char *)payload.data + offset,
space_len,
&space_id)) {
free(peer_key);
return false;
}
offset += space_len;
}
if (has_logseq) {
if (!amduatd_fed_cursor_read_u64_le(payload.data,
payload.len,
&offset,
&out_record->last_logseq)) {
free(peer_key);
free(space_id);
return false;
}
out_record->has_logseq = true;
}
if (has_record_ref) {
uint32_t ref_len = 0u;
amduat_octets_t ref_bytes;
if (!amduatd_fed_cursor_read_u32_le(payload.data,
payload.len,
&offset,
&ref_len) ||
payload.len - offset < ref_len) {
free(peer_key);
free(space_id);
return false;
}
ref_bytes = amduat_octets(payload.data + offset, ref_len);
offset += ref_len;
if (!amduat_enc_asl1_core_decode_reference_v1(ref_bytes, &record_ref)) {
free(peer_key);
free(space_id);
return false;
}
out_record->last_record_ref = record_ref;
out_record->has_record_ref = true;
}
if (offset != payload.len) {
if (out_record->has_record_ref) {
amduat_reference_free(&out_record->last_record_ref);
}
free(peer_key);
free(space_id);
return false;
}
out_record->peer_key = peer_key;
out_record->space_id = space_id;
return true;
}
void amduatd_fed_cursor_record_init(amduatd_fed_cursor_record_t *record) {
if (record == NULL) {
return;
}
memset(record, 0, sizeof(*record));
record->last_record_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
}
void amduatd_fed_cursor_record_free(amduatd_fed_cursor_record_t *record) {
if (record == NULL) {
return;
}
free(record->peer_key);
free(record->space_id);
record->peer_key = NULL;
record->space_id = NULL;
if (record->has_record_ref) {
amduat_reference_free(&record->last_record_ref);
}
memset(record, 0, sizeof(*record));
}
bool amduatd_fed_cursor_pointer_name(const amduatd_space_t *space,
const char *peer_key,
amduat_octets_t *out_name) {
const char prefix[] = "fed/cursor/";
const char suffix[] = "/head";
size_t peer_len;
size_t total_len;
char *base = NULL;
bool ok;
if (out_name != NULL) {
*out_name = amduat_octets(NULL, 0u);
}
if (out_name == NULL || !amduatd_fed_cursor_peer_key_is_valid(peer_key)) {
return false;
}
peer_len = strlen(peer_key);
if (peer_len > SIZE_MAX - (sizeof(prefix) - 1u) - (sizeof(suffix) - 1u)) {
return false;
}
total_len = (sizeof(prefix) - 1u) + peer_len + (sizeof(suffix) - 1u);
base = (char *)malloc(total_len + 1u);
if (base == NULL) {
return false;
}
memcpy(base, prefix, sizeof(prefix) - 1u);
memcpy(base + sizeof(prefix) - 1u, peer_key, peer_len);
memcpy(base + sizeof(prefix) - 1u + peer_len, suffix, sizeof(suffix) - 1u);
base[total_len] = '\0';
ok = amduatd_space_scope_name(space, base, out_name);
free(base);
return ok;
}
amduatd_fed_cursor_status_t amduatd_fed_cursor_check_enabled(
const amduatd_fed_cfg_t *cfg) {
if (cfg == NULL) {
return AMDUATD_FED_CURSOR_ERR_INVALID;
}
if (!cfg->enabled) {
return AMDUATD_FED_CURSOR_ERR_DISABLED;
}
return AMDUATD_FED_CURSOR_OK;
}
amduatd_fed_cursor_status_t amduatd_fed_cursor_get(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
amduatd_fed_cursor_record_t *out_cursor,
amduat_reference_t *out_ref) {
amduat_octets_t pointer_name = amduat_octets(NULL, 0u);
amduat_reference_t pointer_ref;
amduat_asl_pointer_error_t perr;
bool exists = false;
amduat_asl_record_t record;
amduat_asl_store_error_t store_err;
if (out_ref != NULL) {
*out_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
}
if (store == NULL || pointer_store == NULL || out_cursor == NULL ||
peer_key == NULL) {
return AMDUATD_FED_CURSOR_ERR_INVALID;
}
if (!amduatd_fed_cursor_pointer_name(effective_space,
peer_key,
&pointer_name)) {
return AMDUATD_FED_CURSOR_ERR_INVALID;
}
memset(&pointer_ref, 0, sizeof(pointer_ref));
perr = amduat_asl_pointer_get(pointer_store,
(const char *)pointer_name.data,
&exists,
&pointer_ref);
amduat_octets_free(&pointer_name);
if (perr != AMDUAT_ASL_POINTER_OK) {
return AMDUATD_FED_CURSOR_ERR_POINTER;
}
if (!exists) {
return AMDUATD_FED_CURSOR_ERR_NOT_FOUND;
}
memset(&record, 0, sizeof(record));
store_err = amduat_asl_record_store_get(store, pointer_ref, &record);
if (store_err != AMDUAT_ASL_STORE_OK) {
amduat_reference_free(&pointer_ref);
return AMDUATD_FED_CURSOR_ERR_STORE;
}
if (record.schema.len != strlen("fed/cursor") ||
memcmp(record.schema.data, "fed/cursor", record.schema.len) != 0) {
amduat_asl_record_free(&record);
amduat_reference_free(&pointer_ref);
return AMDUATD_FED_CURSOR_ERR_CODEC;
}
if (!amduatd_fed_cursor_record_decode(record.payload, out_cursor)) {
amduat_asl_record_free(&record);
amduat_reference_free(&pointer_ref);
return AMDUATD_FED_CURSOR_ERR_CODEC;
}
amduat_asl_record_free(&record);
if (strcmp(out_cursor->peer_key, peer_key) != 0) {
amduatd_fed_cursor_record_free(out_cursor);
amduat_reference_free(&pointer_ref);
return AMDUATD_FED_CURSOR_ERR_CODEC;
}
if (effective_space != NULL && effective_space->enabled &&
effective_space->space_id.data != NULL) {
const char *space_id = (const char *)effective_space->space_id.data;
if (out_cursor->space_id == NULL ||
strcmp(out_cursor->space_id, space_id) != 0) {
amduatd_fed_cursor_record_free(out_cursor);
amduat_reference_free(&pointer_ref);
return AMDUATD_FED_CURSOR_ERR_CODEC;
}
} else if (out_cursor->space_id != NULL && out_cursor->space_id[0] != '\0') {
amduatd_fed_cursor_record_free(out_cursor);
amduat_reference_free(&pointer_ref);
return AMDUATD_FED_CURSOR_ERR_CODEC;
}
if (out_ref != NULL) {
if (!amduat_reference_clone(pointer_ref, out_ref)) {
amduatd_fed_cursor_record_free(out_cursor);
amduat_reference_free(&pointer_ref);
return AMDUATD_FED_CURSOR_ERR_STORE;
}
}
amduat_reference_free(&pointer_ref);
return AMDUATD_FED_CURSOR_OK;
}
amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
const amduat_reference_t *expected_ref,
const amduatd_fed_cursor_record_t *new_cursor,
amduat_reference_t *out_new_ref) {
amduat_octets_t pointer_name = amduat_octets(NULL, 0u);
amduat_octets_t payload = amduat_octets(NULL, 0u);
amduat_reference_t record_ref;
amduat_asl_store_error_t store_err;
amduat_asl_pointer_error_t perr;
bool swapped = false;
if (out_new_ref != NULL) {
*out_new_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
}
if (store == NULL || pointer_store == NULL || peer_key == NULL ||
new_cursor == NULL) {
return AMDUATD_FED_CURSOR_ERR_INVALID;
}
if (new_cursor->peer_key == NULL ||
strcmp(new_cursor->peer_key, peer_key) != 0) {
return AMDUATD_FED_CURSOR_ERR_INVALID;
}
if (!new_cursor->has_logseq && !new_cursor->has_record_ref) {
return AMDUATD_FED_CURSOR_ERR_INVALID;
}
if (effective_space != NULL && effective_space->enabled &&
effective_space->space_id.data != NULL) {
const char *space_id = (const char *)effective_space->space_id.data;
if (new_cursor->space_id == NULL ||
strcmp(new_cursor->space_id, space_id) != 0) {
return AMDUATD_FED_CURSOR_ERR_INVALID;
}
} else if (new_cursor->space_id != NULL && new_cursor->space_id[0] != '\0') {
return AMDUATD_FED_CURSOR_ERR_INVALID;
}
if (!amduatd_fed_cursor_pointer_name(effective_space,
peer_key,
&pointer_name)) {
return AMDUATD_FED_CURSOR_ERR_INVALID;
}
if (!amduatd_fed_cursor_record_encode(new_cursor, &payload)) {
amduat_octets_free(&pointer_name);
return AMDUATD_FED_CURSOR_ERR_CODEC;
}
memset(&record_ref, 0, sizeof(record_ref));
store_err = amduat_asl_record_store_put(store,
amduat_octets("fed/cursor",
strlen("fed/cursor")),
payload,
&record_ref);
amduat_octets_free(&payload);
if (store_err != AMDUAT_ASL_STORE_OK) {
amduat_octets_free(&pointer_name);
return AMDUATD_FED_CURSOR_ERR_STORE;
}
perr = amduat_asl_pointer_cas(pointer_store,
(const char *)pointer_name.data,
expected_ref != NULL,
expected_ref,
&record_ref,
&swapped);
amduat_octets_free(&pointer_name);
if (perr != AMDUAT_ASL_POINTER_OK) {
amduat_reference_free(&record_ref);
return AMDUATD_FED_CURSOR_ERR_POINTER;
}
if (!swapped) {
amduat_reference_free(&record_ref);
return AMDUATD_FED_CURSOR_ERR_CONFLICT;
}
if (out_new_ref != NULL) {
if (!amduat_reference_clone(record_ref, out_new_ref)) {
amduat_reference_free(&record_ref);
return AMDUATD_FED_CURSOR_ERR_STORE;
}
}
amduat_reference_free(&record_ref);
return AMDUATD_FED_CURSOR_OK;
}

69
src/amduatd_fed_cursor.h Normal file
View file

@ -0,0 +1,69 @@
#ifndef AMDUATD_FED_CURSOR_H
#define AMDUATD_FED_CURSOR_H
#include "amduat/asl/asl_pointer_fs.h"
#include "amduat/asl/core.h"
#include "amduat/asl/store.h"
#include "amduatd_fed.h"
#include "amduatd_space.h"
#include <stdbool.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef enum {
AMDUATD_FED_CURSOR_OK = 0,
AMDUATD_FED_CURSOR_ERR_INVALID = 1,
AMDUATD_FED_CURSOR_ERR_NOT_FOUND = 2,
AMDUATD_FED_CURSOR_ERR_POINTER = 3,
AMDUATD_FED_CURSOR_ERR_STORE = 4,
AMDUATD_FED_CURSOR_ERR_CODEC = 5,
AMDUATD_FED_CURSOR_ERR_CONFLICT = 6,
AMDUATD_FED_CURSOR_ERR_DISABLED = 7
} amduatd_fed_cursor_status_t;
typedef struct {
char *peer_key;
char *space_id;
bool has_logseq;
uint64_t last_logseq;
bool has_record_ref;
amduat_reference_t last_record_ref;
} amduatd_fed_cursor_record_t;
void amduatd_fed_cursor_record_init(amduatd_fed_cursor_record_t *record);
void amduatd_fed_cursor_record_free(amduatd_fed_cursor_record_t *record);
bool amduatd_fed_cursor_pointer_name(const amduatd_space_t *space,
const char *peer_key,
amduat_octets_t *out_name);
amduatd_fed_cursor_status_t amduatd_fed_cursor_check_enabled(
const amduatd_fed_cfg_t *cfg);
amduatd_fed_cursor_status_t amduatd_fed_cursor_get(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
amduatd_fed_cursor_record_t *out_cursor,
amduat_reference_t *out_ref);
amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
const amduat_reference_t *expected_ref,
const amduatd_fed_cursor_record_t *new_cursor,
amduat_reference_t *out_new_ref);
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* AMDUATD_FED_CURSOR_H */

View file

@ -0,0 +1,518 @@
#include "amduatd_fed_pull_apply.h"
#include "amduat/asl/artifact_io.h"
#include "amduat/asl/store.h"
#include "amduat/enc/fer1_receipt.h"
#include "amduat/enc/tgk1_edge.h"
#include "amduat/fed/ingest.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
static bool amduatd_fed_pull_parse_u32(const char *s, uint32_t *out) {
char *end = NULL;
unsigned long val;
if (s == NULL || out == NULL || s[0] == '\0') {
return false;
}
val = strtoul(s, &end, 10);
if (end == s || *end != '\0' || val > UINT32_MAX) {
return false;
}
*out = (uint32_t)val;
return true;
}
static bool amduatd_fed_pull_strdup(const char *s, char **out) {
size_t len;
char *buf;
if (out == NULL) {
return false;
}
*out = NULL;
if (s == NULL) {
return false;
}
len = strlen(s);
if (len > SIZE_MAX - 1u) {
return false;
}
buf = (char *)malloc(len + 1u);
if (buf == NULL) {
return false;
}
if (len != 0u) {
memcpy(buf, s, len);
}
buf[len] = '\0';
*out = buf;
return true;
}
void amduatd_fed_pull_apply_report_init(
amduatd_fed_pull_apply_report_t *report) {
if (report == NULL) {
return;
}
memset(report, 0, sizeof(*report));
report->cursor_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
report->cursor_after_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
amduatd_fed_pull_plan_candidate_init(&report->plan_candidate);
}
void amduatd_fed_pull_apply_report_free(
amduatd_fed_pull_apply_report_t *report) {
if (report == NULL) {
return;
}
if (report->cursor_ref_set) {
amduat_reference_free(&report->cursor_ref);
}
if (report->cursor_after_ref_set) {
amduat_reference_free(&report->cursor_after_ref);
}
amduatd_fed_pull_plan_candidate_free(&report->plan_candidate);
memset(report, 0, sizeof(*report));
}
static void amduatd_fed_pull_report_error(
amduatd_fed_pull_apply_report_t *report,
const char *msg) {
if (report == NULL || msg == NULL) {
return;
}
memset(report->error, 0, sizeof(report->error));
strncpy(report->error, msg, sizeof(report->error) - 1u);
}
static bool amduatd_fed_pull_apply_record(
amduat_asl_store_t *store,
const amduatd_fed_pull_transport_t *transport,
const amduat_fed_record_t *record,
size_t *io_artifact_count,
int *out_remote_status,
char *err_buf,
size_t err_cap) {
int status = 0;
amduat_octets_t bytes = amduat_octets(NULL, 0u);
amduat_artifact_t artifact;
amduat_reference_t stored_ref;
amduat_asl_index_state_t state;
amduat_asl_store_error_t store_err;
amduat_type_tag_t type_tag = amduat_type_tag(0u);
bool has_tag = false;
char *body = NULL;
if (out_remote_status != NULL) {
*out_remote_status = 0;
}
if (record->id.type == AMDUAT_FED_REC_TOMBSTONE) {
store_err = amduat_asl_store_tombstone(store,
record->id.ref,
0u,
0u,
&state);
if (store_err != AMDUAT_ASL_STORE_OK) {
snprintf(err_buf, err_cap, "tombstone failed");
return false;
}
return true;
}
if (transport == NULL || transport->get_artifact == NULL) {
snprintf(err_buf, err_cap, "missing artifact transport");
return false;
}
if (!transport->get_artifact(transport->ctx,
record->id.ref,
&status,
&bytes,
&body)) {
snprintf(err_buf, err_cap, "artifact fetch failed");
free(body);
return false;
}
if (status != 200) {
if (out_remote_status != NULL) {
*out_remote_status = status;
}
snprintf(err_buf,
err_cap,
"artifact fetch status %d",
status);
free(body);
amduat_octets_free(&bytes);
return false;
}
free(body);
if (record->id.type == AMDUAT_FED_REC_TGK_EDGE) {
type_tag = amduat_type_tag(AMDUAT_TYPE_TAG_TGK1_EDGE_V1);
has_tag = true;
} else if (record->id.type == AMDUAT_FED_REC_PER) {
type_tag = amduat_type_tag(AMDUAT_TYPE_TAG_FER1_RECEIPT_1);
has_tag = true;
}
if (!amduat_asl_artifact_from_bytes(bytes,
AMDUAT_ASL_IO_RAW,
has_tag,
type_tag,
&artifact)) {
amduat_octets_free(&bytes);
snprintf(err_buf, err_cap, "artifact decode failed");
return false;
}
bytes = amduat_octets(NULL, 0u);
store_err = amduat_asl_store_put_indexed(store,
artifact,
&stored_ref,
&state);
amduat_asl_artifact_free(&artifact);
amduat_octets_free(&bytes);
if (store_err != AMDUAT_ASL_STORE_OK) {
snprintf(err_buf, err_cap, "artifact store failed");
return false;
}
amduat_reference_free(&stored_ref);
if (io_artifact_count != NULL) {
*io_artifact_count += 1u;
}
return true;
}
amduatd_fed_pull_apply_status_t amduatd_fed_pull_apply(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
uint64_t limit,
const amduatd_fed_cfg_t *fed_cfg,
const amduatd_fed_pull_transport_t *transport,
amduatd_fed_pull_apply_report_t *out_report) {
uint32_t domain_id = 0u;
amduatd_fed_cursor_record_t cursor;
amduat_reference_t cursor_ref;
bool cursor_present = false;
amduat_fed_record_t *records = NULL;
size_t record_len = 0;
size_t record_len_total = 0;
int remote_status = 0;
char *remote_body = NULL;
amduatd_fed_cursor_candidate_t candidate;
amduatd_fed_cursor_record_t next_cursor;
amduat_reference_t next_ref;
size_t i;
size_t applied_records = 0u;
size_t applied_artifacts = 0u;
char err_buf[128];
if (out_report == NULL) {
return AMDUATD_FED_PULL_APPLY_ERR_INVALID;
}
amduatd_fed_pull_apply_report_init(out_report);
out_report->peer_key = peer_key;
out_report->effective_space = effective_space;
out_report->limit = limit;
if (store == NULL || pointer_store == NULL || peer_key == NULL ||
fed_cfg == NULL || transport == NULL) {
amduatd_fed_pull_report_error(out_report, "invalid inputs");
return AMDUATD_FED_PULL_APPLY_ERR_INVALID;
}
if (!fed_cfg->enabled) {
amduatd_fed_pull_report_error(out_report, "federation disabled");
return AMDUATD_FED_PULL_APPLY_ERR_DISABLED;
}
if (store->ops.log_scan == NULL || store->ops.current_state == NULL ||
store->ops.put_indexed == NULL || store->ops.tombstone == NULL) {
amduatd_fed_pull_report_error(out_report, "requires index backend");
return AMDUATD_FED_PULL_APPLY_ERR_UNSUPPORTED;
}
{
amduat_octets_t scoped = amduat_octets(NULL, 0u);
if (!amduatd_fed_cursor_pointer_name(effective_space,
peer_key,
&scoped)) {
amduatd_fed_pull_report_error(out_report, "invalid peer");
return AMDUATD_FED_PULL_APPLY_ERR_INVALID;
}
amduat_octets_free(&scoped);
}
if (!amduatd_fed_pull_parse_u32(peer_key, &domain_id)) {
amduatd_fed_pull_report_error(out_report, "invalid peer");
return AMDUATD_FED_PULL_APPLY_ERR_INVALID;
}
if (transport->get_records == NULL || transport->free_records == NULL ||
transport->get_artifact == NULL) {
amduatd_fed_pull_report_error(out_report, "transport unavailable");
return AMDUATD_FED_PULL_APPLY_ERR_UNSUPPORTED;
}
amduatd_fed_cursor_record_init(&cursor);
memset(&cursor_ref, 0, sizeof(cursor_ref));
{
amduatd_fed_cursor_status_t cursor_status;
cursor_status = amduatd_fed_cursor_get(store,
pointer_store,
effective_space,
peer_key,
&cursor,
&cursor_ref);
if (cursor_status == AMDUATD_FED_CURSOR_ERR_NOT_FOUND) {
cursor_present = false;
} else if (cursor_status == AMDUATD_FED_CURSOR_OK) {
cursor_present = true;
} else {
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_pull_report_error(out_report, "cursor read failed");
return AMDUATD_FED_PULL_APPLY_ERR_STORE;
}
}
if (cursor_present && cursor.has_logseq &&
cursor.last_logseq == UINT64_MAX) {
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_pull_report_error(out_report, "cursor overflow");
return AMDUATD_FED_PULL_APPLY_ERR_INVALID;
}
if (!transport->get_records(transport->ctx,
domain_id,
cursor_present && cursor.has_logseq
? cursor.last_logseq + 1u
: 0u,
limit,
&remote_status,
&records,
&record_len,
&remote_body)) {
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_pull_report_error(out_report, "remote fetch failed");
return AMDUATD_FED_PULL_APPLY_ERR_REMOTE;
}
out_report->remote_status = remote_status;
if (remote_status != 200) {
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
if (remote_body != NULL && remote_body[0] != '\0') {
amduatd_fed_pull_report_error(out_report, remote_body);
} else {
amduatd_fed_pull_report_error(out_report, "remote error");
}
free(remote_body);
return AMDUATD_FED_PULL_APPLY_ERR_REMOTE;
}
free(remote_body);
remote_body = NULL;
record_len_total = record_len;
if (record_len > limit) {
record_len = (size_t)limit;
}
out_report->cursor_present = cursor_present;
if (cursor_present && cursor.has_logseq) {
out_report->cursor_has_logseq = true;
out_report->cursor_logseq = cursor.last_logseq;
}
if (cursor_present) {
if (amduat_reference_clone(cursor_ref, &out_report->cursor_ref)) {
out_report->cursor_ref_set = true;
}
}
if (!amduatd_fed_pull_plan_next_cursor_candidate(cursor_present ? &cursor
: NULL,
records,
record_len,
&candidate)) {
if (records != NULL) {
transport->free_records(transport->ctx, records, record_len_total);
}
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_pull_report_error(out_report, "plan candidate failed");
return AMDUATD_FED_PULL_APPLY_ERR_INVALID;
}
out_report->plan_record_count = record_len;
out_report->plan_candidate = candidate;
if (record_len == 0u) {
if (records != NULL) {
transport->free_records(transport->ctx, records, record_len_total);
}
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
return AMDUATD_FED_PULL_APPLY_OK;
}
{
size_t err_index = 0;
size_t conflict_index = 0;
amduat_fed_ingest_error_t ingest_rc;
ingest_rc = amduat_fed_ingest_validate(records,
record_len,
&err_index,
&conflict_index);
if (ingest_rc != AMDUAT_FED_INGEST_OK) {
transport->free_records(transport->ctx, records, record_len_total);
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_pull_report_error(out_report, "invalid record batch");
return AMDUATD_FED_PULL_APPLY_ERR_INVALID;
}
}
if (cursor_present && cursor.has_logseq &&
records[0].logseq <= cursor.last_logseq) {
transport->free_records(transport->ctx, records, record_len_total);
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_pull_report_error(out_report, "cursor would move backwards");
return AMDUATD_FED_PULL_APPLY_ERR_INVALID;
}
for (i = 0; i < record_len; ++i) {
int artifact_status = 0;
if (i > 0 && records[i].logseq < records[i - 1].logseq) {
transport->free_records(transport->ctx, records, record_len_total);
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_pull_report_error(out_report, "record order invalid");
return AMDUATD_FED_PULL_APPLY_ERR_INVALID;
}
memset(err_buf, 0, sizeof(err_buf));
if (!amduatd_fed_pull_apply_record(store,
transport,
&records[i],
&applied_artifacts,
&artifact_status,
err_buf,
sizeof(err_buf))) {
transport->free_records(transport->ctx, records, record_len_total);
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
applied_records = i;
out_report->applied_record_count = applied_records;
out_report->applied_artifact_count = applied_artifacts;
if (artifact_status != 0) {
out_report->remote_status = artifact_status;
}
amduatd_fed_pull_report_error(out_report, err_buf);
return AMDUATD_FED_PULL_APPLY_ERR_STORE;
}
applied_records++;
}
out_report->applied_record_count = applied_records;
out_report->applied_artifact_count = applied_artifacts;
amduatd_fed_cursor_record_init(&next_cursor);
if (!amduatd_fed_pull_strdup(peer_key, &next_cursor.peer_key)) {
transport->free_records(transport->ctx, records, record_len_total);
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&next_cursor);
amduatd_fed_pull_report_error(out_report, "oom");
return AMDUATD_FED_PULL_APPLY_ERR_OOM;
}
if (next_cursor.peer_key == NULL) {
transport->free_records(transport->ctx, records, record_len_total);
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_pull_report_error(out_report, "oom");
return AMDUATD_FED_PULL_APPLY_ERR_OOM;
}
if (effective_space != NULL && effective_space->enabled &&
effective_space->space_id.data != NULL) {
if (!amduatd_fed_pull_strdup(
(const char *)effective_space->space_id.data,
&next_cursor.space_id)) {
transport->free_records(transport->ctx, records, record_len_total);
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&next_cursor);
amduatd_fed_pull_report_error(out_report, "oom");
return AMDUATD_FED_PULL_APPLY_ERR_OOM;
}
if (next_cursor.space_id == NULL) {
transport->free_records(transport->ctx, records, record_len_total);
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&next_cursor);
amduatd_fed_pull_report_error(out_report, "oom");
return AMDUATD_FED_PULL_APPLY_ERR_OOM;
}
}
if (candidate.has_logseq) {
next_cursor.has_logseq = true;
next_cursor.last_logseq = candidate.logseq;
}
if (candidate.has_ref) {
next_cursor.has_record_ref = true;
if (!amduat_reference_clone(candidate.ref,
&next_cursor.last_record_ref)) {
transport->free_records(transport->ctx, records, record_len_total);
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&next_cursor);
amduatd_fed_pull_report_error(out_report, "oom");
return AMDUATD_FED_PULL_APPLY_ERR_OOM;
}
}
memset(&next_ref, 0, sizeof(next_ref));
{
amduatd_fed_cursor_status_t cursor_status;
cursor_status = amduatd_fed_cursor_cas_set(store,
pointer_store,
effective_space,
peer_key,
cursor_present ? &cursor_ref
: NULL,
&next_cursor,
&next_ref);
if (cursor_status == AMDUATD_FED_CURSOR_ERR_CONFLICT) {
transport->free_records(transport->ctx, records, record_len_total);
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&next_cursor);
amduat_reference_free(&next_ref);
amduatd_fed_pull_report_error(out_report, "cursor conflict");
return AMDUATD_FED_PULL_APPLY_ERR_CONFLICT;
}
if (cursor_status != AMDUATD_FED_CURSOR_OK) {
transport->free_records(transport->ctx, records, record_len_total);
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&next_cursor);
amduat_reference_free(&next_ref);
amduatd_fed_pull_report_error(out_report, "cursor update failed");
return AMDUATD_FED_PULL_APPLY_ERR_STORE;
}
}
out_report->cursor_advanced = true;
if (candidate.has_logseq) {
out_report->cursor_after_has_logseq = true;
out_report->cursor_after_logseq = candidate.logseq;
}
if (amduat_reference_clone(next_ref, &out_report->cursor_after_ref)) {
out_report->cursor_after_ref_set = true;
}
transport->free_records(transport->ctx, records, record_len_total);
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&next_cursor);
amduat_reference_free(&next_ref);
return AMDUATD_FED_PULL_APPLY_OK;
}

View file

@ -0,0 +1,90 @@
#ifndef AMDUATD_FED_PULL_APPLY_H
#define AMDUATD_FED_PULL_APPLY_H
#include "amduat/asl/core.h"
#include "amduat/fed/replay.h"
#include "amduatd_fed.h"
#include "amduatd_fed_cursor.h"
#include "amduatd_fed_pull_plan.h"
#include "amduatd_space.h"
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
void *ctx;
bool (*get_records)(void *ctx,
uint32_t domain_id,
uint64_t from_logseq,
uint64_t limit,
int *out_status,
amduat_fed_record_t **out_records,
size_t *out_len,
char **out_body);
void (*free_records)(void *ctx, amduat_fed_record_t *records, size_t len);
bool (*get_artifact)(void *ctx,
amduat_reference_t ref,
int *out_status,
amduat_octets_t *out_bytes,
char **out_body);
} amduatd_fed_pull_transport_t;
typedef enum {
AMDUATD_FED_PULL_APPLY_OK = 0,
AMDUATD_FED_PULL_APPLY_ERR_INVALID = 1,
AMDUATD_FED_PULL_APPLY_ERR_DISABLED = 2,
AMDUATD_FED_PULL_APPLY_ERR_UNSUPPORTED = 3,
AMDUATD_FED_PULL_APPLY_ERR_REMOTE = 4,
AMDUATD_FED_PULL_APPLY_ERR_STORE = 5,
AMDUATD_FED_PULL_APPLY_ERR_CONFLICT = 6,
AMDUATD_FED_PULL_APPLY_ERR_OOM = 7
} amduatd_fed_pull_apply_status_t;
typedef struct {
const char *peer_key;
const amduatd_space_t *effective_space;
uint64_t limit;
bool cursor_present;
bool cursor_has_logseq;
uint64_t cursor_logseq;
bool cursor_ref_set;
amduat_reference_t cursor_ref;
size_t plan_record_count;
amduatd_fed_cursor_candidate_t plan_candidate;
size_t applied_record_count;
size_t applied_artifact_count;
bool cursor_advanced;
bool cursor_after_has_logseq;
uint64_t cursor_after_logseq;
bool cursor_after_ref_set;
amduat_reference_t cursor_after_ref;
int remote_status;
char error[256];
} amduatd_fed_pull_apply_report_t;
void amduatd_fed_pull_apply_report_init(
amduatd_fed_pull_apply_report_t *report);
void amduatd_fed_pull_apply_report_free(
amduatd_fed_pull_apply_report_t *report);
amduatd_fed_pull_apply_status_t amduatd_fed_pull_apply(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
uint64_t limit,
const amduatd_fed_cfg_t *fed_cfg,
const amduatd_fed_pull_transport_t *transport,
amduatd_fed_pull_apply_report_t *out_report);
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* AMDUATD_FED_PULL_APPLY_H */

477
src/amduatd_fed_pull_plan.c Normal file
View file

@ -0,0 +1,477 @@
#include "amduatd_fed_pull_plan.h"
#include "amduat/asl/ref_text.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct {
char *data;
size_t len;
size_t cap;
} amduatd_fed_plan_strbuf_t;
static void amduatd_fed_plan_strbuf_free(amduatd_fed_plan_strbuf_t *b) {
if (b == NULL) {
return;
}
free(b->data);
b->data = NULL;
b->len = 0;
b->cap = 0;
}
static bool amduatd_fed_plan_strbuf_reserve(amduatd_fed_plan_strbuf_t *b,
size_t extra) {
size_t need;
size_t next_cap;
char *next;
if (b == NULL) {
return false;
}
if (extra > (SIZE_MAX - b->len)) {
return false;
}
need = b->len + extra;
if (need <= b->cap) {
return true;
}
next_cap = b->cap != 0 ? b->cap : 256u;
while (next_cap < need) {
if (next_cap > (SIZE_MAX / 2u)) {
next_cap = need;
break;
}
next_cap *= 2u;
}
next = (char *)realloc(b->data, next_cap);
if (next == NULL) {
return false;
}
b->data = next;
b->cap = next_cap;
return true;
}
static bool amduatd_fed_plan_strbuf_append(amduatd_fed_plan_strbuf_t *b,
const char *s,
size_t n) {
if (b == NULL) {
return false;
}
if (n == 0u) {
return true;
}
if (s == NULL) {
return false;
}
if (!amduatd_fed_plan_strbuf_reserve(b, n + 1u)) {
return false;
}
memcpy(b->data + b->len, s, n);
b->len += n;
b->data[b->len] = '\0';
return true;
}
static bool amduatd_fed_plan_strbuf_append_cstr(amduatd_fed_plan_strbuf_t *b,
const char *s) {
return amduatd_fed_plan_strbuf_append(
b, s != NULL ? s : "", s != NULL ? strlen(s) : 0u);
}
static const char *amduatd_fed_plan_record_type_name(
amduat_fed_record_type_t type) {
switch (type) {
case AMDUAT_FED_REC_ARTIFACT:
return "artifact";
case AMDUAT_FED_REC_PER:
return "per";
case AMDUAT_FED_REC_TGK_EDGE:
return "tgk_edge";
case AMDUAT_FED_REC_TOMBSTONE:
return "tombstone";
default:
return "unknown";
}
}
void amduatd_fed_pull_plan_candidate_init(
amduatd_fed_cursor_candidate_t *candidate) {
if (candidate == NULL) {
return;
}
memset(candidate, 0, sizeof(*candidate));
candidate->ref = amduat_reference(0u, amduat_octets(NULL, 0u));
}
void amduatd_fed_pull_plan_candidate_free(
amduatd_fed_cursor_candidate_t *candidate) {
if (candidate == NULL) {
return;
}
if (candidate->has_ref) {
amduat_reference_free(&candidate->ref);
}
memset(candidate, 0, sizeof(*candidate));
}
bool amduatd_fed_pull_plan_next_cursor_candidate(
const amduatd_fed_cursor_record_t *cursor,
const amduat_fed_record_t *records,
size_t record_count,
amduatd_fed_cursor_candidate_t *out_candidate) {
if (out_candidate == NULL) {
return false;
}
amduatd_fed_pull_plan_candidate_init(out_candidate);
if (record_count > 0u && records != NULL) {
const amduat_fed_record_t *last = &records[record_count - 1u];
out_candidate->has_logseq = true;
out_candidate->logseq = last->logseq;
out_candidate->has_ref = true;
if (!amduat_reference_clone(last->id.ref, &out_candidate->ref)) {
amduatd_fed_pull_plan_candidate_free(out_candidate);
return false;
}
return true;
}
if (cursor != NULL) {
if (cursor->has_logseq) {
out_candidate->has_logseq = true;
out_candidate->logseq = cursor->last_logseq;
}
if (cursor->has_record_ref) {
out_candidate->has_ref = true;
if (!amduat_reference_clone(cursor->last_record_ref,
&out_candidate->ref)) {
amduatd_fed_pull_plan_candidate_free(out_candidate);
return false;
}
}
}
return true;
}
amduatd_fed_pull_plan_status_t amduatd_fed_pull_plan_check(
const amduatd_fed_cfg_t *cfg,
const amduat_asl_store_t *store) {
if (cfg == NULL || store == NULL) {
return AMDUATD_FED_PULL_PLAN_ERR_INVALID;
}
if (!cfg->enabled) {
return AMDUATD_FED_PULL_PLAN_ERR_DISABLED;
}
if (store->ops.log_scan == NULL || store->ops.current_state == NULL) {
return AMDUATD_FED_PULL_PLAN_ERR_UNSUPPORTED;
}
return AMDUATD_FED_PULL_PLAN_OK;
}
amduatd_fed_pull_plan_status_t amduatd_fed_pull_plan_json(
const amduatd_fed_pull_plan_input_t *input,
char **out_json) {
amduatd_fed_plan_strbuf_t b;
size_t i;
const amduat_fed_record_t *first = NULL;
const amduat_fed_record_t *last = NULL;
amduatd_fed_cursor_candidate_t candidate;
char *ref_hex = NULL;
char *cursor_ref_hex = NULL;
char tmp[64];
if (out_json != NULL) {
*out_json = NULL;
}
if (input == NULL || out_json == NULL || input->peer_key == NULL) {
return AMDUATD_FED_PULL_PLAN_ERR_INVALID;
}
if (input->cursor_present &&
(input->cursor == NULL || input->cursor_ref == NULL)) {
return AMDUATD_FED_PULL_PLAN_ERR_INVALID;
}
if (input->record_count > 0u && input->records != NULL) {
first = &input->records[0];
last = &input->records[input->record_count - 1u];
}
if (!amduatd_fed_pull_plan_next_cursor_candidate(
input->cursor_present ? input->cursor : NULL,
input->records,
input->record_count,
&candidate)) {
return AMDUATD_FED_PULL_PLAN_ERR_OOM;
}
if (input->cursor_present &&
input->cursor_ref != NULL &&
input->cursor_ref->digest.data != NULL) {
if (!amduat_asl_ref_encode_hex(*input->cursor_ref, &cursor_ref_hex)) {
return AMDUATD_FED_PULL_PLAN_ERR_OOM;
}
}
memset(&b, 0, sizeof(b));
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "{")) {
goto plan_oom;
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "\"peer\":\"") ||
!amduatd_fed_plan_strbuf_append_cstr(&b, input->peer_key) ||
!amduatd_fed_plan_strbuf_append_cstr(&b, "\",")) {
goto plan_oom;
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "\"effective_space\":{")) {
goto plan_oom;
}
if (input->effective_space != NULL &&
input->effective_space->enabled &&
input->effective_space->space_id.data != NULL) {
const char *space_id = (const char *)input->effective_space->space_id.data;
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "\"mode\":\"scoped\",") ||
!amduatd_fed_plan_strbuf_append_cstr(&b, "\"space_id\":\"") ||
!amduatd_fed_plan_strbuf_append_cstr(&b, space_id) ||
!amduatd_fed_plan_strbuf_append_cstr(&b, "\"")) {
goto plan_oom;
}
} else {
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "\"mode\":\"unscoped\",") ||
!amduatd_fed_plan_strbuf_append_cstr(&b, "\"space_id\":null")) {
goto plan_oom;
}
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "},")) {
goto plan_oom;
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "\"cursor\":{")) {
goto plan_oom;
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "\"present\":") ||
!amduatd_fed_plan_strbuf_append_cstr(&b,
input->cursor_present ? "true"
: "false")) {
goto plan_oom;
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, ",\"last_logseq\":")) {
goto plan_oom;
}
if (input->cursor_present && input->cursor != NULL &&
input->cursor->has_logseq) {
snprintf(tmp, sizeof(tmp), "%llu",
(unsigned long long)input->cursor->last_logseq);
if (!amduatd_fed_plan_strbuf_append_cstr(&b, tmp)) {
goto plan_oom;
}
} else {
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "null")) {
goto plan_oom;
}
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, ",\"last_record_hash\":")) {
goto plan_oom;
}
if (input->cursor_present && input->cursor != NULL &&
input->cursor->has_record_ref) {
if (!amduat_asl_ref_encode_hex(input->cursor->last_record_ref, &ref_hex)) {
goto plan_oom;
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "\"") ||
!amduatd_fed_plan_strbuf_append_cstr(&b, ref_hex) ||
!amduatd_fed_plan_strbuf_append_cstr(&b, "\"")) {
goto plan_oom;
}
free(ref_hex);
ref_hex = NULL;
} else {
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "null")) {
goto plan_oom;
}
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, ",\"cursor_ref\":")) {
goto plan_oom;
}
if (cursor_ref_hex != NULL) {
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "\"") ||
!amduatd_fed_plan_strbuf_append_cstr(&b, cursor_ref_hex) ||
!amduatd_fed_plan_strbuf_append_cstr(&b, "\"")) {
goto plan_oom;
}
} else {
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "null")) {
goto plan_oom;
}
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "},")) {
goto plan_oom;
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "\"remote_scan\":{")) {
goto plan_oom;
}
snprintf(tmp, sizeof(tmp), "%zu", input->record_count);
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "\"record_count\":") ||
!amduatd_fed_plan_strbuf_append_cstr(&b, tmp)) {
goto plan_oom;
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, ",\"first_logseq\":")) {
goto plan_oom;
}
if (first != NULL) {
snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)first->logseq);
if (!amduatd_fed_plan_strbuf_append_cstr(&b, tmp)) {
goto plan_oom;
}
} else {
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "null")) {
goto plan_oom;
}
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, ",\"first_record_hash\":")) {
goto plan_oom;
}
if (first != NULL) {
if (!amduat_asl_ref_encode_hex(first->id.ref, &ref_hex)) {
goto plan_oom;
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "\"") ||
!amduatd_fed_plan_strbuf_append_cstr(&b, ref_hex) ||
!amduatd_fed_plan_strbuf_append_cstr(&b, "\"")) {
goto plan_oom;
}
free(ref_hex);
ref_hex = NULL;
} else {
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "null")) {
goto plan_oom;
}
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, ",\"last_logseq\":")) {
goto plan_oom;
}
if (last != NULL) {
snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)last->logseq);
if (!amduatd_fed_plan_strbuf_append_cstr(&b, tmp)) {
goto plan_oom;
}
} else {
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "null")) {
goto plan_oom;
}
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, ",\"last_record_hash\":")) {
goto plan_oom;
}
if (last != NULL) {
if (!amduat_asl_ref_encode_hex(last->id.ref, &ref_hex)) {
goto plan_oom;
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "\"") ||
!amduatd_fed_plan_strbuf_append_cstr(&b, ref_hex) ||
!amduatd_fed_plan_strbuf_append_cstr(&b, "\"")) {
goto plan_oom;
}
free(ref_hex);
ref_hex = NULL;
} else {
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "null")) {
goto plan_oom;
}
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "},")) {
goto plan_oom;
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "\"records\":[")) {
goto plan_oom;
}
for (i = 0; i < input->record_count; ++i) {
const amduat_fed_record_t *rec = &input->records[i];
const char *type_name = amduatd_fed_plan_record_type_name(rec->id.type);
if (i > 0) {
if (!amduatd_fed_plan_strbuf_append_cstr(&b, ",")) {
goto plan_oom;
}
}
if (!amduat_asl_ref_encode_hex(rec->id.ref, &ref_hex)) {
goto plan_oom;
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "{\"logseq\":")) {
goto plan_oom;
}
snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)rec->logseq);
if (!amduatd_fed_plan_strbuf_append_cstr(&b, tmp) ||
!amduatd_fed_plan_strbuf_append_cstr(&b, ",\"type\":\"") ||
!amduatd_fed_plan_strbuf_append_cstr(&b, type_name) ||
!amduatd_fed_plan_strbuf_append_cstr(&b, "\",\"ref\":\"") ||
!amduatd_fed_plan_strbuf_append_cstr(&b, ref_hex) ||
!amduatd_fed_plan_strbuf_append_cstr(&b, "\"}")) {
goto plan_oom;
}
free(ref_hex);
ref_hex = NULL;
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "],")) {
goto plan_oom;
}
if (!amduatd_fed_plan_strbuf_append_cstr(
&b,
"\"required_artifacts_status\":\"unknown\","
"\"required_artifacts\":[],"
"\"next_cursor_candidate\":{")) {
goto plan_oom;
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "\"last_logseq\":")) {
goto plan_oom;
}
if (candidate.has_logseq) {
snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)candidate.logseq);
if (!amduatd_fed_plan_strbuf_append_cstr(&b, tmp)) {
goto plan_oom;
}
} else {
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "null")) {
goto plan_oom;
}
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, ",\"last_record_hash\":")) {
goto plan_oom;
}
if (candidate.has_ref) {
if (!amduat_asl_ref_encode_hex(candidate.ref, &ref_hex)) {
goto plan_oom;
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "\"") ||
!amduatd_fed_plan_strbuf_append_cstr(&b, ref_hex) ||
!amduatd_fed_plan_strbuf_append_cstr(&b, "\"")) {
goto plan_oom;
}
free(ref_hex);
ref_hex = NULL;
} else {
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "null")) {
goto plan_oom;
}
}
if (!amduatd_fed_plan_strbuf_append_cstr(&b, "}}\n")) {
goto plan_oom;
}
amduatd_fed_pull_plan_candidate_free(&candidate);
free(cursor_ref_hex);
*out_json = b.data;
return AMDUATD_FED_PULL_PLAN_OK;
plan_oom:
free(ref_hex);
amduatd_fed_pull_plan_candidate_free(&candidate);
free(cursor_ref_hex);
amduatd_fed_plan_strbuf_free(&b);
return AMDUATD_FED_PULL_PLAN_ERR_OOM;
}

View file

@ -0,0 +1,65 @@
#ifndef AMDUATD_FED_PULL_PLAN_H
#define AMDUATD_FED_PULL_PLAN_H
#include "amduat/fed/replay.h"
#include "amduatd_fed.h"
#include "amduatd_fed_cursor.h"
#include "amduatd_space.h"
#include <stdbool.h>
#include <stddef.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef enum {
AMDUATD_FED_PULL_PLAN_OK = 0,
AMDUATD_FED_PULL_PLAN_ERR_INVALID = 1,
AMDUATD_FED_PULL_PLAN_ERR_DISABLED = 2,
AMDUATD_FED_PULL_PLAN_ERR_UNSUPPORTED = 3,
AMDUATD_FED_PULL_PLAN_ERR_OOM = 4
} amduatd_fed_pull_plan_status_t;
typedef struct {
const char *peer_key;
const amduatd_space_t *effective_space;
bool cursor_present;
const amduatd_fed_cursor_record_t *cursor;
const amduat_reference_t *cursor_ref;
const amduat_fed_record_t *records;
size_t record_count;
} amduatd_fed_pull_plan_input_t;
typedef struct {
bool has_logseq;
uint64_t logseq;
bool has_ref;
amduat_reference_t ref;
} amduatd_fed_cursor_candidate_t;
amduatd_fed_pull_plan_status_t amduatd_fed_pull_plan_check(
const amduatd_fed_cfg_t *cfg,
const amduat_asl_store_t *store);
void amduatd_fed_pull_plan_candidate_init(
amduatd_fed_cursor_candidate_t *candidate);
void amduatd_fed_pull_plan_candidate_free(
amduatd_fed_cursor_candidate_t *candidate);
bool amduatd_fed_pull_plan_next_cursor_candidate(
const amduatd_fed_cursor_record_t *cursor,
const amduat_fed_record_t *records,
size_t record_count,
amduatd_fed_cursor_candidate_t *out_candidate);
amduatd_fed_pull_plan_status_t amduatd_fed_pull_plan_json(
const amduatd_fed_pull_plan_input_t *input,
char **out_json);
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* AMDUATD_FED_PULL_PLAN_H */

View file

@ -0,0 +1,204 @@
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include "amduatd_fed_cursor.h"
#include "amduatd_store.h"
#include "amduat/asl/asl_store_fs_meta.h"
#include "amduat/hash/asl1.h"
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
static int failures = 0;
static void expect(bool cond, const char *msg) {
if (!cond) {
fprintf(stderr, "FAIL: %s\n", msg);
failures++;
}
}
static char *amduatd_test_make_temp_dir(void) {
char tmpl[] = "/tmp/amduatd-fed-cursor-XXXXXX";
char *dir = mkdtemp(tmpl);
size_t len;
char *copy;
if (dir == NULL) {
perror("mkdtemp");
return NULL;
}
len = strlen(dir);
copy = (char *)malloc(len + 1u);
if (copy == NULL) {
fprintf(stderr, "failed to allocate temp dir copy\n");
return NULL;
}
memcpy(copy, dir, len + 1u);
return copy;
}
static bool amduatd_make_test_ref(uint8_t fill, amduat_reference_t *out_ref) {
uint8_t digest_bytes[32];
amduat_octets_t digest;
if (out_ref == NULL) {
return false;
}
memset(digest_bytes, fill, sizeof(digest_bytes));
if (!amduat_octets_clone(amduat_octets(digest_bytes, sizeof(digest_bytes)),
&digest)) {
return false;
}
*out_ref = amduat_reference(AMDUAT_HASH_ASL1_ID_SHA256, digest);
return true;
}
int main(void) {
char *root = amduatd_test_make_temp_dir();
amduat_asl_store_fs_config_t cfg;
amduatd_store_ctx_t store_ctx;
amduat_asl_store_t store;
amduat_asl_pointer_store_t pointer_store;
amduatd_space_t space;
amduatd_fed_cfg_t fed_cfg;
amduatd_fed_cursor_record_t cursor;
amduatd_fed_cursor_record_t fetched;
amduat_reference_t record_ref;
amduat_reference_t wrong_ref;
amduat_reference_t new_ref;
amduat_reference_t get_ref;
amduatd_fed_cursor_status_t status;
if (root == NULL) {
return 1;
}
memset(&cfg, 0, sizeof(cfg));
if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) {
fprintf(stderr, "failed to init store root\n");
free(root);
return 1;
}
memset(&store_ctx, 0, sizeof(store_ctx));
memset(&store, 0, sizeof(store));
if (!amduatd_store_init(&store,
&cfg,
&store_ctx,
root,
AMDUATD_STORE_BACKEND_FS)) {
fprintf(stderr, "failed to init store\n");
free(root);
return 1;
}
if (!amduat_asl_pointer_store_init(&pointer_store, root)) {
fprintf(stderr, "failed to init pointer store\n");
free(root);
return 1;
}
if (!amduatd_space_init(&space, "alpha", false)) {
fprintf(stderr, "failed to init space\n");
free(root);
return 1;
}
amduatd_fed_cfg_init(&fed_cfg);
expect(amduatd_fed_cursor_check_enabled(&fed_cfg) ==
AMDUATD_FED_CURSOR_ERR_DISABLED,
"disabled federation status");
amduatd_fed_cursor_record_init(&fetched);
memset(&get_ref, 0, sizeof(get_ref));
status = amduatd_fed_cursor_get(&store,
&pointer_store,
&space,
"peer-a",
&fetched,
&get_ref);
expect(status == AMDUATD_FED_CURSOR_ERR_NOT_FOUND, "empty cursor not found");
amduatd_fed_cursor_record_free(&fetched);
amduatd_fed_cursor_record_init(&cursor);
cursor.peer_key = strdup("peer-a");
cursor.space_id = strdup("alpha");
if (cursor.peer_key == NULL || cursor.space_id == NULL) {
fprintf(stderr, "failed to allocate cursor identifiers\n");
amduatd_fed_cursor_record_free(&cursor);
free(root);
return 1;
}
cursor.has_logseq = true;
cursor.last_logseq = 42u;
if (!amduatd_make_test_ref(0x11, &record_ref)) {
fprintf(stderr, "failed to make record ref\n");
amduatd_fed_cursor_record_free(&cursor);
free(root);
return 1;
}
cursor.has_record_ref = true;
cursor.last_record_ref = record_ref;
memset(&new_ref, 0, sizeof(new_ref));
status = amduatd_fed_cursor_cas_set(&store,
&pointer_store,
&space,
"peer-a",
NULL,
&cursor,
&new_ref);
expect(status == AMDUATD_FED_CURSOR_OK, "cursor set ok");
amduatd_fed_cursor_record_init(&fetched);
memset(&get_ref, 0, sizeof(get_ref));
status = amduatd_fed_cursor_get(&store,
&pointer_store,
&space,
"peer-a",
&fetched,
&get_ref);
expect(status == AMDUATD_FED_CURSOR_OK, "cursor get ok");
expect(strcmp(fetched.peer_key, "peer-a") == 0, "peer key match");
expect(fetched.space_id != NULL && strcmp(fetched.space_id, "alpha") == 0,
"space match");
expect(fetched.has_logseq && fetched.last_logseq == 42u, "logseq match");
expect(fetched.has_record_ref &&
amduat_reference_eq(fetched.last_record_ref, record_ref),
"record ref match");
expect(amduat_reference_eq(get_ref, new_ref), "pointer ref match");
amduatd_fed_cursor_record_free(&fetched);
amduat_reference_free(&get_ref);
if (!amduatd_make_test_ref(0x22, &wrong_ref)) {
fprintf(stderr, "failed to make wrong ref\n");
amduat_reference_free(&new_ref);
amduatd_fed_cursor_record_free(&cursor);
free(root);
return 1;
}
status = amduatd_fed_cursor_cas_set(&store,
&pointer_store,
&space,
"peer-a",
&wrong_ref,
&cursor,
NULL);
expect(status == AMDUATD_FED_CURSOR_ERR_CONFLICT, "cursor cas conflict");
amduat_reference_free(&wrong_ref);
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&new_ref);
{
amduat_octets_t name = amduat_octets(NULL, 0u);
bool ok = amduatd_fed_cursor_pointer_name(&space,
"bad peer",
&name);
expect(!ok, "peer validation rejects invalid key");
amduat_octets_free(&name);
}
free(root);
return failures == 0 ? 0 : 1;
}

View file

@ -0,0 +1,578 @@
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include "amduatd_fed_pull_apply.h"
#include "amduatd_fed_cursor.h"
#include "amduatd_store.h"
#include "amduat/asl/artifact_io.h"
#include "amduat/asl/asl_store_fs_meta.h"
#include "amduat/asl/ref_derive.h"
#include "amduat/hash/asl1.h"
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct {
amduat_fed_record_t *records;
size_t record_count;
amduat_octets_t *artifact_bytes;
bool fail_artifact;
size_t fail_index;
int fail_status;
bool mutate_cursor;
amduat_asl_store_t *store;
amduat_asl_pointer_store_t *pointer_store;
const amduatd_space_t *space;
const char *peer_key;
} amduatd_test_pull_transport_t;
static int failures = 0;
static void expect(bool cond, const char *msg) {
if (!cond) {
fprintf(stderr, "FAIL: %s\n", msg);
failures++;
}
}
static char *amduatd_test_make_temp_dir(void) {
char tmpl[] = "/tmp/amduatd-fed-pull-XXXXXX";
char *dir = mkdtemp(tmpl);
size_t len;
char *copy;
if (dir == NULL) {
perror("mkdtemp");
return NULL;
}
len = strlen(dir);
copy = (char *)malloc(len + 1u);
if (copy == NULL) {
fprintf(stderr, "failed to allocate temp dir copy\n");
return NULL;
}
memcpy(copy, dir, len + 1u);
return copy;
}
static bool amduatd_test_clone_record(const amduat_fed_record_t *src,
amduat_fed_record_t *dst) {
if (src == NULL || dst == NULL) {
return false;
}
*dst = *src;
if (!amduat_reference_clone(src->id.ref, &dst->id.ref)) {
return false;
}
return true;
}
static bool amduatd_test_pull_get_records(void *ctx,
uint32_t domain_id,
uint64_t from_logseq,
uint64_t limit,
int *out_status,
amduat_fed_record_t **out_records,
size_t *out_len,
char **out_body) {
amduatd_test_pull_transport_t *t = (amduatd_test_pull_transport_t *)ctx;
amduat_fed_record_t *records = NULL;
size_t i;
(void)domain_id;
(void)from_logseq;
(void)limit;
if (out_status == NULL || out_records == NULL || out_len == NULL) {
return false;
}
*out_status = 200;
*out_records = NULL;
*out_len = 0;
if (out_body != NULL) {
*out_body = NULL;
}
if (t == NULL || t->record_count == 0u) {
return true;
}
records = (amduat_fed_record_t *)calloc(t->record_count, sizeof(*records));
if (records == NULL) {
return false;
}
for (i = 0; i < t->record_count; ++i) {
if (!amduatd_test_clone_record(&t->records[i], &records[i])) {
free(records);
return false;
}
}
*out_records = records;
*out_len = t->record_count;
if (t->mutate_cursor && t->store != NULL && t->pointer_store != NULL &&
t->space != NULL && t->peer_key != NULL) {
amduatd_fed_cursor_record_t cursor;
amduat_reference_t cursor_ref;
amduatd_fed_cursor_record_t current;
amduat_reference_t current_ref;
const amduat_reference_t *expected_ref = NULL;
amduatd_fed_cursor_record_init(&cursor);
cursor.peer_key = strdup(t->peer_key);
cursor.space_id = strdup((const char *)t->space->space_id.data);
cursor.has_logseq = true;
cursor.last_logseq = 99u;
cursor.has_record_ref = true;
if (cursor.peer_key != NULL && cursor.space_id != NULL &&
amduat_reference_clone(t->records[0].id.ref,
&cursor.last_record_ref)) {
amduatd_fed_cursor_record_init(&current);
memset(&current_ref, 0, sizeof(current_ref));
if (amduatd_fed_cursor_get(t->store,
t->pointer_store,
t->space,
t->peer_key,
&current,
&current_ref) == AMDUATD_FED_CURSOR_OK) {
expected_ref = &current_ref;
}
(void)amduatd_fed_cursor_cas_set(t->store,
t->pointer_store,
t->space,
t->peer_key,
expected_ref,
&cursor,
&cursor_ref);
if (expected_ref != NULL) {
amduat_reference_free(&current_ref);
}
amduatd_fed_cursor_record_free(&current);
amduat_reference_free(&cursor_ref);
}
amduatd_fed_cursor_record_free(&cursor);
}
return true;
}
static void amduatd_test_pull_free_records(void *ctx,
amduat_fed_record_t *records,
size_t len) {
size_t i;
(void)ctx;
if (records == NULL) {
return;
}
for (i = 0; i < len; ++i) {
amduat_reference_free(&records[i].id.ref);
}
free(records);
}
static bool amduatd_test_pull_get_artifact(void *ctx,
amduat_reference_t ref,
int *out_status,
amduat_octets_t *out_bytes,
char **out_body) {
amduatd_test_pull_transport_t *t = (amduatd_test_pull_transport_t *)ctx;
size_t i;
if (out_status == NULL || out_bytes == NULL) {
return false;
}
*out_status = 404;
*out_bytes = amduat_octets(NULL, 0u);
if (out_body != NULL) {
*out_body = NULL;
}
if (t == NULL) {
return false;
}
for (i = 0; i < t->record_count; ++i) {
if (!amduat_reference_eq(ref, t->records[i].id.ref)) {
continue;
}
if (t->fail_artifact && i == t->fail_index) {
*out_status = t->fail_status;
return true;
}
if (!amduat_octets_clone(t->artifact_bytes[i], out_bytes)) {
return false;
}
*out_status = 200;
return true;
}
return true;
}
static bool amduatd_test_make_record(amduat_asl_store_t *store,
const char *payload,
uint64_t logseq,
uint32_t domain_id,
amduat_fed_record_t *out_record,
amduat_octets_t *out_bytes) {
amduat_artifact_t artifact;
amduat_reference_t ref;
amduat_octets_t artifact_bytes = amduat_octets(NULL, 0u);
amduat_octets_t payload_bytes = amduat_octets(NULL, 0u);
if (store == NULL || payload == NULL || out_record == NULL ||
out_bytes == NULL) {
return false;
}
if (!amduat_octets_clone(amduat_octets(payload, strlen(payload)),
&payload_bytes)) {
return false;
}
if (!amduat_asl_artifact_from_bytes(payload_bytes,
AMDUAT_ASL_IO_RAW,
false,
amduat_type_tag(0u),
&artifact)) {
amduat_octets_free(&payload_bytes);
return false;
}
if (!amduat_asl_ref_derive(artifact,
store->config.encoding_profile_id,
store->config.hash_id,
&ref,
&artifact_bytes)) {
amduat_asl_artifact_free(&artifact);
return false;
}
amduat_asl_artifact_free(&artifact);
amduat_octets_free(&artifact_bytes);
memset(out_record, 0, sizeof(*out_record));
out_record->id.type = AMDUAT_FED_REC_ARTIFACT;
out_record->id.ref = ref;
out_record->logseq = logseq;
out_record->snapshot_id = 0u;
out_record->log_prefix = 0u;
out_record->meta.domain_id = domain_id;
out_record->meta.visibility = 1u;
out_record->meta.has_source = 0u;
out_record->meta.source_domain = 0u;
if (!amduat_octets_clone(amduat_octets(payload, strlen(payload)),
out_bytes)) {
amduat_reference_free(&ref);
return false;
}
return true;
}
static void amduatd_test_free_transport(amduatd_test_pull_transport_t *t) {
size_t i;
if (t == NULL) {
return;
}
if (t->records != NULL) {
for (i = 0; i < t->record_count; ++i) {
amduat_reference_free(&t->records[i].id.ref);
}
free(t->records);
}
if (t->artifact_bytes != NULL) {
for (i = 0; i < t->record_count; ++i) {
amduat_octets_free(&t->artifact_bytes[i]);
}
free(t->artifact_bytes);
}
memset(t, 0, sizeof(*t));
}
int main(void) {
char *root = amduatd_test_make_temp_dir();
amduat_asl_store_fs_config_t cfg;
amduatd_store_ctx_t store_ctx;
amduat_asl_store_t store;
amduat_asl_pointer_store_t pointer_store;
amduatd_space_t space;
amduatd_fed_cfg_t fed_cfg;
if (root == NULL) {
return 1;
}
memset(&cfg, 0, sizeof(cfg));
if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) {
fprintf(stderr, "failed to init store root\n");
free(root);
return 1;
}
memset(&store_ctx, 0, sizeof(store_ctx));
memset(&store, 0, sizeof(store));
if (!amduatd_store_init(&store,
&cfg,
&store_ctx,
root,
AMDUATD_STORE_BACKEND_INDEX)) {
fprintf(stderr, "failed to init store\n");
free(root);
return 1;
}
if (!amduat_asl_pointer_store_init(&pointer_store, root)) {
fprintf(stderr, "failed to init pointer store\n");
free(root);
return 1;
}
if (!amduatd_space_init(&space, "alpha", false)) {
fprintf(stderr, "failed to init space\n");
free(root);
return 1;
}
amduatd_fed_cfg_init(&fed_cfg);
fed_cfg.enabled = true;
{
amduatd_test_pull_transport_t t;
amduatd_fed_pull_transport_t transport;
amduat_fed_record_t *records = NULL;
amduat_octets_t *bytes = NULL;
amduatd_fed_pull_apply_report_t report;
amduatd_fed_pull_apply_status_t rc;
amduat_reference_t cursor_ref;
amduatd_fed_cursor_record_t cursor;
size_t i;
memset(&t, 0, sizeof(t));
records = (amduat_fed_record_t *)calloc(2u, sizeof(*records));
bytes = (amduat_octets_t *)calloc(2u, sizeof(*bytes));
if (records == NULL || bytes == NULL) {
fprintf(stderr, "failed to alloc records\n");
free(root);
return 1;
}
if (!amduatd_test_make_record(&store,
"hello",
1u,
1u,
&records[0],
&bytes[0]) ||
!amduatd_test_make_record(&store,
"world",
2u,
1u,
&records[1],
&bytes[1])) {
fprintf(stderr, "failed to make records\n");
free(root);
return 1;
}
t.records = records;
t.record_count = 2u;
t.artifact_bytes = bytes;
memset(&transport, 0, sizeof(transport));
transport.ctx = &t;
transport.get_records = amduatd_test_pull_get_records;
transport.free_records = amduatd_test_pull_free_records;
transport.get_artifact = amduatd_test_pull_get_artifact;
amduatd_fed_pull_apply_report_init(&report);
rc = amduatd_fed_pull_apply(&store,
&pointer_store,
&space,
"1",
2u,
&fed_cfg,
&transport,
&report);
expect(rc == AMDUATD_FED_PULL_APPLY_OK, "apply success");
expect(report.applied_record_count == 2u, "applied record count");
expect(report.cursor_advanced, "cursor advanced");
for (i = 0; i < 2u; ++i) {
amduat_artifact_t artifact;
memset(&artifact, 0, sizeof(artifact));
expect(amduat_asl_store_get(&store,
records[i].id.ref,
&artifact) == AMDUAT_ASL_STORE_OK,
"artifact stored");
amduat_asl_artifact_free(&artifact);
}
amduatd_fed_cursor_record_init(&cursor);
memset(&cursor_ref, 0, sizeof(cursor_ref));
expect(amduatd_fed_cursor_get(&store,
&pointer_store,
&space,
"1",
&cursor,
&cursor_ref) == AMDUATD_FED_CURSOR_OK,
"cursor get after apply");
expect(cursor.has_logseq && cursor.last_logseq == 2u,
"cursor advanced logseq");
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&cursor);
amduatd_fed_pull_apply_report_free(&report);
amduatd_test_free_transport(&t);
}
{
amduatd_test_pull_transport_t t;
amduatd_fed_pull_transport_t transport;
amduat_fed_record_t *records = NULL;
amduat_octets_t *bytes = NULL;
amduatd_fed_pull_apply_report_t report;
amduatd_fed_pull_apply_status_t rc;
amduatd_fed_cursor_record_t cursor;
amduat_reference_t cursor_ref;
uint64_t before_logseq = 0u;
bool before_has_logseq = false;
memset(&t, 0, sizeof(t));
records = (amduat_fed_record_t *)calloc(2u, sizeof(*records));
bytes = (amduat_octets_t *)calloc(2u, sizeof(*bytes));
if (records == NULL || bytes == NULL) {
fprintf(stderr, "failed to alloc records\n");
free(root);
return 1;
}
if (!amduatd_test_make_record(&store,
"alpha",
3u,
1u,
&records[0],
&bytes[0]) ||
!amduatd_test_make_record(&store,
"beta",
4u,
1u,
&records[1],
&bytes[1])) {
fprintf(stderr, "failed to make records\n");
free(root);
return 1;
}
t.records = records;
t.record_count = 2u;
t.artifact_bytes = bytes;
t.fail_artifact = true;
t.fail_index = 1u;
t.fail_status = 503;
memset(&transport, 0, sizeof(transport));
transport.ctx = &t;
transport.get_records = amduatd_test_pull_get_records;
transport.free_records = amduatd_test_pull_free_records;
transport.get_artifact = amduatd_test_pull_get_artifact;
amduatd_fed_cursor_record_init(&cursor);
memset(&cursor_ref, 0, sizeof(cursor_ref));
expect(amduatd_fed_cursor_get(&store,
&pointer_store,
&space,
"1",
&cursor,
&cursor_ref) == AMDUATD_FED_CURSOR_OK,
"cursor present before partial");
before_has_logseq = cursor.has_logseq;
before_logseq = cursor.last_logseq;
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&cursor);
amduatd_fed_pull_apply_report_init(&report);
rc = amduatd_fed_pull_apply(&store,
&pointer_store,
&space,
"1",
2u,
&fed_cfg,
&transport,
&report);
expect(rc == AMDUATD_FED_PULL_APPLY_ERR_STORE, "apply partial failure");
expect(report.applied_record_count == 1u, "partial applied count");
amduatd_fed_cursor_record_init(&cursor);
memset(&cursor_ref, 0, sizeof(cursor_ref));
expect(amduatd_fed_cursor_get(&store,
&pointer_store,
&space,
"1",
&cursor,
&cursor_ref) == AMDUATD_FED_CURSOR_OK,
"cursor present (from previous test)");
expect(cursor.has_logseq == before_has_logseq, "cursor unchanged flag");
expect(cursor.last_logseq == before_logseq, "cursor unchanged logseq");
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&cursor);
amduatd_fed_pull_apply_report_free(&report);
amduatd_test_free_transport(&t);
}
{
amduatd_test_pull_transport_t t;
amduatd_fed_pull_transport_t transport;
amduat_fed_record_t *records = NULL;
amduat_octets_t *bytes = NULL;
amduatd_fed_pull_apply_report_t report;
amduatd_fed_pull_apply_status_t rc;
amduatd_fed_cursor_record_t cursor;
amduat_reference_t cursor_ref;
memset(&t, 0, sizeof(t));
records = (amduat_fed_record_t *)calloc(1u, sizeof(*records));
bytes = (amduat_octets_t *)calloc(1u, sizeof(*bytes));
if (records == NULL || bytes == NULL) {
fprintf(stderr, "failed to alloc records\n");
free(root);
return 1;
}
if (!amduatd_test_make_record(&store,
"gamma",
5u,
1u,
&records[0],
&bytes[0])) {
fprintf(stderr, "failed to make record\n");
free(root);
return 1;
}
t.records = records;
t.record_count = 1u;
t.artifact_bytes = bytes;
t.mutate_cursor = true;
t.store = &store;
t.pointer_store = &pointer_store;
t.space = &space;
t.peer_key = "1";
memset(&transport, 0, sizeof(transport));
transport.ctx = &t;
transport.get_records = amduatd_test_pull_get_records;
transport.free_records = amduatd_test_pull_free_records;
transport.get_artifact = amduatd_test_pull_get_artifact;
amduatd_fed_pull_apply_report_init(&report);
rc = amduatd_fed_pull_apply(&store,
&pointer_store,
&space,
"1",
1u,
&fed_cfg,
&transport,
&report);
expect(rc == AMDUATD_FED_PULL_APPLY_ERR_CONFLICT, "cursor conflict");
amduatd_fed_cursor_record_init(&cursor);
memset(&cursor_ref, 0, sizeof(cursor_ref));
expect(amduatd_fed_cursor_get(&store,
&pointer_store,
&space,
"1",
&cursor,
&cursor_ref) == AMDUATD_FED_CURSOR_OK,
"cursor present after conflict");
expect(cursor.has_logseq && cursor.last_logseq == 99u,
"cursor unchanged on conflict");
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&cursor);
amduatd_fed_pull_apply_report_free(&report);
amduatd_test_free_transport(&t);
}
free(root);
return failures == 0 ? 0 : 1;
}

View file

@ -0,0 +1,142 @@
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include "amduatd_fed_pull_plan.h"
#include "amduat/hash/asl1.h"
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
static int failures = 0;
static void expect(bool cond, const char *msg) {
if (!cond) {
fprintf(stderr, "FAIL: %s\n", msg);
failures++;
}
}
static bool amduatd_make_test_ref(uint8_t fill, amduat_reference_t *out_ref) {
uint8_t digest_bytes[32];
amduat_octets_t digest;
if (out_ref == NULL) {
return false;
}
memset(digest_bytes, fill, sizeof(digest_bytes));
if (!amduat_octets_clone(amduat_octets(digest_bytes, sizeof(digest_bytes)),
&digest)) {
return false;
}
*out_ref = amduat_reference(AMDUAT_HASH_ASL1_ID_SHA256, digest);
return true;
}
int main(void) {
amduatd_fed_cfg_t cfg;
amduat_asl_store_t store;
amduatd_fed_pull_plan_status_t status;
amduatd_fed_cfg_init(&cfg);
memset(&store, 0, sizeof(store));
status = amduatd_fed_pull_plan_check(&cfg, &store);
expect(status == AMDUATD_FED_PULL_PLAN_ERR_DISABLED,
"disabled federation check");
cfg.enabled = true;
status = amduatd_fed_pull_plan_check(&cfg, &store);
expect(status == AMDUATD_FED_PULL_PLAN_ERR_UNSUPPORTED,
"unsupported backend check");
{
amduatd_fed_pull_plan_input_t input;
amduat_fed_record_t records[2];
amduat_reference_t ref0;
amduat_reference_t ref1;
char *json = NULL;
if (!amduatd_make_test_ref(0x01, &ref0) ||
!amduatd_make_test_ref(0x02, &ref1)) {
fprintf(stderr, "FAIL: make refs\n");
return 1;
}
memset(records, 0, sizeof(records));
records[0].id.type = AMDUAT_FED_REC_ARTIFACT;
records[0].id.ref = ref0;
records[0].logseq = 1u;
records[1].id.type = AMDUAT_FED_REC_PER;
records[1].id.ref = ref1;
records[1].logseq = 2u;
memset(&input, 0, sizeof(input));
input.peer_key = "1";
input.cursor_present = false;
input.records = records;
input.record_count = 2u;
status = amduatd_fed_pull_plan_json(&input, &json);
expect(status == AMDUATD_FED_PULL_PLAN_OK, "plan json missing cursor");
expect(json != NULL && strstr(json, "\"present\":false") != NULL,
"cursor present false");
expect(json != NULL && strstr(json, "\"record_count\":2") != NULL,
"record count");
expect(json != NULL && strstr(json, "\"last_logseq\":2") != NULL,
"next cursor candidate");
free(json);
amduat_reference_free(&ref0);
amduat_reference_free(&ref1);
}
{
amduatd_fed_pull_plan_input_t input;
amduatd_fed_cursor_record_t cursor;
amduat_reference_t cursor_ref;
amduat_reference_t record_ref;
char *json = NULL;
amduatd_fed_cursor_record_init(&cursor);
cursor.peer_key = strdup("7");
cursor.space_id = NULL;
if (cursor.peer_key == NULL) {
fprintf(stderr, "FAIL: cursor peer allocation\n");
return 1;
}
cursor.has_logseq = true;
cursor.last_logseq = 5u;
if (!amduatd_make_test_ref(0x03, &record_ref)) {
fprintf(stderr, "FAIL: make cursor ref\n");
return 1;
}
cursor.has_record_ref = true;
cursor.last_record_ref = record_ref;
if (!amduatd_make_test_ref(0x04, &cursor_ref)) {
fprintf(stderr, "FAIL: make cursor ref\n");
amduatd_fed_cursor_record_free(&cursor);
return 1;
}
memset(&input, 0, sizeof(input));
input.peer_key = "7";
input.cursor_present = true;
input.cursor = &cursor;
input.cursor_ref = &cursor_ref;
input.records = NULL;
input.record_count = 0u;
status = amduatd_fed_pull_plan_json(&input, &json);
expect(status == AMDUATD_FED_PULL_PLAN_OK, "plan json with cursor");
expect(json != NULL && strstr(json, "\"present\":true") != NULL,
"cursor present true");
expect(json != NULL && strstr(json, "\"last_logseq\":5") != NULL,
"cursor logseq echoed");
free(json);
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&cursor);
}
return failures == 0 ? 0 : 1;
}

View file

@ -0,0 +1,384 @@
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
typedef struct {
char *data;
size_t len;
size_t cap;
} amduatd_buf_t;
static void amduatd_buf_free(amduatd_buf_t *b) {
if (b == NULL) {
return;
}
free(b->data);
b->data = NULL;
b->len = 0u;
b->cap = 0u;
}
static bool amduatd_buf_reserve(amduatd_buf_t *b, size_t extra) {
size_t need;
size_t next;
char *mem;
if (b == NULL) {
return false;
}
if (extra > SIZE_MAX - b->len) {
return false;
}
need = b->len + extra;
if (need <= b->cap) {
return true;
}
next = b->cap == 0u ? 512u : b->cap;
while (next < need) {
if (next > SIZE_MAX / 2u) {
next = need;
break;
}
next *= 2u;
}
mem = (char *)realloc(b->data, next);
if (mem == NULL) {
return false;
}
b->data = mem;
b->cap = next;
return true;
}
static bool amduatd_buf_append(amduatd_buf_t *b, const char *s, size_t n) {
if (b == NULL) {
return false;
}
if (n == 0u) {
return true;
}
if (s == NULL) {
return false;
}
if (!amduatd_buf_reserve(b, n)) {
return false;
}
memcpy(b->data + b->len, s, n);
b->len += n;
return true;
}
static bool amduatd_buf_append_cstr(amduatd_buf_t *b, const char *s) {
return amduatd_buf_append(b, s, s != NULL ? strlen(s) : 0u);
}
static void amduatd_usage(const char *prog) {
fprintf(stderr,
"usage: %s --sock PATH --method METHOD --path PATH "
"[--header \"K: V\"]... [--data STRING|--data-file PATH] "
"[--allow-status]\n",
prog);
}
static bool amduatd_read_file(const char *path, uint8_t **out_data,
size_t *out_len) {
FILE *fp;
long size;
uint8_t *buf;
if (out_data == NULL || out_len == NULL || path == NULL) {
return false;
}
*out_data = NULL;
*out_len = 0u;
fp = fopen(path, "rb");
if (fp == NULL) {
return false;
}
if (fseek(fp, 0, SEEK_END) != 0) {
fclose(fp);
return false;
}
size = ftell(fp);
if (size < 0) {
fclose(fp);
return false;
}
if (fseek(fp, 0, SEEK_SET) != 0) {
fclose(fp);
return false;
}
buf = (uint8_t *)malloc((size_t)size);
if (buf == NULL) {
fclose(fp);
return false;
}
if (size != 0 &&
fread(buf, 1u, (size_t)size, fp) != (size_t)size) {
free(buf);
fclose(fp);
return false;
}
fclose(fp);
*out_data = buf;
*out_len = (size_t)size;
return true;
}
static int amduatd_parse_status(const char *line) {
const char *p;
char *end = NULL;
long code;
if (line == NULL) {
return -1;
}
p = strchr(line, ' ');
if (p == NULL) {
return -1;
}
while (*p == ' ') {
p++;
}
errno = 0;
code = strtol(p, &end, 10);
if (errno != 0 || end == p || code < 0 || code > 999) {
return -1;
}
return (int)code;
}
int main(int argc, char **argv) {
const char *sock_path = NULL;
const char *method = NULL;
const char *path = NULL;
bool allow_status = false;
char **headers = NULL;
size_t header_count = 0u;
uint8_t *body = NULL;
size_t body_len = 0u;
int fd = -1;
struct sockaddr_un addr;
amduatd_buf_t req;
amduatd_buf_t resp;
ssize_t nread;
int status;
char *header_end;
size_t body_offset;
size_t i;
memset(&req, 0, sizeof(req));
memset(&resp, 0, sizeof(resp));
for (i = 1; i < (size_t)argc; ++i) {
const char *arg = argv[i];
if (strcmp(arg, "--sock") == 0 && i + 1 < (size_t)argc) {
sock_path = argv[++i];
} else if (strcmp(arg, "--method") == 0 && i + 1 < (size_t)argc) {
method = argv[++i];
} else if (strcmp(arg, "--path") == 0 && i + 1 < (size_t)argc) {
path = argv[++i];
} else if (strcmp(arg, "--header") == 0 && i + 1 < (size_t)argc) {
char **next = (char **)realloc(headers,
(header_count + 1u) * sizeof(*headers));
if (next == NULL) {
fprintf(stderr, "oom\n");
goto fail;
}
headers = next;
headers[header_count++] = argv[++i];
} else if (strcmp(arg, "--data") == 0 && i + 1 < (size_t)argc) {
const char *data = argv[++i];
body_len = strlen(data);
if (body_len == 0u) {
body = NULL;
} else {
body = (uint8_t *)malloc(body_len);
if (body == NULL) {
fprintf(stderr, "oom\n");
goto fail;
}
memcpy(body, data, body_len);
}
} else if (strcmp(arg, "--data-file") == 0 && i + 1 < (size_t)argc) {
if (!amduatd_read_file(argv[++i], &body, &body_len)) {
fprintf(stderr, "failed to read data file\n");
goto fail;
}
} else if (strcmp(arg, "--allow-status") == 0) {
allow_status = true;
} else {
amduatd_usage(argv[0]);
goto fail;
}
}
if (sock_path == NULL || method == NULL || path == NULL) {
amduatd_usage(argv[0]);
goto fail;
}
fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0) {
perror("socket");
goto fail;
}
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
if (strlen(sock_path) >= sizeof(addr.sun_path)) {
fprintf(stderr, "socket path too long\n");
goto fail;
}
strncpy(addr.sun_path, sock_path, sizeof(addr.sun_path) - 1u);
if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
perror("connect");
goto fail;
}
if (!amduatd_buf_append_cstr(&req, method) ||
!amduatd_buf_append_cstr(&req, " ") ||
!amduatd_buf_append_cstr(&req, path) ||
!amduatd_buf_append_cstr(&req, " HTTP/1.1\r\n") ||
!amduatd_buf_append_cstr(&req, "Host: localhost\r\n") ||
!amduatd_buf_append_cstr(&req, "Connection: close\r\n")) {
fprintf(stderr, "oom\n");
goto fail;
}
for (i = 0; i < header_count; ++i) {
if (!amduatd_buf_append_cstr(&req, headers[i]) ||
!amduatd_buf_append_cstr(&req, "\r\n")) {
fprintf(stderr, "oom\n");
goto fail;
}
}
if (body != NULL) {
char len_buf[64];
int len;
len = snprintf(len_buf, sizeof(len_buf), "Content-Length: %zu\r\n",
body_len);
if (len < 0 || (size_t)len >= sizeof(len_buf) ||
!amduatd_buf_append(&req, len_buf, (size_t)len)) {
fprintf(stderr, "oom\n");
goto fail;
}
}
if (!amduatd_buf_append_cstr(&req, "\r\n")) {
fprintf(stderr, "oom\n");
goto fail;
}
{
size_t off = 0u;
while (off < req.len) {
ssize_t n = write(fd, req.data + off, req.len - off);
if (n <= 0) {
perror("write");
goto fail;
}
off += (size_t)n;
}
}
if (body != NULL && body_len != 0u) {
size_t off = 0u;
while (off < body_len) {
ssize_t n = write(fd, body + off, body_len - off);
if (n <= 0) {
perror("write");
goto fail;
}
off += (size_t)n;
}
}
while (true) {
char tmp[4096];
nread = read(fd, tmp, sizeof(tmp));
if (nread < 0) {
perror("read");
goto fail;
}
if (nread == 0) {
break;
}
if (!amduatd_buf_append(&resp, tmp, (size_t)nread)) {
fprintf(stderr, "oom\n");
goto fail;
}
}
if (resp.len == 0u) {
fprintf(stderr, "empty response\n");
goto fail;
}
if (!amduatd_buf_append(&resp, "\0", 1u)) {
fprintf(stderr, "oom\n");
goto fail;
}
{
char *line_end = strstr(resp.data, "\r\n");
if (line_end == NULL) {
fprintf(stderr, "invalid response\n");
goto fail;
}
*line_end = '\0';
status = amduatd_parse_status(resp.data);
*line_end = '\r';
if (status < 0) {
fprintf(stderr, "invalid status\n");
goto fail;
}
}
header_end = strstr(resp.data, "\r\n\r\n");
if (header_end == NULL) {
body_offset = resp.len;
} else {
body_offset = (size_t)(header_end - resp.data) + 4u;
}
if (body_offset < resp.len) {
size_t out_len = resp.len - body_offset;
if (resp.len > 0u && resp.data[resp.len - 1u] == '\0') {
if (out_len > 0u) {
out_len -= 1u;
}
}
if (out_len > 0u) {
fwrite(resp.data + body_offset, 1u, out_len, stdout);
}
}
fflush(stdout);
amduatd_buf_free(&req);
amduatd_buf_free(&resp);
free(body);
free(headers);
if (fd >= 0) {
close(fd);
}
if (!allow_status && (status < 200 || status >= 300)) {
return 1;
}
return 0;
fail:
amduatd_buf_free(&req);
amduatd_buf_free(&resp);
free(body);
free(headers);
if (fd >= 0) {
close(fd);
}
return 1;
}