diff --git a/CMakeLists.txt b/CMakeLists.txt index 8549e66..5f8f532 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -215,6 +215,11 @@ add_test(NAME amduatd_fed_smoke ) set_tests_properties(amduatd_fed_smoke PROPERTIES SKIP_RETURN_CODE 77) +add_test(NAME amduatd_fed_ingest + COMMAND bash ${CMAKE_CURRENT_SOURCE_DIR}/scripts/test_fed_ingest.sh +) +set_tests_properties(amduatd_fed_ingest PROPERTIES SKIP_RETURN_CODE 77) + add_executable(amduatd_test_space_doctor tests/test_amduatd_space_doctor.c src/amduatd_space_doctor.c diff --git a/README.md b/README.md index e94445b..53bfb91 100644 --- a/README.md +++ b/README.md @@ -129,6 +129,39 @@ curl --unix-socket amduatd.sock -X POST \ `/v1/fed/pull` requires the index backend and will not advance the cursor on partial failure. +### Federation ingest (receiver) + +`/v1/fed/ingest` applies a single incoming record (push receiver). The request +is space-scoped via `X-Amduat-Space` and requires federation to be enabled; +otherwise the daemon responds with 503. + +For artifact, per, and tgk_edge records, send raw bytes and provide metadata via +query params: + +```sh +curl --unix-socket amduatd.sock -X POST \ + 'http://localhost/v1/fed/ingest?record_type=artifact&ref=' \ + -H 'Content-Type: application/octet-stream' \ + -H 'X-Amduat-Space: demo' \ + --data-binary 'payload' +``` + +For tombstones, send a small JSON payload: + +```sh +curl --unix-socket amduatd.sock -X POST \ + 'http://localhost/v1/fed/ingest' \ + -H 'Content-Type: application/json' \ + -H 'X-Amduat-Space: demo' \ + -d '{"record_type":"tombstone","ref":""}' +``` + +Notes: + +- Record types: `artifact`, `per`, `tgk_edge`, `tombstone`. +- Size limit: 8 MiB per request. +- Tombstones use deterministic defaults: `scope=0`, `reason_code=0`. + Run the daemon with derivation indexing enabled: ```sh diff --git a/scripts/test_fed_ingest.sh b/scripts/test_fed_ingest.sh new file mode 100644 index 0000000..f2f7fa7 --- /dev/null +++ b/scripts/test_fed_ingest.sh @@ -0,0 +1,371 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +HTTP_HELPER="${ROOT_DIR}/build/amduatd_http_unix" +USE_HTTP_HELPER=0 +TMPDIR="${TMPDIR:-/tmp}" +mkdir -p "${TMPDIR}" + +if ! command -v grep >/dev/null 2>&1; then + echo "skip: grep not found" >&2 + exit 77 +fi +if ! command -v awk >/dev/null 2>&1; then + echo "skip: awk not found" >&2 + exit 77 +fi +if command -v curl >/dev/null 2>&1; then + if curl --help 2>/dev/null | grep -q -- '--unix-socket'; then + USE_HTTP_HELPER=0 + else + USE_HTTP_HELPER=1 + fi +else + USE_HTTP_HELPER=1 +fi +if [[ "${USE_HTTP_HELPER}" -eq 1 && ! -x "${HTTP_HELPER}" ]]; then + echo "skip: curl lacks --unix-socket support and helper missing" >&2 + exit 77 +fi + +AMDUATD_BIN="${ROOT_DIR}/build/amduatd" +ASL_BIN="${ROOT_DIR}/vendor/amduat/build/amduat-asl" + +if [[ ! -x "${AMDUATD_BIN}" || ! -x "${ASL_BIN}" ]]; then + echo "missing binaries; build amduatd and amduat-asl first" >&2 + exit 1 +fi + +tmp_root="$(mktemp -d -p "${TMPDIR}" amduatd-fed-ingest-XXXXXX)" +tmp_keep="${AMDUATD_FED_INGEST_KEEP_TMP:-0}" +last_log="${TMPDIR}/amduatd-fed-ingest.last.log" +root_store="${tmp_root}/store" +root_ref="${tmp_root}/ref" +sock="${tmp_root}/amduatd.sock" +log="${tmp_root}/amduatd.log" +space_a="alpha" +space_b="beta" + +show_log() { + if [[ -n "${log}" && -f "${log}" ]]; then + echo "daemon log: ${log}" >&2 + cat "${log}" >&2 + else + echo "daemon log missing: ${log}" >&2 + fi + if [[ -f "${last_log}" ]]; then + echo "last log copy: ${last_log}" >&2 + fi +} + +die() { + echo "$1" >&2 + show_log + exit 1 +} + +cleanup() { + if [[ -n "${pid:-}" ]]; then + kill "${pid}" >/dev/null 2>&1 || true + fi + if [[ -f "${log}" ]]; then + cp -f "${log}" "${last_log}" 2>/dev/null || true + fi + if [[ "${tmp_keep}" -eq 0 ]]; then + rm -rf "${tmp_root}" + else + echo "kept tmpdir: ${tmp_root}" >&2 + fi +} +trap cleanup EXIT + +mkdir -p "${root_store}" "${root_ref}" +"${ASL_BIN}" index init --root "${root_store}" +"${ASL_BIN}" index init --root "${root_ref}" + +"${AMDUATD_BIN}" --root "${root_store}" --sock "${sock}" \ + --store-backend index --space "${space_a}" \ + --fed-enable --fed-transport stub \ + >"${log}" 2>&1 & +pid=$! + +http_get() { + local sock_path="$1" + local path="$2" + shift 2 + if [[ "${USE_HTTP_HELPER}" -eq 1 ]]; then + "${HTTP_HELPER}" --sock "${sock_path}" --method GET --path "${path}" "$@" + else + curl --silent --show-error --fail \ + --unix-socket "${sock_path}" \ + "$@" \ + "http://localhost${path}" + fi +} + +http_get_allow() { + local sock_path="$1" + local path="$2" + shift 2 + if [[ "${USE_HTTP_HELPER}" -eq 1 ]]; then + "${HTTP_HELPER}" --sock "${sock_path}" --method GET --path "${path}" \ + --allow-status "$@" + else + curl --silent --show-error \ + --unix-socket "${sock_path}" \ + "$@" \ + "http://localhost${path}" + fi +} + +http_post() { + local sock_path="$1" + local path="$2" + local data="$3" + shift 3 + if [[ "${USE_HTTP_HELPER}" -eq 1 ]]; then + "${HTTP_HELPER}" --sock "${sock_path}" --method POST --path "${path}" \ + --data "${data}" \ + "$@" + else + curl --silent --show-error --fail \ + --unix-socket "${sock_path}" \ + "$@" \ + --data-binary "${data}" \ + "http://localhost${path}" + fi +} + +http_post_allow() { + local sock_path="$1" + local path="$2" + local data="$3" + shift 3 + if [[ "${USE_HTTP_HELPER}" -eq 1 ]]; then + "${HTTP_HELPER}" --sock "${sock_path}" --method POST --path "${path}" \ + --allow-status --data "${data}" \ + "$@" + else + curl --silent --show-error \ + --unix-socket "${sock_path}" \ + "$@" \ + --data-binary "${data}" \ + "http://localhost${path}" + fi +} + +http_status() { + local sock_path="$1" + local method="$2" + local path="$3" + shift 3 + if [[ "${USE_HTTP_HELPER}" -eq 1 ]]; then + echo "" + return 0 + fi + curl --silent --show-error --output /dev/null --write-out '%{http_code}' \ + --unix-socket "${sock_path}" \ + -X "${method}" \ + "$@" \ + "http://localhost${path}" +} + +wait_for_ready() { + local sock_path="$1" + local pid_val="$2" + local log_path="$3" + local i + for i in $(seq 1 100); do + if ! kill -0 "${pid_val}" >/dev/null 2>&1; then + if [[ -f "${log_path}" ]] && grep -q "bind: Operation not permitted" "${log_path}"; then + echo "skip: bind not permitted for unix socket" >&2 + exit 77 + fi + if [[ -f "${log_path}" ]]; then + cat "${log_path}" >&2 + fi + return 1 + fi + if [[ -S "${sock_path}" ]]; then + if http_get "${sock_path}" "/v1/meta" >/dev/null 2>&1; then + return 0 + fi + fi + sleep 0.1 + done + return 1 +} + +if ! wait_for_ready "${sock}" "${pid}" "${log}"; then + die "daemon not ready" +fi + +payload="fed-ingest" +ref="$( + printf '%s' "${payload}" | "${ASL_BIN}" put --root "${root_ref}" \ + --input - --input-format raw --ref-format hex \ + | tail -n 1 | tr -d '\r\n' +)" + +ingest_resp="$( + http_post "${sock}" "/v1/fed/ingest?record_type=artifact&ref=${ref}" \ + "${payload}" \ + --header "Content-Type: application/octet-stream" \ + --header "X-Amduat-Space: ${space_a}" +)" || { + die "ingest artifact failed" +} + +status="$( + printf '%s' "${ingest_resp}" \ + | tr -d '\r\n' \ + | awk 'match($0, /"status":"[^"]+"/) {print substr($0, RSTART+10, RLENGTH-11)}' +)" +applied="$( + printf '%s' "${ingest_resp}" \ + | tr -d '\r\n' \ + | awk 'match($0, /"applied":[^,}]+/) {print substr($0, RSTART+10, RLENGTH-10)}' +)" +if [[ "${status}" != "ok" || "${applied}" != "true" ]]; then + die "unexpected ingest response: ${ingest_resp}" +fi + +fetched="$(http_get "${sock}" "/v1/artifacts/${ref}")" +if [[ "${fetched}" != "${payload}" ]]; then + die "artifact fetch mismatch" +fi + +ingest_again="$( + http_post "${sock}" "/v1/fed/ingest?record_type=artifact&ref=${ref}" \ + "${payload}" \ + --header "Content-Type: application/octet-stream" \ + --header "X-Amduat-Space: ${space_a}" +)" +status="$( + printf '%s' "${ingest_again}" \ + | tr -d '\r\n' \ + | awk 'match($0, /"status":"[^"]+"/) {print substr($0, RSTART+10, RLENGTH-11)}' +)" +applied="$( + printf '%s' "${ingest_again}" \ + | tr -d '\r\n' \ + | awk 'match($0, /"applied":[^,}]+/) {print substr($0, RSTART+10, RLENGTH-10)}' +)" +if [[ "${status}" != "already_present" && "${applied}" != "false" ]]; then + die "unexpected re-ingest response: ${ingest_again}" +fi + +tombstone_resp="$( + http_post "${sock}" "/v1/fed/ingest" \ + "{\"record_type\":\"tombstone\",\"ref\":\"${ref}\"}" \ + --header "Content-Type: application/json" \ + --header "X-Amduat-Space: ${space_a}" +)" +status="$( + printf '%s' "${tombstone_resp}" \ + | tr -d '\r\n' \ + | awk 'match($0, /"status":"[^"]+"/) {print substr($0, RSTART+10, RLENGTH-11)}' +)" +if [[ "${status}" != "ok" ]]; then + die "unexpected tombstone response: ${tombstone_resp}" +fi + +http_get_allow "${sock}" "/v1/artifacts/${ref}" >/dev/null 2>&1 || true + +tombstone_again="$( + http_post "${sock}" "/v1/fed/ingest" \ + "{\"record_type\":\"tombstone\",\"ref\":\"${ref}\"}" \ + --header "Content-Type: application/json" \ + --header "X-Amduat-Space: ${space_a}" +)" +status="$( + printf '%s' "${tombstone_again}" \ + | tr -d '\r\n' \ + | awk 'match($0, /"status":"[^"]+"/) {print substr($0, RSTART+10, RLENGTH-11)}' +)" +if [[ "${status}" != "ok" && "${status}" != "already_present" ]]; then + die "unexpected tombstone repeat response: ${tombstone_again}" +fi + +cap_resp="$( + http_post "${sock}" "/v1/capabilities" \ + "{\"kind\":\"pointer_name\",\"target\":{\"name\":\"space/${space_a}/fed/records\"},\"expiry_seconds\":3600}" \ + --header "Content-Type: application/json" \ + --header "X-Amduat-Space: ${space_a}" +)" +cap_token="$( + printf '%s' "${cap_resp}" \ + | tr -d '\r\n' \ + | awk 'match($0, /"token":"[^"]+"/) {print substr($0, RSTART+9, RLENGTH-10)}' +)" +if [[ -z "${cap_token}" ]]; then + die "failed to mint capability: ${cap_resp}" +fi + +wrong_space_resp="$( + http_post_allow "${sock}" "/v1/fed/ingest?record_type=artifact&ref=${ref}" \ + "${payload}" \ + --header "Content-Type: application/octet-stream" \ + --header "X-Amduat-Space: ${space_b}" \ + --header "X-Amduat-Capability: ${cap_token}" +)" +status="$( + printf '%s' "${wrong_space_resp}" \ + | tr -d '\r\n' \ + | awk 'match($0, /"status":"[^"]+"/) {print substr($0, RSTART+10, RLENGTH-11)}' +)" +if [[ "${status}" != "invalid" ]]; then + die "unexpected wrong-space response: ${wrong_space_resp}" +fi + +if [[ "${USE_HTTP_HELPER}" -eq 0 ]]; then + code="$(http_status "${sock}" "POST" "/v1/fed/ingest?record_type=artifact&ref=${ref}" \ + --header "Content-Type: application/octet-stream" \ + --header "X-Amduat-Space: ${space_b}" \ + --header "X-Amduat-Capability: ${cap_token}")" + if [[ "${code}" != "403" ]]; then + die "expected 403 for wrong-space capability, got ${code}" + fi +fi + +kill "${pid}" >/dev/null 2>&1 || true +wait "${pid}" >/dev/null 2>&1 || true +pid="" +rm -f "${sock}" + +"${AMDUATD_BIN}" --root "${root_store}" --sock "${sock}" \ + --store-backend index --space "${space_a}" \ + --fed-transport stub \ + >"${log}" 2>&1 & +pid=$! + +if ! wait_for_ready "${sock}" "${pid}" "${log}"; then + die "daemon (fed disabled) not ready" +fi + +disabled_resp="$( + http_post_allow "${sock}" "/v1/fed/ingest?record_type=artifact&ref=${ref}" \ + "${payload}" \ + --header "Content-Type: application/octet-stream" \ + --header "X-Amduat-Space: ${space_a}" +)" +status="$( + printf '%s' "${disabled_resp}" \ + | tr -d '\r\n' \ + | awk 'match($0, /"status":"[^"]+"/) {print substr($0, RSTART+10, RLENGTH-11)}' +)" +if [[ "${status}" != "error" ]]; then + die "unexpected disabled response: ${disabled_resp}" +fi + +if [[ "${USE_HTTP_HELPER}" -eq 0 ]]; then + code="$(http_status "${sock}" "POST" "/v1/fed/ingest?record_type=artifact&ref=${ref}" \ + --header "Content-Type: application/octet-stream" \ + --header "X-Amduat-Space: ${space_a}")" + if [[ "${code}" != "503" ]]; then + die "expected 503 for federation disabled, got ${code}" + fi +fi + +echo "ok" diff --git a/src/amduatd.c b/src/amduatd.c index 662c9fc..7d15f5a 100644 --- a/src/amduatd.c +++ b/src/amduatd.c @@ -113,6 +113,8 @@ static bool amduatd_strbuf_append_char(amduatd_strbuf_t *b, char c); static const char *const AMDUATD_DEFAULT_ROOT = ".amduat-asl"; static const char *const AMDUATD_DEFAULT_SOCK = "amduatd.sock"; static const uint64_t AMDUATD_FED_TICK_MS = 1000u; +static const size_t AMDUATD_FED_INGEST_MAX_BYTES = 8u * 1024u * 1024u; +static const size_t AMDUATD_REF_TEXT_MAX = 256u; static const char k_amduatd_contract_v1_json[] = "{" "\"contract\":\"AMDUATD/API/1\"," @@ -131,6 +133,7 @@ static const char k_amduatd_contract_v1_json[] = "{\"method\":\"GET\",\"path\":\"/v1/fed/pull/plan\"}," "{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"}," "{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"}," + "{\"method\":\"POST\",\"path\":\"/v1/fed/ingest\"}," "{\"method\":\"GET\",\"path\":\"/v1/fed/artifacts/{ref}\"}," "{\"method\":\"GET\",\"path\":\"/v1/fed/status\"}," "{\"method\":\"POST\",\"path\":\"/v1/concepts\"}," @@ -1087,6 +1090,140 @@ static bool amduatd_fed_records_log_name(const amduatd_space_t *space, return amduatd_space_scope_name(space, "fed/records", out_name); } +static bool amduatd_fed_ingest_parse_record_type(const char *s, + size_t len, + amduat_fed_record_type_t *out) { + if (out != NULL) { + *out = AMDUAT_FED_REC_ARTIFACT; + } + if (s == NULL || out == NULL) { + return false; + } + if (len == strlen("artifact") && memcmp(s, "artifact", len) == 0) { + *out = AMDUAT_FED_REC_ARTIFACT; + return true; + } + if (len == strlen("per") && memcmp(s, "per", len) == 0) { + *out = AMDUAT_FED_REC_PER; + return true; + } + if (len == strlen("tgk_edge") && memcmp(s, "tgk_edge", len) == 0) { + *out = AMDUAT_FED_REC_TGK_EDGE; + return true; + } + if (len == strlen("tombstone") && memcmp(s, "tombstone", len) == 0) { + *out = AMDUAT_FED_REC_TOMBSTONE; + return true; + } + return false; +} + +static bool amduatd_fed_ingest_send_response( + int fd, + int code, + const char *reason, + const char *status, + bool applied, + const amduat_reference_t *ref, + const amduatd_space_t *effective_space) { + amduatd_strbuf_t b; + char *ref_hex = NULL; + bool ok = false; + + memset(&b, 0, sizeof(b)); + if (ref != NULL && ref->digest.data != NULL && ref->digest.len != 0u) { + if (!amduat_asl_ref_encode_hex(*ref, &ref_hex)) { + return amduatd_http_send_text(fd, 500, "Internal Server Error", + "encode error\n", false); + } + } + + if (!amduatd_strbuf_append_cstr(&b, "{")) { + goto ingest_response_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"status\":\"") || + !amduatd_strbuf_append_cstr(&b, status != NULL ? status : "error") || + !amduatd_strbuf_append_cstr(&b, "\",")) { + goto ingest_response_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"applied\":") || + !amduatd_strbuf_append_cstr(&b, applied ? "true" : "false") || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto ingest_response_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"ref\":")) { + goto ingest_response_cleanup; + } + if (ref_hex != NULL) { + if (!amduatd_strbuf_append_cstr(&b, "\"") || + !amduatd_strbuf_append_cstr(&b, ref_hex) || + !amduatd_strbuf_append_cstr(&b, "\"")) { + goto ingest_response_cleanup; + } + } else { + if (!amduatd_strbuf_append_cstr(&b, "null")) { + goto ingest_response_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, ",\"effective_space\":{")) { + goto ingest_response_cleanup; + } + if (effective_space != NULL && effective_space->enabled && + effective_space->space_id.data != NULL) { + const char *space_id = (const char *)effective_space->space_id.data; + if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"scoped\",") || + !amduatd_strbuf_append_cstr(&b, "\"space_id\":\"") || + !amduatd_strbuf_append_cstr(&b, space_id) || + !amduatd_strbuf_append_cstr(&b, "\"")) { + goto ingest_response_cleanup; + } + } else { + if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"unscoped\",") || + !amduatd_strbuf_append_cstr(&b, "\"space_id\":null")) { + goto ingest_response_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "}}\n")) { + goto ingest_response_cleanup; + } + + ok = amduatd_http_send_json(fd, + code, + reason != NULL ? reason : "OK", + b.data, + false); + +ingest_response_cleanup: + amduatd_strbuf_free(&b); + free(ref_hex); + if (!ok) { + return amduatd_http_send_text(fd, 500, "Internal Server Error", + "error\n", false); + } + return true; +} + +static void amduatd_fed_ingest_discard_body(int fd, + const amduatd_http_req_t *req, + size_t max_len) { + uint8_t buf[4096]; + size_t remaining; + if (req == NULL || req->content_length == 0u) { + return; + } + if (max_len != 0u && req->content_length > max_len) { + return; + } + remaining = req->content_length; + while (remaining > 0u) { + size_t chunk = remaining < sizeof(buf) ? remaining : sizeof(buf); + if (!amduatd_read_exact(fd, buf, chunk)) { + break; + } + remaining -= chunk; + } +} + static bool amduatd_handle_get_fed_records(int fd, amduat_asl_store_t *store, const amduatd_fed_cfg_t *fed_cfg, @@ -1856,6 +1993,684 @@ fed_cursor_post_cleanup: return true; } +static bool amduatd_handle_post_fed_ingest(int fd, + amduat_asl_store_t *store, + const amduatd_fed_cfg_t *fed_cfg, + const amduatd_caps_t *caps, + const amduatd_cfg_t *dcfg, + const amduatd_http_req_t *req) { + char record_type_buf[32]; + char ref_buf[AMDUATD_REF_TEXT_MAX + 1u]; + char target_ref_buf[AMDUATD_REF_TEXT_MAX + 1u]; + uint8_t *body = NULL; + const char *p = NULL; + const char *end = NULL; + bool have_record_type = false; + bool have_query_record_type = false; + bool have_ref = false; + bool have_ref_field = false; + bool have_target_ref_field = false; + bool have_body = false; + bool already_present = false; + amduat_fed_record_type_t record_type = AMDUAT_FED_REC_ARTIFACT; + amduat_fed_record_type_t query_record_type = AMDUAT_FED_REC_ARTIFACT; + amduat_reference_t ref; + amduat_reference_t stored_ref; + amduat_asl_store_error_t store_err; + amduat_asl_index_state_t state; + amduat_asl_io_format_t input_format = AMDUAT_ASL_IO_RAW; + amduat_type_tag_t type_tag = amduat_type_tag(0u); + bool has_type_tag = false; + + memset(&ref, 0, sizeof(ref)); + memset(&stored_ref, 0, sizeof(stored_ref)); + + if (store == NULL || fed_cfg == NULL || req == NULL || dcfg == NULL) { + return amduatd_fed_ingest_send_response(fd, + 500, + "Internal Server Error", + "error", + false, + NULL, + req != NULL ? req->effective_space + : NULL); + } + if (!fed_cfg->enabled) { + amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES); + return amduatd_fed_ingest_send_response(fd, + 503, + "Service Unavailable", + "error", + false, + NULL, + req->effective_space); + } + { + amduat_octets_t scoped = amduat_octets(NULL, 0u); + const char *err = NULL; + if (!amduatd_fed_scope_names(fed_cfg, + req->effective_space, + "fed", + &scoped, + &err)) { + amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES); + bool ok = amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + NULL, + req->effective_space); + amduat_octets_free(&scoped); + return ok; + } + amduat_octets_free(&scoped); + } + if (caps != NULL && caps->enabled && req->x_capability[0] != '\0') { + const char *reason = NULL; + if (!amduatd_caps_check_space(caps, dcfg, req, &reason)) { + amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES); + if (reason != NULL && strcmp(reason, "wrong-space") == 0) { + return amduatd_fed_ingest_send_response(fd, + 403, + "Forbidden", + "invalid", + false, + NULL, + req->effective_space); + } + return amduatd_fed_ingest_send_response(fd, + 403, + "Forbidden", + "invalid", + false, + NULL, + req->effective_space); + } + } + + if (amduatd_query_param(req->path, + "record_type", + record_type_buf, + sizeof(record_type_buf)) != NULL) { + size_t len = strlen(record_type_buf); + if (!amduatd_fed_ingest_parse_record_type(record_type_buf, + len, + &query_record_type)) { + amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES); + return amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + NULL, + req->effective_space); + } + have_record_type = true; + have_query_record_type = true; + record_type = query_record_type; + } + if (amduatd_query_param(req->path, "ref", ref_buf, sizeof(ref_buf)) != NULL && + ref_buf[0] != '\0') { + if (!amduat_asl_ref_decode_hex(ref_buf, &ref)) { + amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES); + return amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + NULL, + req->effective_space); + } + have_ref = true; + have_ref_field = true; + } + if (amduatd_query_param(req->path, + "target_ref", + target_ref_buf, + sizeof(target_ref_buf)) != NULL && + target_ref_buf[0] != '\0') { + if (have_ref_field) { + if (have_ref) { + amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES); + bool ok = amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES); + return amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + NULL, + req->effective_space); + } + if (!amduat_asl_ref_decode_hex(target_ref_buf, &ref)) { + amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES); + return amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + NULL, + req->effective_space); + } + have_ref = true; + have_target_ref_field = true; + } + + if (req->content_length > AMDUATD_FED_INGEST_MAX_BYTES) { + amduatd_fed_ingest_discard_body(fd, req, AMDUATD_FED_INGEST_MAX_BYTES); + if (have_ref) { + bool ok = amduatd_fed_ingest_send_response(fd, + 413, + "Payload Too Large", + "invalid", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + return amduatd_fed_ingest_send_response(fd, + 413, + "Payload Too Large", + "invalid", + false, + NULL, + req->effective_space); + } + + if (req->content_type[0] != '\0' && + strstr(req->content_type, "application/json") != NULL) { + const char *parse_error = NULL; + bool json_have_record_type = false; + bool json_have_ref = false; + bool json_have_target_ref = false; + amduat_fed_record_type_t json_record_type = AMDUAT_FED_REC_ARTIFACT; + + if (req->content_length == 0u) { + if (have_ref) { + bool ok = amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + return amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + NULL, + req->effective_space); + } + body = (uint8_t *)malloc(req->content_length); + if (body == NULL) { + if (have_ref) { + bool ok = amduatd_fed_ingest_send_response(fd, + 500, + "Internal Server Error", + "error", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + return amduatd_fed_ingest_send_response(fd, + 500, + "Internal Server Error", + "error", + false, + NULL, + req->effective_space); + } + if (!amduatd_read_exact(fd, body, req->content_length)) { + free(body); + if (have_ref) { + amduat_reference_free(&ref); + } + return false; + } + + p = (const char *)body; + end = (const char *)body + req->content_length; + if (!amduatd_json_expect(&p, end, '{')) { + parse_error = "invalid"; + goto ingest_json_parse_fail; + } + for (;;) { + const char *key = NULL; + size_t key_len = 0; + const char *sv = NULL; + size_t sv_len = 0; + const char *cur = NULL; + + cur = amduatd_json_skip_ws(p, end); + if (cur < end && *cur == '}') { + p = cur + 1; + break; + } + if (!amduatd_json_parse_string_noesc(&p, end, &key, &key_len) || + !amduatd_json_expect(&p, end, ':')) { + parse_error = "invalid"; + goto ingest_json_parse_fail; + } + + if (key_len == strlen("record_type") && + memcmp(key, "record_type", key_len) == 0) { + if (json_have_record_type || + !amduatd_json_parse_string_noesc(&p, end, &sv, &sv_len) || + !amduatd_fed_ingest_parse_record_type(sv, + sv_len, + &json_record_type)) { + parse_error = "invalid"; + goto ingest_json_parse_fail; + } + json_have_record_type = true; + } else if (key_len == strlen("ref") && + memcmp(key, "ref", key_len) == 0) { + amduat_reference_t json_ref; + if (json_have_ref || + !amduatd_json_parse_string_noesc(&p, end, &sv, &sv_len) || + !amduatd_decode_ref_hex_str(sv, sv_len, &json_ref)) { + parse_error = "invalid"; + goto ingest_json_parse_fail; + } + if (have_target_ref_field) { + amduat_reference_free(&json_ref); + parse_error = "invalid"; + goto ingest_json_parse_fail; + } + if (have_ref && have_ref_field) { + if (!amduat_reference_eq(ref, json_ref)) { + amduat_reference_free(&json_ref); + parse_error = "invalid"; + goto ingest_json_parse_fail; + } + amduat_reference_free(&json_ref); + } else { + if (have_ref) { + amduat_reference_free(&ref); + } + ref = json_ref; + have_ref = true; + } + json_have_ref = true; + have_ref_field = true; + } else if (key_len == strlen("target_ref") && + memcmp(key, "target_ref", key_len) == 0) { + amduat_reference_t json_ref; + if (json_have_target_ref || + !amduatd_json_parse_string_noesc(&p, end, &sv, &sv_len) || + !amduatd_decode_ref_hex_str(sv, sv_len, &json_ref)) { + parse_error = "invalid"; + goto ingest_json_parse_fail; + } + if (have_ref_field) { + amduat_reference_free(&json_ref); + parse_error = "invalid"; + goto ingest_json_parse_fail; + } + if (have_ref && have_target_ref_field) { + if (!amduat_reference_eq(ref, json_ref)) { + amduat_reference_free(&json_ref); + parse_error = "invalid"; + goto ingest_json_parse_fail; + } + amduat_reference_free(&json_ref); + } else { + if (have_ref) { + amduat_reference_free(&ref); + } + ref = json_ref; + have_ref = true; + } + json_have_target_ref = true; + have_target_ref_field = true; + } else { + if (!amduatd_json_skip_value(&p, end, 0)) { + parse_error = "invalid"; + goto ingest_json_parse_fail; + } + } + + cur = amduatd_json_skip_ws(p, end); + if (cur < end && *cur == ',') { + p = cur + 1; + continue; + } + if (cur < end && *cur == '}') { + p = cur + 1; + break; + } + parse_error = "invalid"; + goto ingest_json_parse_fail; + } + + if (json_have_record_type) { + if (have_query_record_type && json_record_type != query_record_type) { + parse_error = "invalid"; + goto ingest_json_parse_fail; + } + have_record_type = true; + record_type = json_record_type; + } + if (!have_record_type || !have_ref) { + parse_error = "invalid"; + goto ingest_json_parse_fail; + } + if (record_type != AMDUAT_FED_REC_TOMBSTONE) { + parse_error = "invalid"; + goto ingest_json_parse_fail; + } + free(body); + body = NULL; + have_body = false; + +ingest_json_parse_fail: + if (parse_error != NULL) { + free(body); + body = NULL; + if (have_ref) { + bool ok = amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + return amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + NULL, + req->effective_space); + } + } else { + if (req->content_length > 0u) { + body = (uint8_t *)malloc(req->content_length); + if (body == NULL) { + if (have_ref) { + bool ok = amduatd_fed_ingest_send_response(fd, + 500, + "Internal Server Error", + "error", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + return amduatd_fed_ingest_send_response(fd, + 500, + "Internal Server Error", + "error", + false, + NULL, + req->effective_space); + } + if (!amduatd_read_exact(fd, body, req->content_length)) { + free(body); + if (have_ref) { + amduat_reference_free(&ref); + } + return false; + } + have_body = true; + } + } + + if (!have_record_type || !have_ref) { + free(body); + if (have_ref) { + bool ok = amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + return amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + NULL, + req->effective_space); + } + + if (record_type != AMDUAT_FED_REC_TOMBSTONE) { + amduat_artifact_t artifact; + amduat_artifact_t existing_artifact; + amduat_asl_store_error_t exist_err; + amduat_octets_t artifact_input; + bool artifact_input_owned = false; + + if (!have_body || req->content_length == 0u) { + free(body); + { + bool ok = amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + } + + if (req->content_type[0] != '\0' && + strstr(req->content_type, + "application/vnd.amduat.asl.artifact+v1") != NULL) { + input_format = AMDUAT_ASL_IO_ARTIFACT; + } + if (record_type == AMDUAT_FED_REC_TGK_EDGE) { + type_tag = amduat_type_tag(AMDUAT_TYPE_TAG_TGK1_EDGE_V1); + has_type_tag = true; + } else if (record_type == AMDUAT_FED_REC_PER) { + type_tag = amduat_type_tag(AMDUAT_TYPE_TAG_FER1_RECEIPT_1); + has_type_tag = true; + } + + memset(&existing_artifact, 0, sizeof(existing_artifact)); + exist_err = amduat_asl_store_get(store, ref, &existing_artifact); + if (exist_err == AMDUAT_ASL_STORE_OK) { + already_present = true; + amduat_asl_artifact_free(&existing_artifact); + } + + artifact_input = amduat_octets(body, req->content_length); + if (input_format == AMDUAT_ASL_IO_RAW) { + if (!amduat_octets_clone(artifact_input, &artifact_input)) { + free(body); + { + bool ok = amduatd_fed_ingest_send_response(fd, + 500, + "Internal Server Error", + "error", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + } + artifact_input_owned = true; + } + + memset(&artifact, 0, sizeof(artifact)); + if (!amduat_asl_artifact_from_bytes(artifact_input, + input_format, + has_type_tag, + type_tag, + &artifact)) { + if (artifact_input_owned) { + amduat_octets_free(&artifact_input); + } + free(body); + { + bool ok = amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + } + free(body); + body = NULL; + + if (input_format == AMDUAT_ASL_IO_ARTIFACT && has_type_tag && + (!artifact.has_type_tag || + artifact.type_tag.tag_id != type_tag.tag_id)) { + amduat_asl_artifact_free(&artifact); + { + bool ok = amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + } + + if (store->ops.put_indexed == NULL) { + amduat_asl_artifact_free(&artifact); + { + bool ok = amduatd_fed_ingest_send_response(fd, + 501, + "Not Implemented", + "error", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + } + store_err = amduat_asl_store_put_indexed(store, artifact, &stored_ref, + &state); + amduat_asl_artifact_free(&artifact); + if (store_err != AMDUAT_ASL_STORE_OK) { + bool ok = amduatd_fed_ingest_send_response(fd, + 500, + "Internal Server Error", + "error", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + if (!amduat_reference_eq(ref, stored_ref)) { + amduat_reference_free(&stored_ref); + { + bool ok = amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + NULL, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + } + amduat_reference_free(&stored_ref); + { + bool ok = amduatd_fed_ingest_send_response( + fd, + 200, + "OK", + already_present ? "already_present" : "ok", + !already_present, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + } + + if (have_body && req->content_length != 0u) { + free(body); + { + bool ok = amduatd_fed_ingest_send_response(fd, + 400, + "Bad Request", + "invalid", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + } + free(body); + body = NULL; + if (store->ops.tombstone == NULL) { + bool ok = amduatd_fed_ingest_send_response(fd, + 501, + "Not Implemented", + "error", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + store_err = amduat_asl_store_tombstone(store, ref, 0u, 0u, &state); + if (store_err != AMDUAT_ASL_STORE_OK) { + bool ok = amduatd_fed_ingest_send_response(fd, + 500, + "Internal Server Error", + "error", + false, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } + { + bool ok = amduatd_fed_ingest_send_response(fd, + 200, + "OK", + "ok", + true, + &ref, + req->effective_space); + amduat_reference_free(&ref); + return ok; + } +} + static bool amduatd_handle_get_fed_pull_plan(int fd, amduat_asl_store_t *store, const amduatd_fed_cfg_t *fed_cfg, @@ -5284,6 +6099,16 @@ static bool amduatd_handle_conn(int fd, root_path); goto conn_cleanup; } + if (strcmp(req.method, "POST") == 0 && + strcmp(no_query, "/v1/fed/ingest") == 0) { + ok = amduatd_handle_post_fed_ingest(fd, + store, + fed_cfg, + caps, + effective_cfg, + &req); + goto conn_cleanup; + } if (strcmp(req.method, "POST") == 0 && strcmp(no_query, "/v1/fed/cursor") == 0) { ok = amduatd_handle_post_fed_cursor(fd, diff --git a/src/amduatd_concepts.c b/src/amduatd_concepts.c index f66f621..feb1d85 100644 --- a/src/amduatd_concepts.c +++ b/src/amduatd_concepts.c @@ -3020,6 +3020,9 @@ static bool amduatd_seed_concept_if_missing( out_concept_ref == NULL) { return false; } + memset(&name_ref, 0, sizeof(name_ref)); + memset(&concept_ref, 0, sizeof(concept_ref)); + memset(&edge_ref, 0, sizeof(edge_ref)); if (!amduatd_space_scope_name(space, name, &scoped_bytes)) { return false; } @@ -3030,10 +3033,6 @@ static bool amduatd_seed_concept_if_missing( goto seed_cleanup; } - memset(&name_ref, 0, sizeof(name_ref)); - memset(&concept_ref, 0, sizeof(concept_ref)); - memset(&edge_ref, 0, sizeof(edge_ref)); - if (!amduatd_concepts_put_name_artifact(store, scoped_name, &name_ref)) { goto seed_cleanup; } @@ -3219,7 +3218,8 @@ bool amduatd_seed_ms_ui_state(amduat_asl_store_t *store, amduatd_seed_entry_t entries[3]; amduatd_strbuf_t b; amduat_artifact_t artifact; - char *seed_latest_hex = NULL; + char *hello_hex = NULL; + bool hello_hex_owned = false; size_t i; bool ok = false; @@ -3353,7 +3353,6 @@ bool amduatd_seed_ms_ui_state(amduat_asl_store_t *store, amduat_reference_t hello_ref; amduat_reference_t title_ref; amduat_reference_t status_ref; - char *hello_hex = NULL; const char *payload = "hello"; memset(&hello_ref, 0, sizeof(hello_ref)); @@ -3367,18 +3366,26 @@ bool amduatd_seed_ms_ui_state(amduat_asl_store_t *store, if (!amduat_asl_ref_encode_hex(hello_ref, &hello_hex)) { goto seed_cleanup; } - seed_latest_hex = hello_hex; + hello_hex_owned = true; artifact = amduat_artifact(amduat_octets("Amduat UI", strlen("Amduat UI"))); if (!amduatd_seed_store_artifact(store, artifact, &title_ref)) { - free(hello_hex); + if (hello_hex_owned) { + free(hello_hex); + hello_hex = NULL; + hello_hex_owned = false; + } goto seed_cleanup; } artifact = amduat_artifact(amduat_octets("ready", strlen("ready"))); if (!amduatd_seed_store_artifact(store, artifact, &status_ref)) { - free(hello_hex); + if (hello_hex_owned) { + free(hello_hex); + hello_hex = NULL; + hello_hex_owned = false; + } goto seed_cleanup; } @@ -3412,12 +3419,12 @@ bool amduatd_seed_ms_ui_state(amduat_asl_store_t *store, entries[0].value_hex = strdup("Amduat UI"); entries[1].value_hex = strdup("ready"); entries[2].value_hex = hello_hex; + hello_hex = NULL; + hello_hex_owned = false; if (entries[0].value_hex == NULL || entries[1].value_hex == NULL || entries[2].value_hex == NULL) { goto seed_cleanup; } - hello_hex = NULL; - seed_latest_hex = NULL; qsort(fields, 3, sizeof(fields[0]), amduatd_seed_entry_cmp); qsort(entries, 3, sizeof(entries[0]), amduatd_seed_entry_cmp); @@ -3648,7 +3655,9 @@ seed_cleanup: free(entries[i].value_hex); } amduatd_strbuf_free(&b); - free(seed_latest_hex); + if (hello_hex_owned) { + free(hello_hex); + } return ok; }