Add federation ingest endpoint with tests and docs

This commit is contained in:
Carl Niklas Rydberg 2026-01-24 15:49:47 +01:00
parent 79f19213ce
commit 8a490ef09e
5 changed files with 1255 additions and 12 deletions

View file

@ -215,6 +215,11 @@ add_test(NAME amduatd_fed_smoke
)
set_tests_properties(amduatd_fed_smoke PROPERTIES SKIP_RETURN_CODE 77)
add_test(NAME amduatd_fed_ingest
COMMAND bash ${CMAKE_CURRENT_SOURCE_DIR}/scripts/test_fed_ingest.sh
)
set_tests_properties(amduatd_fed_ingest PROPERTIES SKIP_RETURN_CODE 77)
add_executable(amduatd_test_space_doctor
tests/test_amduatd_space_doctor.c
src/amduatd_space_doctor.c

View file

@ -129,6 +129,39 @@ curl --unix-socket amduatd.sock -X POST \
`/v1/fed/pull` requires the index backend and will not advance the cursor on
partial failure.
### Federation ingest (receiver)
`/v1/fed/ingest` applies a single incoming record (push receiver). The request
is space-scoped via `X-Amduat-Space` and requires federation to be enabled;
otherwise the daemon responds with 503.
For artifact, per, and tgk_edge records, send raw bytes and provide metadata via
query params:
```sh
curl --unix-socket amduatd.sock -X POST \
'http://localhost/v1/fed/ingest?record_type=artifact&ref=<ref>' \
-H 'Content-Type: application/octet-stream' \
-H 'X-Amduat-Space: demo' \
--data-binary 'payload'
```
For tombstones, send a small JSON payload:
```sh
curl --unix-socket amduatd.sock -X POST \
'http://localhost/v1/fed/ingest' \
-H 'Content-Type: application/json' \
-H 'X-Amduat-Space: demo' \
-d '{"record_type":"tombstone","ref":"<ref>"}'
```
Notes:
- Record types: `artifact`, `per`, `tgk_edge`, `tombstone`.
- Size limit: 8 MiB per request.
- Tombstones use deterministic defaults: `scope=0`, `reason_code=0`.
Run the daemon with derivation indexing enabled:
```sh

371
scripts/test_fed_ingest.sh Normal file
View file

@ -0,0 +1,371 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
HTTP_HELPER="${ROOT_DIR}/build/amduatd_http_unix"
USE_HTTP_HELPER=0
TMPDIR="${TMPDIR:-/tmp}"
mkdir -p "${TMPDIR}"
if ! command -v grep >/dev/null 2>&1; then
echo "skip: grep not found" >&2
exit 77
fi
if ! command -v awk >/dev/null 2>&1; then
echo "skip: awk not found" >&2
exit 77
fi
if command -v curl >/dev/null 2>&1; then
if curl --help 2>/dev/null | grep -q -- '--unix-socket'; then
USE_HTTP_HELPER=0
else
USE_HTTP_HELPER=1
fi
else
USE_HTTP_HELPER=1
fi
if [[ "${USE_HTTP_HELPER}" -eq 1 && ! -x "${HTTP_HELPER}" ]]; then
echo "skip: curl lacks --unix-socket support and helper missing" >&2
exit 77
fi
AMDUATD_BIN="${ROOT_DIR}/build/amduatd"
ASL_BIN="${ROOT_DIR}/vendor/amduat/build/amduat-asl"
if [[ ! -x "${AMDUATD_BIN}" || ! -x "${ASL_BIN}" ]]; then
echo "missing binaries; build amduatd and amduat-asl first" >&2
exit 1
fi
tmp_root="$(mktemp -d -p "${TMPDIR}" amduatd-fed-ingest-XXXXXX)"
tmp_keep="${AMDUATD_FED_INGEST_KEEP_TMP:-0}"
last_log="${TMPDIR}/amduatd-fed-ingest.last.log"
root_store="${tmp_root}/store"
root_ref="${tmp_root}/ref"
sock="${tmp_root}/amduatd.sock"
log="${tmp_root}/amduatd.log"
space_a="alpha"
space_b="beta"
show_log() {
if [[ -n "${log}" && -f "${log}" ]]; then
echo "daemon log: ${log}" >&2
cat "${log}" >&2
else
echo "daemon log missing: ${log}" >&2
fi
if [[ -f "${last_log}" ]]; then
echo "last log copy: ${last_log}" >&2
fi
}
die() {
echo "$1" >&2
show_log
exit 1
}
cleanup() {
if [[ -n "${pid:-}" ]]; then
kill "${pid}" >/dev/null 2>&1 || true
fi
if [[ -f "${log}" ]]; then
cp -f "${log}" "${last_log}" 2>/dev/null || true
fi
if [[ "${tmp_keep}" -eq 0 ]]; then
rm -rf "${tmp_root}"
else
echo "kept tmpdir: ${tmp_root}" >&2
fi
}
trap cleanup EXIT
mkdir -p "${root_store}" "${root_ref}"
"${ASL_BIN}" index init --root "${root_store}"
"${ASL_BIN}" index init --root "${root_ref}"
"${AMDUATD_BIN}" --root "${root_store}" --sock "${sock}" \
--store-backend index --space "${space_a}" \
--fed-enable --fed-transport stub \
>"${log}" 2>&1 &
pid=$!
http_get() {
local sock_path="$1"
local path="$2"
shift 2
if [[ "${USE_HTTP_HELPER}" -eq 1 ]]; then
"${HTTP_HELPER}" --sock "${sock_path}" --method GET --path "${path}" "$@"
else
curl --silent --show-error --fail \
--unix-socket "${sock_path}" \
"$@" \
"http://localhost${path}"
fi
}
http_get_allow() {
local sock_path="$1"
local path="$2"
shift 2
if [[ "${USE_HTTP_HELPER}" -eq 1 ]]; then
"${HTTP_HELPER}" --sock "${sock_path}" --method GET --path "${path}" \
--allow-status "$@"
else
curl --silent --show-error \
--unix-socket "${sock_path}" \
"$@" \
"http://localhost${path}"
fi
}
http_post() {
local sock_path="$1"
local path="$2"
local data="$3"
shift 3
if [[ "${USE_HTTP_HELPER}" -eq 1 ]]; then
"${HTTP_HELPER}" --sock "${sock_path}" --method POST --path "${path}" \
--data "${data}" \
"$@"
else
curl --silent --show-error --fail \
--unix-socket "${sock_path}" \
"$@" \
--data-binary "${data}" \
"http://localhost${path}"
fi
}
http_post_allow() {
local sock_path="$1"
local path="$2"
local data="$3"
shift 3
if [[ "${USE_HTTP_HELPER}" -eq 1 ]]; then
"${HTTP_HELPER}" --sock "${sock_path}" --method POST --path "${path}" \
--allow-status --data "${data}" \
"$@"
else
curl --silent --show-error \
--unix-socket "${sock_path}" \
"$@" \
--data-binary "${data}" \
"http://localhost${path}"
fi
}
http_status() {
local sock_path="$1"
local method="$2"
local path="$3"
shift 3
if [[ "${USE_HTTP_HELPER}" -eq 1 ]]; then
echo ""
return 0
fi
curl --silent --show-error --output /dev/null --write-out '%{http_code}' \
--unix-socket "${sock_path}" \
-X "${method}" \
"$@" \
"http://localhost${path}"
}
wait_for_ready() {
local sock_path="$1"
local pid_val="$2"
local log_path="$3"
local i
for i in $(seq 1 100); do
if ! kill -0 "${pid_val}" >/dev/null 2>&1; then
if [[ -f "${log_path}" ]] && grep -q "bind: Operation not permitted" "${log_path}"; then
echo "skip: bind not permitted for unix socket" >&2
exit 77
fi
if [[ -f "${log_path}" ]]; then
cat "${log_path}" >&2
fi
return 1
fi
if [[ -S "${sock_path}" ]]; then
if http_get "${sock_path}" "/v1/meta" >/dev/null 2>&1; then
return 0
fi
fi
sleep 0.1
done
return 1
}
if ! wait_for_ready "${sock}" "${pid}" "${log}"; then
die "daemon not ready"
fi
payload="fed-ingest"
ref="$(
printf '%s' "${payload}" | "${ASL_BIN}" put --root "${root_ref}" \
--input - --input-format raw --ref-format hex \
| tail -n 1 | tr -d '\r\n'
)"
ingest_resp="$(
http_post "${sock}" "/v1/fed/ingest?record_type=artifact&ref=${ref}" \
"${payload}" \
--header "Content-Type: application/octet-stream" \
--header "X-Amduat-Space: ${space_a}"
)" || {
die "ingest artifact failed"
}
status="$(
printf '%s' "${ingest_resp}" \
| tr -d '\r\n' \
| awk 'match($0, /"status":"[^"]+"/) {print substr($0, RSTART+10, RLENGTH-11)}'
)"
applied="$(
printf '%s' "${ingest_resp}" \
| tr -d '\r\n' \
| awk 'match($0, /"applied":[^,}]+/) {print substr($0, RSTART+10, RLENGTH-10)}'
)"
if [[ "${status}" != "ok" || "${applied}" != "true" ]]; then
die "unexpected ingest response: ${ingest_resp}"
fi
fetched="$(http_get "${sock}" "/v1/artifacts/${ref}")"
if [[ "${fetched}" != "${payload}" ]]; then
die "artifact fetch mismatch"
fi
ingest_again="$(
http_post "${sock}" "/v1/fed/ingest?record_type=artifact&ref=${ref}" \
"${payload}" \
--header "Content-Type: application/octet-stream" \
--header "X-Amduat-Space: ${space_a}"
)"
status="$(
printf '%s' "${ingest_again}" \
| tr -d '\r\n' \
| awk 'match($0, /"status":"[^"]+"/) {print substr($0, RSTART+10, RLENGTH-11)}'
)"
applied="$(
printf '%s' "${ingest_again}" \
| tr -d '\r\n' \
| awk 'match($0, /"applied":[^,}]+/) {print substr($0, RSTART+10, RLENGTH-10)}'
)"
if [[ "${status}" != "already_present" && "${applied}" != "false" ]]; then
die "unexpected re-ingest response: ${ingest_again}"
fi
tombstone_resp="$(
http_post "${sock}" "/v1/fed/ingest" \
"{\"record_type\":\"tombstone\",\"ref\":\"${ref}\"}" \
--header "Content-Type: application/json" \
--header "X-Amduat-Space: ${space_a}"
)"
status="$(
printf '%s' "${tombstone_resp}" \
| tr -d '\r\n' \
| awk 'match($0, /"status":"[^"]+"/) {print substr($0, RSTART+10, RLENGTH-11)}'
)"
if [[ "${status}" != "ok" ]]; then
die "unexpected tombstone response: ${tombstone_resp}"
fi
http_get_allow "${sock}" "/v1/artifacts/${ref}" >/dev/null 2>&1 || true
tombstone_again="$(
http_post "${sock}" "/v1/fed/ingest" \
"{\"record_type\":\"tombstone\",\"ref\":\"${ref}\"}" \
--header "Content-Type: application/json" \
--header "X-Amduat-Space: ${space_a}"
)"
status="$(
printf '%s' "${tombstone_again}" \
| tr -d '\r\n' \
| awk 'match($0, /"status":"[^"]+"/) {print substr($0, RSTART+10, RLENGTH-11)}'
)"
if [[ "${status}" != "ok" && "${status}" != "already_present" ]]; then
die "unexpected tombstone repeat response: ${tombstone_again}"
fi
cap_resp="$(
http_post "${sock}" "/v1/capabilities" \
"{\"kind\":\"pointer_name\",\"target\":{\"name\":\"space/${space_a}/fed/records\"},\"expiry_seconds\":3600}" \
--header "Content-Type: application/json" \
--header "X-Amduat-Space: ${space_a}"
)"
cap_token="$(
printf '%s' "${cap_resp}" \
| tr -d '\r\n' \
| awk 'match($0, /"token":"[^"]+"/) {print substr($0, RSTART+9, RLENGTH-10)}'
)"
if [[ -z "${cap_token}" ]]; then
die "failed to mint capability: ${cap_resp}"
fi
wrong_space_resp="$(
http_post_allow "${sock}" "/v1/fed/ingest?record_type=artifact&ref=${ref}" \
"${payload}" \
--header "Content-Type: application/octet-stream" \
--header "X-Amduat-Space: ${space_b}" \
--header "X-Amduat-Capability: ${cap_token}"
)"
status="$(
printf '%s' "${wrong_space_resp}" \
| tr -d '\r\n' \
| awk 'match($0, /"status":"[^"]+"/) {print substr($0, RSTART+10, RLENGTH-11)}'
)"
if [[ "${status}" != "invalid" ]]; then
die "unexpected wrong-space response: ${wrong_space_resp}"
fi
if [[ "${USE_HTTP_HELPER}" -eq 0 ]]; then
code="$(http_status "${sock}" "POST" "/v1/fed/ingest?record_type=artifact&ref=${ref}" \
--header "Content-Type: application/octet-stream" \
--header "X-Amduat-Space: ${space_b}" \
--header "X-Amduat-Capability: ${cap_token}")"
if [[ "${code}" != "403" ]]; then
die "expected 403 for wrong-space capability, got ${code}"
fi
fi
kill "${pid}" >/dev/null 2>&1 || true
wait "${pid}" >/dev/null 2>&1 || true
pid=""
rm -f "${sock}"
"${AMDUATD_BIN}" --root "${root_store}" --sock "${sock}" \
--store-backend index --space "${space_a}" \
--fed-transport stub \
>"${log}" 2>&1 &
pid=$!
if ! wait_for_ready "${sock}" "${pid}" "${log}"; then
die "daemon (fed disabled) not ready"
fi
disabled_resp="$(
http_post_allow "${sock}" "/v1/fed/ingest?record_type=artifact&ref=${ref}" \
"${payload}" \
--header "Content-Type: application/octet-stream" \
--header "X-Amduat-Space: ${space_a}"
)"
status="$(
printf '%s' "${disabled_resp}" \
| tr -d '\r\n' \
| awk 'match($0, /"status":"[^"]+"/) {print substr($0, RSTART+10, RLENGTH-11)}'
)"
if [[ "${status}" != "error" ]]; then
die "unexpected disabled response: ${disabled_resp}"
fi
if [[ "${USE_HTTP_HELPER}" -eq 0 ]]; then
code="$(http_status "${sock}" "POST" "/v1/fed/ingest?record_type=artifact&ref=${ref}" \
--header "Content-Type: application/octet-stream" \
--header "X-Amduat-Space: ${space_a}")"
if [[ "${code}" != "503" ]]; then
die "expected 503 for federation disabled, got ${code}"
fi
fi
echo "ok"

View file

@ -113,6 +113,8 @@ static bool amduatd_strbuf_append_char(amduatd_strbuf_t *b, char c);
static const char *const AMDUATD_DEFAULT_ROOT = ".amduat-asl";
static const char *const AMDUATD_DEFAULT_SOCK = "amduatd.sock";
static const uint64_t AMDUATD_FED_TICK_MS = 1000u;
static const size_t AMDUATD_FED_INGEST_MAX_BYTES = 8u * 1024u * 1024u;
static const size_t AMDUATD_REF_TEXT_MAX = 256u;
static const char k_amduatd_contract_v1_json[] =
"{"
"\"contract\":\"AMDUATD/API/1\","
@ -131,6 +133,7 @@ static const char k_amduatd_contract_v1_json[] =
"{\"method\":\"GET\",\"path\":\"/v1/fed/pull/plan\"},"
"{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"},"
"{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"},"
"{\"method\":\"POST\",\"path\":\"/v1/fed/ingest\"},"
"{\"method\":\"GET\",\"path\":\"/v1/fed/artifacts/{ref}\"},"
"{\"method\":\"GET\",\"path\":\"/v1/fed/status\"},"
"{\"method\":\"POST\",\"path\":\"/v1/concepts\"},"
@ -1087,6 +1090,140 @@ static bool amduatd_fed_records_log_name(const amduatd_space_t *space,
return amduatd_space_scope_name(space, "fed/records", out_name);
}
static bool amduatd_fed_ingest_parse_record_type(const char *s,
size_t len,
amduat_fed_record_type_t *out) {
if (out != NULL) {
*out = AMDUAT_FED_REC_ARTIFACT;
}
if (s == NULL || out == NULL) {
return false;
}
if (len == strlen("artifact") && memcmp(s, "artifact", len) == 0) {
*out = AMDUAT_FED_REC_ARTIFACT;
return true;
}
if (len == strlen("per") && memcmp(s, "per", len) == 0) {
*out = AMDUAT_FED_REC_PER;
return true;
}
if (len == strlen("tgk_edge") && memcmp(s, "tgk_edge", len) == 0) {
*out = AMDUAT_FED_REC_TGK_EDGE;
return true;
}
if (len == strlen("tombstone") && memcmp(s, "tombstone", len) == 0) {
*out = AMDUAT_FED_REC_TOMBSTONE;
return true;
}
return false;
}
static bool amduatd_fed_ingest_send_response(
int fd,
int code,
const char *reason,
const char *status,
bool applied,
const amduat_reference_t *ref,
const amduatd_space_t *effective_space) {
amduatd_strbuf_t b;
char *ref_hex = NULL;
bool ok = false;
memset(&b, 0, sizeof(b));
if (ref != NULL && ref->digest.data != NULL && ref->digest.len != 0u) {
if (!amduat_asl_ref_encode_hex(*ref, &ref_hex)) {
return amduatd_http_send_text(fd, 500, "Internal Server Error",
"encode error\n", false);
}
}
if (!amduatd_strbuf_append_cstr(&b, "{")) {
goto ingest_response_cleanup;
}
if (!amduatd_strbuf_append_cstr(&b, "\"status\":\"") ||
!amduatd_strbuf_append_cstr(&b, status != NULL ? status : "error") ||
!amduatd_strbuf_append_cstr(&b, "\",")) {
goto ingest_response_cleanup;
}
if (!amduatd_strbuf_append_cstr(&b, "\"applied\":") ||
!amduatd_strbuf_append_cstr(&b, applied ? "true" : "false") ||
!amduatd_strbuf_append_cstr(&b, ",")) {
goto ingest_response_cleanup;
}
if (!amduatd_strbuf_append_cstr(&b, "\"ref\":")) {
goto ingest_response_cleanup;
}
if (ref_hex != NULL) {
if (!amduatd_strbuf_append_cstr(&b, "\"") ||
!amduatd_strbuf_append_cstr(&b, ref_hex) ||
!amduatd_strbuf_append_cstr(&b, "\"")) {
goto ingest_response_cleanup;
}
} else {
if (!amduatd_strbuf_append_cstr(&b, "null")) {
goto ingest_response_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, ",\"effective_space\":{")) {
goto ingest_response_cleanup;
}
if (effective_space != NULL && effective_space->enabled &&
effective_space->space_id.data != NULL) {
const char *space_id = (const char *)effective_space->space_id.data;
if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"scoped\",") ||
!amduatd_strbuf_append_cstr(&b, "\"space_id\":\"") ||
!amduatd_strbuf_append_cstr(&b, space_id) ||
!amduatd_strbuf_append_cstr(&b, "\"")) {
goto ingest_response_cleanup;
}
} else {
if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"unscoped\",") ||
!amduatd_strbuf_append_cstr(&b, "\"space_id\":null")) {
goto ingest_response_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, "}}\n")) {
goto ingest_response_cleanup;
}
ok = amduatd_http_send_json(fd,
code,
reason != NULL ? reason : "OK",
b.data,
false);
ingest_response_cleanup:
amduatd_strbuf_free(&b);
free(ref_hex);
if (!ok) {
return amduatd_http_send_text(fd, 500, "Internal Server Error",
"error\n", false);
}
return true;
}
static void amduatd_fed_ingest_discard_body(int fd,
const amduatd_http_req_t *req,
size_t max_len) {
uint8_t buf[4096];
size_t remaining;
if (req == NULL || req->content_length == 0u) {
return;
}
if (max_len != 0u && req->content_length > max_len) {
return;
}
remaining = req->content_length;
while (remaining > 0u) {
size_t chunk = remaining < sizeof(buf) ? remaining : sizeof(buf);
if (!amduatd_read_exact(fd, buf, chunk)) {
break;
}
remaining -= chunk;
}
}
static bool amduatd_handle_get_fed_records(int fd,
amduat_asl_store_t *store,
const amduatd_fed_cfg_t *fed_cfg,
@ -1856,6 +1993,684 @@ fed_cursor_post_cleanup:
return true;
}
static bool amduatd_handle_post_fed_ingest(int fd,
amduat_asl_store_t *store,
const amduatd_fed_cfg_t *fed_cfg,
const amduatd_caps_t *caps,
const amduatd_cfg_t *dcfg,
const amduatd_http_req_t *req) {
char record_type_buf[32];
char ref_buf[AMDUATD_REF_TEXT_MAX + 1u];
char target_ref_buf[AMDUATD_REF_TEXT_MAX + 1u];
uint8_t *body = NULL;
const char *p = NULL;
const char *end = NULL;
bool have_record_type = false;
bool have_query_record_type = false;
bool have_ref = false;
bool have_ref_field = false;
bool have_target_ref_field = false;
bool have_body = false;
bool already_present = false;
amduat_fed_record_type_t record_type = AMDUAT_FED_REC_ARTIFACT;
amduat_fed_record_type_t query_record_type = AMDUAT_FED_REC_ARTIFACT;
amduat_reference_t ref;
amduat_reference_t stored_ref;
amduat_asl_store_error_t store_err;
amduat_asl_index_state_t state;
amduat_asl_io_format_t input_format = AMDUAT_ASL_IO_RAW;
amduat_type_tag_t type_tag = amduat_type_tag(0u);
bool has_type_tag = false;
memset(&ref, 0, sizeof(ref));
memset(&stored_ref, 0, sizeof(stored_ref));
if (store == NULL || fed_cfg == NULL || req == NULL || dcfg == NULL) {
return amduatd_fed_ingest_send_response(fd,
500,
"Internal Server Error",
"error",
false,
NULL,
req != NULL ? req->effective_space
: NULL);
}
if (!fed_cfg->enabled) {
amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES);
return amduatd_fed_ingest_send_response(fd,
503,
"Service Unavailable",
"error",
false,
NULL,
req->effective_space);
}
{
amduat_octets_t scoped = amduat_octets(NULL, 0u);
const char *err = NULL;
if (!amduatd_fed_scope_names(fed_cfg,
req->effective_space,
"fed",
&scoped,
&err)) {
amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES);
bool ok = amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
NULL,
req->effective_space);
amduat_octets_free(&scoped);
return ok;
}
amduat_octets_free(&scoped);
}
if (caps != NULL && caps->enabled && req->x_capability[0] != '\0') {
const char *reason = NULL;
if (!amduatd_caps_check_space(caps, dcfg, req, &reason)) {
amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES);
if (reason != NULL && strcmp(reason, "wrong-space") == 0) {
return amduatd_fed_ingest_send_response(fd,
403,
"Forbidden",
"invalid",
false,
NULL,
req->effective_space);
}
return amduatd_fed_ingest_send_response(fd,
403,
"Forbidden",
"invalid",
false,
NULL,
req->effective_space);
}
}
if (amduatd_query_param(req->path,
"record_type",
record_type_buf,
sizeof(record_type_buf)) != NULL) {
size_t len = strlen(record_type_buf);
if (!amduatd_fed_ingest_parse_record_type(record_type_buf,
len,
&query_record_type)) {
amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES);
return amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
NULL,
req->effective_space);
}
have_record_type = true;
have_query_record_type = true;
record_type = query_record_type;
}
if (amduatd_query_param(req->path, "ref", ref_buf, sizeof(ref_buf)) != NULL &&
ref_buf[0] != '\0') {
if (!amduat_asl_ref_decode_hex(ref_buf, &ref)) {
amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES);
return amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
NULL,
req->effective_space);
}
have_ref = true;
have_ref_field = true;
}
if (amduatd_query_param(req->path,
"target_ref",
target_ref_buf,
sizeof(target_ref_buf)) != NULL &&
target_ref_buf[0] != '\0') {
if (have_ref_field) {
if (have_ref) {
amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES);
bool ok = amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES);
return amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
NULL,
req->effective_space);
}
if (!amduat_asl_ref_decode_hex(target_ref_buf, &ref)) {
amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES);
return amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
NULL,
req->effective_space);
}
have_ref = true;
have_target_ref_field = true;
}
if (req->content_length > AMDUATD_FED_INGEST_MAX_BYTES) {
amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES);
if (have_ref) {
bool ok = amduatd_fed_ingest_send_response(fd,
413,
"Payload Too Large",
"invalid",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
return amduatd_fed_ingest_send_response(fd,
413,
"Payload Too Large",
"invalid",
false,
NULL,
req->effective_space);
}
if (req->content_type[0] != '\0' &&
strstr(req->content_type, "application/json") != NULL) {
const char *parse_error = NULL;
bool json_have_record_type = false;
bool json_have_ref = false;
bool json_have_target_ref = false;
amduat_fed_record_type_t json_record_type = AMDUAT_FED_REC_ARTIFACT;
if (req->content_length == 0u) {
if (have_ref) {
bool ok = amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
return amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
NULL,
req->effective_space);
}
body = (uint8_t *)malloc(req->content_length);
if (body == NULL) {
if (have_ref) {
bool ok = amduatd_fed_ingest_send_response(fd,
500,
"Internal Server Error",
"error",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
return amduatd_fed_ingest_send_response(fd,
500,
"Internal Server Error",
"error",
false,
NULL,
req->effective_space);
}
if (!amduatd_read_exact(fd, body, req->content_length)) {
free(body);
if (have_ref) {
amduat_reference_free(&ref);
}
return false;
}
p = (const char *)body;
end = (const char *)body + req->content_length;
if (!amduatd_json_expect(&p, end, '{')) {
parse_error = "invalid";
goto ingest_json_parse_fail;
}
for (;;) {
const char *key = NULL;
size_t key_len = 0;
const char *sv = NULL;
size_t sv_len = 0;
const char *cur = NULL;
cur = amduatd_json_skip_ws(p, end);
if (cur < end && *cur == '}') {
p = cur + 1;
break;
}
if (!amduatd_json_parse_string_noesc(&p, end, &key, &key_len) ||
!amduatd_json_expect(&p, end, ':')) {
parse_error = "invalid";
goto ingest_json_parse_fail;
}
if (key_len == strlen("record_type") &&
memcmp(key, "record_type", key_len) == 0) {
if (json_have_record_type ||
!amduatd_json_parse_string_noesc(&p, end, &sv, &sv_len) ||
!amduatd_fed_ingest_parse_record_type(sv,
sv_len,
&json_record_type)) {
parse_error = "invalid";
goto ingest_json_parse_fail;
}
json_have_record_type = true;
} else if (key_len == strlen("ref") &&
memcmp(key, "ref", key_len) == 0) {
amduat_reference_t json_ref;
if (json_have_ref ||
!amduatd_json_parse_string_noesc(&p, end, &sv, &sv_len) ||
!amduatd_decode_ref_hex_str(sv, sv_len, &json_ref)) {
parse_error = "invalid";
goto ingest_json_parse_fail;
}
if (have_target_ref_field) {
amduat_reference_free(&json_ref);
parse_error = "invalid";
goto ingest_json_parse_fail;
}
if (have_ref && have_ref_field) {
if (!amduat_reference_eq(ref, json_ref)) {
amduat_reference_free(&json_ref);
parse_error = "invalid";
goto ingest_json_parse_fail;
}
amduat_reference_free(&json_ref);
} else {
if (have_ref) {
amduat_reference_free(&ref);
}
ref = json_ref;
have_ref = true;
}
json_have_ref = true;
have_ref_field = true;
} else if (key_len == strlen("target_ref") &&
memcmp(key, "target_ref", key_len) == 0) {
amduat_reference_t json_ref;
if (json_have_target_ref ||
!amduatd_json_parse_string_noesc(&p, end, &sv, &sv_len) ||
!amduatd_decode_ref_hex_str(sv, sv_len, &json_ref)) {
parse_error = "invalid";
goto ingest_json_parse_fail;
}
if (have_ref_field) {
amduat_reference_free(&json_ref);
parse_error = "invalid";
goto ingest_json_parse_fail;
}
if (have_ref && have_target_ref_field) {
if (!amduat_reference_eq(ref, json_ref)) {
amduat_reference_free(&json_ref);
parse_error = "invalid";
goto ingest_json_parse_fail;
}
amduat_reference_free(&json_ref);
} else {
if (have_ref) {
amduat_reference_free(&ref);
}
ref = json_ref;
have_ref = true;
}
json_have_target_ref = true;
have_target_ref_field = true;
} else {
if (!amduatd_json_skip_value(&p, end, 0)) {
parse_error = "invalid";
goto ingest_json_parse_fail;
}
}
cur = amduatd_json_skip_ws(p, end);
if (cur < end && *cur == ',') {
p = cur + 1;
continue;
}
if (cur < end && *cur == '}') {
p = cur + 1;
break;
}
parse_error = "invalid";
goto ingest_json_parse_fail;
}
if (json_have_record_type) {
if (have_query_record_type && json_record_type != query_record_type) {
parse_error = "invalid";
goto ingest_json_parse_fail;
}
have_record_type = true;
record_type = json_record_type;
}
if (!have_record_type || !have_ref) {
parse_error = "invalid";
goto ingest_json_parse_fail;
}
if (record_type != AMDUAT_FED_REC_TOMBSTONE) {
parse_error = "invalid";
goto ingest_json_parse_fail;
}
free(body);
body = NULL;
have_body = false;
ingest_json_parse_fail:
if (parse_error != NULL) {
free(body);
body = NULL;
if (have_ref) {
bool ok = amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
return amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
NULL,
req->effective_space);
}
} else {
if (req->content_length > 0u) {
body = (uint8_t *)malloc(req->content_length);
if (body == NULL) {
if (have_ref) {
bool ok = amduatd_fed_ingest_send_response(fd,
500,
"Internal Server Error",
"error",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
return amduatd_fed_ingest_send_response(fd,
500,
"Internal Server Error",
"error",
false,
NULL,
req->effective_space);
}
if (!amduatd_read_exact(fd, body, req->content_length)) {
free(body);
if (have_ref) {
amduat_reference_free(&ref);
}
return false;
}
have_body = true;
}
}
if (!have_record_type || !have_ref) {
free(body);
if (have_ref) {
bool ok = amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
return amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
NULL,
req->effective_space);
}
if (record_type != AMDUAT_FED_REC_TOMBSTONE) {
amduat_artifact_t artifact;
amduat_artifact_t existing_artifact;
amduat_asl_store_error_t exist_err;
amduat_octets_t artifact_input;
bool artifact_input_owned = false;
if (!have_body || req->content_length == 0u) {
free(body);
{
bool ok = amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
}
if (req->content_type[0] != '\0' &&
strstr(req->content_type,
"application/vnd.amduat.asl.artifact+v1") != NULL) {
input_format = AMDUAT_ASL_IO_ARTIFACT;
}
if (record_type == AMDUAT_FED_REC_TGK_EDGE) {
type_tag = amduat_type_tag(AMDUAT_TYPE_TAG_TGK1_EDGE_V1);
has_type_tag = true;
} else if (record_type == AMDUAT_FED_REC_PER) {
type_tag = amduat_type_tag(AMDUAT_TYPE_TAG_FER1_RECEIPT_1);
has_type_tag = true;
}
memset(&existing_artifact, 0, sizeof(existing_artifact));
exist_err = amduat_asl_store_get(store, ref, &existing_artifact);
if (exist_err == AMDUAT_ASL_STORE_OK) {
already_present = true;
amduat_asl_artifact_free(&existing_artifact);
}
artifact_input = amduat_octets(body, req->content_length);
if (input_format == AMDUAT_ASL_IO_RAW) {
if (!amduat_octets_clone(artifact_input, &artifact_input)) {
free(body);
{
bool ok = amduatd_fed_ingest_send_response(fd,
500,
"Internal Server Error",
"error",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
}
artifact_input_owned = true;
}
memset(&artifact, 0, sizeof(artifact));
if (!amduat_asl_artifact_from_bytes(artifact_input,
input_format,
has_type_tag,
type_tag,
&artifact)) {
if (artifact_input_owned) {
amduat_octets_free(&artifact_input);
}
free(body);
{
bool ok = amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
}
free(body);
body = NULL;
if (input_format == AMDUAT_ASL_IO_ARTIFACT && has_type_tag &&
(!artifact.has_type_tag ||
artifact.type_tag.tag_id != type_tag.tag_id)) {
amduat_asl_artifact_free(&artifact);
{
bool ok = amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
}
if (store->ops.put_indexed == NULL) {
amduat_asl_artifact_free(&artifact);
{
bool ok = amduatd_fed_ingest_send_response(fd,
501,
"Not Implemented",
"error",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
}
store_err = amduat_asl_store_put_indexed(store, artifact, &stored_ref,
&state);
amduat_asl_artifact_free(&artifact);
if (store_err != AMDUAT_ASL_STORE_OK) {
bool ok = amduatd_fed_ingest_send_response(fd,
500,
"Internal Server Error",
"error",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
if (!amduat_reference_eq(ref, stored_ref)) {
amduat_reference_free(&stored_ref);
{
bool ok = amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
NULL,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
}
amduat_reference_free(&stored_ref);
{
bool ok = amduatd_fed_ingest_send_response(
fd,
200,
"OK",
already_present ? "already_present" : "ok",
!already_present,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
}
if (have_body && req->content_length != 0u) {
free(body);
{
bool ok = amduatd_fed_ingest_send_response(fd,
400,
"Bad Request",
"invalid",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
}
free(body);
body = NULL;
if (store->ops.tombstone == NULL) {
bool ok = amduatd_fed_ingest_send_response(fd,
501,
"Not Implemented",
"error",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
store_err = amduat_asl_store_tombstone(store, ref, 0u, 0u, &state);
if (store_err != AMDUAT_ASL_STORE_OK) {
bool ok = amduatd_fed_ingest_send_response(fd,
500,
"Internal Server Error",
"error",
false,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
{
bool ok = amduatd_fed_ingest_send_response(fd,
200,
"OK",
"ok",
true,
&ref,
req->effective_space);
amduat_reference_free(&ref);
return ok;
}
}
static bool amduatd_handle_get_fed_pull_plan(int fd,
amduat_asl_store_t *store,
const amduatd_fed_cfg_t *fed_cfg,
@ -5284,6 +6099,16 @@ static bool amduatd_handle_conn(int fd,
root_path);
goto conn_cleanup;
}
if (strcmp(req.method, "POST") == 0 &&
strcmp(no_query, "/v1/fed/ingest") == 0) {
ok = amduatd_handle_post_fed_ingest(fd,
store,
fed_cfg,
caps,
effective_cfg,
&req);
goto conn_cleanup;
}
if (strcmp(req.method, "POST") == 0 &&
strcmp(no_query, "/v1/fed/cursor") == 0) {
ok = amduatd_handle_post_fed_cursor(fd,

View file

@ -3020,6 +3020,9 @@ static bool amduatd_seed_concept_if_missing(
out_concept_ref == NULL) {
return false;
}
memset(&name_ref, 0, sizeof(name_ref));
memset(&concept_ref, 0, sizeof(concept_ref));
memset(&edge_ref, 0, sizeof(edge_ref));
if (!amduatd_space_scope_name(space, name, &scoped_bytes)) {
return false;
}
@ -3030,10 +3033,6 @@ static bool amduatd_seed_concept_if_missing(
goto seed_cleanup;
}
memset(&name_ref, 0, sizeof(name_ref));
memset(&concept_ref, 0, sizeof(concept_ref));
memset(&edge_ref, 0, sizeof(edge_ref));
if (!amduatd_concepts_put_name_artifact(store, scoped_name, &name_ref)) {
goto seed_cleanup;
}
@ -3219,7 +3218,8 @@ bool amduatd_seed_ms_ui_state(amduat_asl_store_t *store,
amduatd_seed_entry_t entries[3];
amduatd_strbuf_t b;
amduat_artifact_t artifact;
char *seed_latest_hex = NULL;
char *hello_hex = NULL;
bool hello_hex_owned = false;
size_t i;
bool ok = false;
@ -3353,7 +3353,6 @@ bool amduatd_seed_ms_ui_state(amduat_asl_store_t *store,
amduat_reference_t hello_ref;
amduat_reference_t title_ref;
amduat_reference_t status_ref;
char *hello_hex = NULL;
const char *payload = "hello";
memset(&hello_ref, 0, sizeof(hello_ref));
@ -3367,18 +3366,26 @@ bool amduatd_seed_ms_ui_state(amduat_asl_store_t *store,
if (!amduat_asl_ref_encode_hex(hello_ref, &hello_hex)) {
goto seed_cleanup;
}
seed_latest_hex = hello_hex;
hello_hex_owned = true;
artifact = amduat_artifact(amduat_octets("Amduat UI",
strlen("Amduat UI")));
if (!amduatd_seed_store_artifact(store, artifact, &title_ref)) {
free(hello_hex);
if (hello_hex_owned) {
free(hello_hex);
hello_hex = NULL;
hello_hex_owned = false;
}
goto seed_cleanup;
}
artifact = amduat_artifact(amduat_octets("ready", strlen("ready")));
if (!amduatd_seed_store_artifact(store, artifact, &status_ref)) {
free(hello_hex);
if (hello_hex_owned) {
free(hello_hex);
hello_hex = NULL;
hello_hex_owned = false;
}
goto seed_cleanup;
}
@ -3412,12 +3419,12 @@ bool amduatd_seed_ms_ui_state(amduat_asl_store_t *store,
entries[0].value_hex = strdup("Amduat UI");
entries[1].value_hex = strdup("ready");
entries[2].value_hex = hello_hex;
hello_hex = NULL;
hello_hex_owned = false;
if (entries[0].value_hex == NULL || entries[1].value_hex == NULL ||
entries[2].value_hex == NULL) {
goto seed_cleanup;
}
hello_hex = NULL;
seed_latest_hex = NULL;
qsort(fields, 3, sizeof(fields[0]), amduatd_seed_entry_cmp);
qsort(entries, 3, sizeof(entries[0]), amduatd_seed_entry_cmp);
@ -3648,7 +3655,9 @@ seed_cleanup:
free(entries[i].value_hex);
}
amduatd_strbuf_free(&b);
free(seed_latest_hex);
if (hello_hex_owned) {
free(hello_hex);
}
return ok;
}