diff --git a/CMakeLists.txt b/CMakeLists.txt index 5f8f532..2a1b933 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,7 +28,8 @@ set(amduatd_sources src/amduatd.c src/amduatd_http.c src/amduatd_caps.c src/amduatd_space.c src/amduatd_concepts.c src/amduatd_store.c src/amduatd_derivation_index.c src/amduatd_fed.c src/amduatd_fed_cursor.c - src/amduatd_fed_pull_plan.c src/amduatd_fed_pull_apply.c + src/amduatd_fed_pull_plan.c src/amduatd_fed_push_plan.c + src/amduatd_fed_pull_apply.c src/amduatd_fed_push_apply.c src/amduatd_space_doctor.c) if(AMDUATD_ENABLE_UI) list(APPEND amduatd_sources src/amduatd_ui.c) @@ -176,12 +177,57 @@ target_include_directories(amduatd_test_fed_pull_plan ) target_link_libraries(amduatd_test_fed_pull_plan - PRIVATE amduat_asl amduat_asl_record amduat_enc amduat_util amduat_hash_asl1 + PRIVATE amduat_asl amduat_asl_record amduat_asl_log_store amduat_enc amduat_util amduat_hash_asl1 amduat_asl_pointer_fs ) add_test(NAME amduatd_fed_pull_plan COMMAND amduatd_test_fed_pull_plan) +add_executable(amduatd_test_fed_push_plan + tests/test_amduatd_fed_push_plan.c + src/amduatd_fed_push_plan.c + src/amduatd_fed_cursor.c + src/amduatd_fed.c + src/amduatd_space.c +) + +target_include_directories(amduatd_test_fed_push_plan + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/vendor/amduat/include +) + +target_link_libraries(amduatd_test_fed_push_plan + PRIVATE amduat_asl amduat_asl_record amduat_asl_log_store amduat_enc amduat_util amduat_hash_asl1 + amduat_asl_pointer_fs +) + +add_test(NAME amduatd_fed_push_plan COMMAND amduatd_test_fed_push_plan) + +add_executable(amduatd_test_fed_push_apply + tests/test_amduatd_fed_push_apply.c + src/amduatd_fed_push_apply.c + src/amduatd_fed_push_plan.c + src/amduatd_fed_cursor.c + src/amduatd_fed.c + src/amduatd_space.c + src/amduatd_store.c +) + +target_include_directories(amduatd_test_fed_push_apply + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/vendor/amduat/include +) + +target_link_libraries(amduatd_test_fed_push_apply + PRIVATE amduat_asl_store_fs amduat_asl_store_index_fs amduat_asl_record + amduat_asl amduat_asl_log_store amduat_enc amduat_asl_pointer_fs amduat_util + amduat_hash_asl1 +) + +add_test(NAME amduatd_fed_push_apply COMMAND amduatd_test_fed_push_apply) + add_executable(amduatd_test_fed_pull_apply tests/test_amduatd_fed_pull_apply.c src/amduatd_fed_pull_apply.c diff --git a/README.md b/README.md index 53bfb91..c217c16 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ Run the daemon with the index-backed store: ./build/amduatd --root .amduat-asl --sock amduatd.sock --store-backend index ``` -Note: `/v1/fed/records` requires the index backend. +Note: `/v1/fed/records` and `/v1/fed/push/plan` require the index backend. ## Federation (dev) @@ -79,6 +79,9 @@ records (`fed/cursor` schema). A peer key should be a stable identifier for the remote (for example a federation registry domain id rendered as a decimal string). +Push cursors are separate from pull cursors and live under +`fed/push_cursor//head` (space scoped). + Read the current cursor for a peer: ```sh @@ -129,6 +132,34 @@ curl --unix-socket amduatd.sock -X POST \ `/v1/fed/pull` requires the index backend and will not advance the cursor on partial failure. +### Federation push plan (sender dry run) + +Compute a read-only plan of what would be sent to a peer from the local log +since the push cursor (does not advance the cursor): + +```sh +curl --unix-socket amduatd.sock \ + 'http://localhost/v1/fed/push/plan?peer=2&limit=128' \ + -H 'X-Amduat-Space: demo' +``` + +`/v1/fed/push/plan` requires the index backend and uses a push cursor separate +from the pull cursor. + +### Federation push (sender apply) + +Send local records to a peer (advances the push cursor only after all records +apply successfully on the peer): + +```sh +curl --unix-socket amduatd.sock -X POST \ + 'http://localhost/v1/fed/push?peer=2&limit=128' \ + -H 'X-Amduat-Space: demo' +``` + +`/v1/fed/push` uses `/v1/fed/ingest` on the peer and only advances the push +cursor after the batch completes. It requires the index backend. + ### Federation ingest (receiver) `/v1/fed/ingest` applies a single incoming record (push receiver). The request @@ -182,8 +213,8 @@ Dev loop (build + restart): ## Federation smoke test -Run the end-to-end federation smoke test (starts two local daemons, pulls a -record, and verifies the artifact was replicated): +Run the end-to-end federation smoke test (starts two local daemons, verifies +pull replication A→B and push replication B→A, and checks cursors): ```sh ./scripts/test_fed_smoke.sh @@ -335,7 +366,9 @@ When the daemon uses the `fs` store backend, index-only checks are reported as - `GET /v1/fed/cursor?peer=...` → `{peer_key, space_id, last_logseq, last_record_hash, ref}` - `POST /v1/fed/cursor?peer=...` → `{ref}` (CAS update; `expected_ref` in body) - `GET /v1/fed/pull/plan?peer=...&limit=...` → `{peer, effective_space, cursor, remote_scan, records, next_cursor_candidate, ...}` +- `GET /v1/fed/push/plan?peer=...&limit=...` → `{peer, domain_id, effective_space, cursor, scan, records, required_artifacts, next_cursor_candidate}` - `POST /v1/fed/pull?peer=...&limit=...` → `{peer, effective_space, cursor_before, plan_summary, applied, cursor_after, errors}` +- `POST /v1/fed/push?peer=...&limit=...` → `{peer, domain_id, effective_space, cursor_before, plan_summary, sent, cursor_after, errors}` - `GET /v1/fed/artifacts/{ref}` → raw bytes for federation resolve - `GET /v1/fed/status` → `{status, domain_id, registry_ref, last_tick_ms}` - `POST /v1/artifacts` diff --git a/federation/transport_unix.c b/federation/transport_unix.c index e9fc052..647563a 100644 --- a/federation/transport_unix.c +++ b/federation/transport_unix.c @@ -1102,3 +1102,145 @@ bool amduat_fed_transport_unix_get_artifact_with_status( free(buf); return true; } + +static const char *amduat_fed_transport_unix_record_type_name( + amduat_fed_record_type_t type) { + switch (type) { + case AMDUAT_FED_REC_ARTIFACT: + return "artifact"; + case AMDUAT_FED_REC_PER: + return "per"; + case AMDUAT_FED_REC_TGK_EDGE: + return "tgk_edge"; + case AMDUAT_FED_REC_TOMBSTONE: + return "tombstone"; + default: + return NULL; + } +} + +bool amduat_fed_transport_unix_post_ingest( + amduat_fed_transport_unix_t *transport, + amduat_fed_record_type_t record_type, + amduat_reference_t ref, + amduat_octets_t bytes, + int *out_status, + char **out_body) { + char *ref_hex = NULL; + const char *type_name = NULL; + char space_header[AMDUAT_ASL_POINTER_NAME_MAX + 32u]; + const char *space_line = ""; + char req[2048]; + char *body_copy = NULL; + const uint8_t *resp_body = NULL; + size_t resp_len = 0; + int status = 0; + int fd; + uint8_t *buf = NULL; + size_t buf_len = 0; + const char *body = NULL; + size_t body_len = 0; + const char *content_type = "application/octet-stream"; + char json_body[512]; + + if (out_status != NULL) { + *out_status = 0; + } + if (out_body != NULL) { + *out_body = NULL; + } + if (transport == NULL || out_status == NULL) { + return false; + } + type_name = amduat_fed_transport_unix_record_type_name(record_type); + if (type_name == NULL) { + return false; + } + if (!amduat_asl_ref_encode_hex(ref, &ref_hex)) { + return false; + } + + if (transport->has_space) { + snprintf(space_header, + sizeof(space_header), + "X-Amduat-Space: %s\r\n", + transport->space_id); + space_line = space_header; + } + + if (record_type == AMDUAT_FED_REC_TOMBSTONE) { + int n = snprintf(json_body, + sizeof(json_body), + "{\"record_type\":\"tombstone\",\"ref\":\"%s\"}", + ref_hex); + if (n <= 0 || (size_t)n >= sizeof(json_body)) { + free(ref_hex); + return false; + } + content_type = "application/json"; + body = json_body; + body_len = (size_t)n; + } else { + body = (const char *)bytes.data; + body_len = bytes.len; + if (body_len != 0u && body == NULL) { + free(ref_hex); + return false; + } + } + + snprintf(req, sizeof(req), + "POST /v1/fed/ingest?record_type=%s&ref=%s HTTP/1.1\r\n" + "Host: localhost\r\n" + "%s" + "Content-Type: %s\r\n" + "Content-Length: %zu\r\n" + "Connection: close\r\n" + "\r\n", + type_name, + ref_hex, + space_line, + content_type, + body_len); + free(ref_hex); + + fd = amduat_fed_transport_unix_connect(transport->socket_path); + if (fd < 0) { + return false; + } + if (!amduat_fed_transport_unix_send_all(fd, req, strlen(req)) || + (body_len != 0u && + !amduat_fed_transport_unix_send_all(fd, body, body_len))) { + close(fd); + return false; + } + if (!amduat_fed_transport_unix_read_all(fd, &buf, &buf_len)) { + close(fd); + return false; + } + close(fd); + + if (!amduat_fed_transport_unix_split_response(buf, + buf_len, + &resp_body, + &resp_len, + &status)) { + free(buf); + return false; + } + *out_status = status; + if (out_body != NULL && resp_body != NULL) { + body_copy = (char *)malloc(resp_len + 1u); + if (body_copy == NULL) { + free(buf); + return false; + } + if (resp_len != 0u) { + memcpy(body_copy, resp_body, resp_len); + } + body_copy[resp_len] = '\0'; + *out_body = body_copy; + } + free(buf); + return true; +} diff --git a/federation/transport_unix.h b/federation/transport_unix.h index 3c940e7..ccfd95b 100644 --- a/federation/transport_unix.h +++ b/federation/transport_unix.h @@ -3,6 +3,7 @@ #include "federation/coord.h" #include "amduat/asl/asl_pointer_fs.h" +#include "amduat/fed/replay.h" #ifdef __cplusplus extern "C" { @@ -46,6 +47,15 @@ bool amduat_fed_transport_unix_get_artifact_with_status( amduat_octets_t *out_bytes, char **out_body); +/* Returns true on successful HTTP exchange. Caller frees out_body with free. */ +bool amduat_fed_transport_unix_post_ingest( + amduat_fed_transport_unix_t *transport, + amduat_fed_record_type_t record_type, + amduat_reference_t ref, + amduat_octets_t bytes, + int *out_status, + char **out_body); + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/scripts/test_fed_smoke.sh b/scripts/test_fed_smoke.sh index 39166e9..5fd251b 100644 --- a/scripts/test_fed_smoke.sh +++ b/scripts/test_fed_smoke.sh @@ -123,6 +123,24 @@ http_post() { fi } +http_post_allow() { + local sock="$1" + local path="$2" + local data="$3" + shift 3 + if [[ "${USE_HTTP_HELPER}" -eq 1 ]]; then + "${HTTP_HELPER}" --sock "${sock}" --method POST --path "${path}" \ + --data "${data}" --allow-status \ + "$@" + else + curl --silent --show-error \ + --unix-socket "${sock}" \ + "$@" \ + --data-binary "${data}" \ + "http://localhost${path}" + fi +} + wait_for_ready() { local sock="$1" local pid="$2" @@ -195,6 +213,11 @@ plan_resp="$( fi exit 1 } +if ! echo "${plan_resp}" | grep -q "\"record_count\":"; then + echo "pull plan malformed" >&2 + echo "plan response: ${plan_resp}" >&2 + exit 1 +fi if echo "${plan_resp}" | grep -q "\"record_count\":0"; then echo "pull plan empty" >&2 echo "plan response: ${plan_resp}" >&2 @@ -253,4 +276,113 @@ if [[ "${payload_b}" != "${payload}" ]]; then exit 1 fi +payload_push="fed-smoke-push" +artifact_resp_push="$( + http_post "${sock_b}" "/v1/artifacts" "${payload_push}" \ + --header "Content-Type: application/octet-stream" \ + --header "X-Amduat-Space: ${space_id}" +)" || { + echo "artifact POST failed on B" >&2 + if [[ -f "${log_b}" ]]; then + cat "${log_b}" >&2 + fi + exit 1 +} +ref_push="$( + printf '%s' "${artifact_resp_push}" \ + | tr -d '\r\n' \ + | awk 'match($0, /"ref":"[^"]+"/) {print substr($0, RSTART+7, RLENGTH-8)}' +)" + +if [[ -z "${ref_push}" ]]; then + echo "failed to parse ref from daemon B" >&2 + echo "artifact response: ${artifact_resp_push}" >&2 + if [[ -f "${log_b}" ]]; then + cat "${log_b}" >&2 + fi + exit 1 +fi + +push_plan_resp="$( + http_get_allow "${sock_b}" "/v1/fed/push/plan?peer=1&limit=8" \ + --header "X-Amduat-Space: ${space_id}" +)" || { + echo "push plan failed" >&2 + if [[ -f "${log_b}" ]]; then + cat "${log_b}" >&2 + fi + exit 1 +} +if ! echo "${push_plan_resp}" | grep -q "\"record_count\":"; then + echo "push plan malformed (missing endpoint?)" >&2 + echo "push plan response: ${push_plan_resp}" >&2 + exit 1 +fi +if echo "${push_plan_resp}" | grep -q "\"record_count\":0"; then + echo "push plan empty" >&2 + echo "push plan response: ${push_plan_resp}" >&2 + exit 1 +fi +push_cursor_before="$( + printf '%s' "${push_plan_resp}" \ + | tr -d '\r\n' \ + | awk 'match($0, /"cursor":\{[^}]*\}/) {seg=substr($0, RSTART, RLENGTH); if (match(seg, /"last_logseq":[0-9]+/)) {print substr(seg, RSTART+14, RLENGTH-14)}}' +)" + +push_resp="$( + http_post_allow "${sock_b}" "/v1/fed/push?peer=1&limit=8" "" \ + --header "X-Amduat-Space: ${space_id}" +)" || { + echo "push apply failed" >&2 + if [[ -f "${log_b}" ]]; then + cat "${log_b}" >&2 + fi + exit 1 +} +if ! echo "${push_resp}" | grep -q "\"advanced\":true"; then + echo "push did not advance cursor" >&2 + echo "push response: ${push_resp}" >&2 + exit 1 +fi + +payload_a="$( + http_get "${sock_a}" "/v1/artifacts/${ref_push}" \ + --header "X-Amduat-Space: ${space_id}" +)" || { + echo "artifact fetch failed on A" >&2 + if [[ -f "${log_a}" ]]; then + cat "${log_a}" >&2 + fi + exit 1 +} + +if [[ "${payload_a}" != "${payload_push}" ]]; then + echo "payload mismatch after push" >&2 + exit 1 +fi + +push_plan_after="$( + http_get_allow "${sock_b}" "/v1/fed/push/plan?peer=1&limit=1" \ + --header "X-Amduat-Space: ${space_id}" +)" || { + echo "push plan after failed" >&2 + if [[ -f "${log_b}" ]]; then + cat "${log_b}" >&2 + fi + exit 1 +} +push_cursor_after="$( + printf '%s' "${push_plan_after}" \ + | tr -d '\r\n' \ + | awk 'match($0, /"cursor":\{[^}]*\}/) {seg=substr($0, RSTART, RLENGTH); if (match(seg, /"last_logseq":[0-9]+/)) {print substr(seg, RSTART+14, RLENGTH-14)}}' +)" +if [[ -n "${push_cursor_before}" && -n "${push_cursor_after}" ]]; then + if [[ "${push_cursor_after}" -lt "${push_cursor_before}" ]]; then + echo "push cursor did not advance" >&2 + echo "cursor before: ${push_cursor_before}" >&2 + echo "cursor after: ${push_cursor_after}" >&2 + exit 1 + fi +fi + echo "fed smoke ok" diff --git a/src/amduatd.c b/src/amduatd.c index 7d15f5a..11f6ed9 100644 --- a/src/amduatd.c +++ b/src/amduatd.c @@ -43,6 +43,8 @@ #include "amduatd_fed.h" #include "amduatd_fed_cursor.h" #include "amduatd_fed_pull_plan.h" +#include "amduatd_fed_push_plan.h" +#include "amduatd_fed_push_apply.h" #include "amduatd_fed_pull_apply.h" #include "amduatd_store.h" #include "amduatd_derivation_index.h" @@ -68,11 +70,6 @@ #include #include -enum { - AMDUATD_FED_LOG_KIND_ARTIFACT = 1u, - AMDUATD_FED_LOG_KIND_TOMBSTONE = 2u -}; - typedef struct amduatd_strbuf { char *data; size_t len; @@ -131,7 +128,9 @@ static const char k_amduatd_contract_v1_json[] = "{\"method\":\"GET\",\"path\":\"/v1/fed/cursor\"}," "{\"method\":\"POST\",\"path\":\"/v1/fed/cursor\"}," "{\"method\":\"GET\",\"path\":\"/v1/fed/pull/plan\"}," + "{\"method\":\"GET\",\"path\":\"/v1/fed/push/plan\"}," "{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"}," + "{\"method\":\"POST\",\"path\":\"/v1/fed/push\"}," "{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"}," "{\"method\":\"POST\",\"path\":\"/v1/fed/ingest\"}," "{\"method\":\"GET\",\"path\":\"/v1/fed/artifacts/{ref}\"}," @@ -2890,6 +2889,133 @@ static bool amduatd_handle_get_fed_pull_plan(int fd, } } +static bool amduatd_handle_get_fed_push_plan(int fd, + amduat_asl_store_t *store, + const amduatd_fed_cfg_t *fed_cfg, + const amduatd_caps_t *caps, + const amduatd_cfg_t *dcfg, + const amduatd_http_req_t *req, + const char *root_path) { + char peer_buf[AMDUAT_ASL_POINTER_NAME_MAX + 1u]; + char limit_buf[32]; + uint64_t limit = 128u; + uint32_t domain_id = 0u; + amduatd_fed_push_plan_status_t plan_status; + amduat_asl_pointer_store_t pointer_store; + amduat_asl_index_state_t state; + amduatd_fed_push_plan_scan_t scan; + char *json = NULL; + amduatd_fed_push_plan_input_t input; + + if (store == NULL || fed_cfg == NULL || req == NULL || root_path == NULL || + dcfg == NULL) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "internal error"); + } + if (!amduatd_fed_require_space(fd, fed_cfg, req)) { + return false; + } + plan_status = amduatd_fed_push_plan_check(fed_cfg, store); + if (plan_status == AMDUATD_FED_PUSH_PLAN_ERR_DISABLED) { + return amduatd_send_json_error(fd, 503, "Service Unavailable", + "federation disabled"); + } + if (plan_status == AMDUATD_FED_PUSH_PLAN_ERR_UNSUPPORTED) { + return amduatd_send_json_error(fd, 501, "Not Implemented", + "requires index backend"); + } + if (plan_status != AMDUATD_FED_PUSH_PLAN_OK) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "plan unavailable"); + } + + if (caps != NULL && caps->enabled && req->x_capability[0] != '\0') { + const char *reason = NULL; + if (!amduatd_caps_check_space(caps, dcfg, req, &reason)) { + if (reason != NULL && strcmp(reason, "wrong-space") == 0) { + return amduatd_send_json_error(fd, 403, "Forbidden", + "space not permitted by capability"); + } + return amduatd_send_json_error(fd, 403, "Forbidden", + "invalid capability"); + } + } + + if (amduatd_query_param(req->path, "peer", + peer_buf, sizeof(peer_buf)) == NULL || + peer_buf[0] == '\0') { + return amduatd_send_json_error(fd, 400, "Bad Request", + "missing peer"); + } + { + amduat_octets_t scoped = amduat_octets(NULL, 0u); + if (!amduatd_fed_push_cursor_pointer_name(req->effective_space, + peer_buf, + &scoped)) { + return amduatd_send_json_error(fd, 400, "Bad Request", "invalid peer"); + } + amduat_octets_free(&scoped); + } + if (!amduatd_parse_u32_str(peer_buf, &domain_id)) { + return amduatd_send_json_error(fd, 400, "Bad Request", "invalid peer"); + } + + if (amduatd_query_param(req->path, "limit", + limit_buf, sizeof(limit_buf)) != NULL) { + if (!amduatd_parse_u64_str(limit_buf, &limit) || limit == 0u || + limit > 2048u) { + return amduatd_send_json_error(fd, 400, "Bad Request", "invalid limit"); + } + } + + if (!amduat_asl_pointer_store_init(&pointer_store, root_path)) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "pointer store error"); + } + if (!amduat_asl_index_current_state(store, &state)) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "store error"); + } + (void)state; + + plan_status = amduatd_fed_push_plan_scan(store, + &pointer_store, + req->effective_space, + peer_buf, + limit, + root_path, + &scan); + if (plan_status != AMDUATD_FED_PUSH_PLAN_OK) { + amduatd_fed_push_plan_scan_free(&scan); + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "plan scan failed"); + } + + memset(&input, 0, sizeof(input)); + input.peer_key = peer_buf; + input.domain_id = domain_id; + input.effective_space = req->effective_space; + input.cursor_present = scan.cursor_present; + input.cursor = scan.cursor_present ? &scan.cursor : NULL; + input.cursor_ref = scan.cursor_present ? &scan.cursor_ref : NULL; + input.records = scan.records; + input.record_count = scan.record_count; + + plan_status = amduatd_fed_push_plan_json(&input, &json); + if (plan_status != AMDUATD_FED_PUSH_PLAN_OK || json == NULL) { + amduatd_fed_push_plan_scan_free(&scan); + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "plan encode failed"); + } + + { + bool ok = amduatd_http_send_json(fd, 200, "OK", json, false); + free(json); + amduatd_fed_push_plan_scan_free(&scan); + return ok; + } +} + static bool amduatd_handle_post_artifacts(int fd, amduat_asl_store_t *store, const amduatd_http_req_t *req, @@ -4359,6 +4485,22 @@ static bool amduatd_fed_pull_unix_get_artifact(void *ctx, out_body); } +static bool amduatd_fed_push_unix_post_ingest( + void *ctx, + amduat_fed_record_type_t record_type, + amduat_reference_t ref, + amduat_octets_t bytes, + int *out_status, + char **out_body) { + return amduat_fed_transport_unix_post_ingest( + (amduat_fed_transport_unix_t *)ctx, + record_type, + ref, + bytes, + out_status, + out_body); +} + static bool amduatd_handle_post_fed_pull(int fd, amduat_asl_store_t *store, const amduatd_fed_cfg_t *fed_cfg, @@ -4730,6 +4872,432 @@ fed_pull_cleanup: } return true; } + +static bool amduatd_handle_post_fed_push(int fd, + amduat_asl_store_t *store, + const amduatd_fed_cfg_t *fed_cfg, + const amduatd_caps_t *caps, + const amduatd_cfg_t *dcfg, + const amduatd_http_req_t *req, + const char *root_path) { + char peer_buf[AMDUAT_ASL_POINTER_NAME_MAX + 1u]; + char limit_buf[32]; + uint64_t limit = 128u; + uint32_t domain_id = 0u; + amduat_asl_pointer_store_t pointer_store; + amduat_fed_transport_unix_t transport; + amduatd_fed_push_transport_t push_transport; + amduatd_fed_push_apply_report_t report; + amduatd_fed_push_apply_status_t status; + amduatd_strbuf_t b; + char *cursor_ref_hex = NULL; + char *cursor_after_hex = NULL; + char *candidate_ref_hex = NULL; + bool ok = false; + int http_status = 200; + const char *http_reason = "OK"; + + if (store == NULL || fed_cfg == NULL || req == NULL || root_path == NULL || + dcfg == NULL) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "internal error"); + } + if (!amduatd_fed_require_space(fd, fed_cfg, req)) { + return false; + } + { + amduatd_fed_push_plan_status_t check = + amduatd_fed_push_plan_check(fed_cfg, store); + if (check == AMDUATD_FED_PUSH_PLAN_ERR_DISABLED) { + return amduatd_send_json_error(fd, 503, "Service Unavailable", + "federation disabled"); + } + if (check == AMDUATD_FED_PUSH_PLAN_ERR_UNSUPPORTED) { + return amduatd_send_json_error(fd, 501, "Not Implemented", + "requires index backend"); + } + } + + if (caps != NULL && caps->enabled && req->x_capability[0] != '\0') { + const char *reason = NULL; + if (!amduatd_caps_check_space(caps, dcfg, req, &reason)) { + if (reason != NULL && strcmp(reason, "wrong-space") == 0) { + return amduatd_send_json_error(fd, 403, "Forbidden", + "space not permitted by capability"); + } + return amduatd_send_json_error(fd, 403, "Forbidden", + "invalid capability"); + } + } + + if (amduatd_query_param(req->path, "peer", + peer_buf, sizeof(peer_buf)) == NULL || + peer_buf[0] == '\0') { + return amduatd_send_json_error(fd, 400, "Bad Request", + "missing peer"); + } + { + amduat_octets_t scoped = amduat_octets(NULL, 0u); + if (!amduatd_fed_push_cursor_pointer_name(req->effective_space, + peer_buf, + &scoped)) { + return amduatd_send_json_error(fd, 400, "Bad Request", "invalid peer"); + } + amduat_octets_free(&scoped); + } + if (!amduatd_parse_u32_str(peer_buf, &domain_id)) { + return amduatd_send_json_error(fd, 400, "Bad Request", "invalid peer"); + } + + if (amduatd_query_param(req->path, "limit", + limit_buf, sizeof(limit_buf)) != NULL) { + if (!amduatd_parse_u64_str(limit_buf, &limit) || limit == 0u || + limit > 2048u) { + return amduatd_send_json_error(fd, 400, "Bad Request", "invalid limit"); + } + } + + if (!amduat_asl_pointer_store_init(&pointer_store, root_path)) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "pointer store error"); + } + + if (fed_cfg->transport_kind != AMDUATD_FED_TRANSPORT_UNIX || + !fed_cfg->unix_socket_set) { + return amduatd_send_json_error(fd, 501, "Not Implemented", + "federation transport unavailable"); + } + if (!amduat_fed_transport_unix_init(&transport, fed_cfg->unix_socket_path)) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "transport init failed"); + } + if (req->effective_space != NULL && req->effective_space->enabled && + req->effective_space->space_id.data != NULL) { + (void)amduat_fed_transport_unix_set_space( + &transport, (const char *)req->effective_space->space_id.data); + } + + memset(&push_transport, 0, sizeof(push_transport)); + push_transport.ctx = &transport; + push_transport.post_ingest = amduatd_fed_push_unix_post_ingest; + + amduatd_fed_push_apply_report_init(&report); + status = amduatd_fed_push_apply(store, + &pointer_store, + req->effective_space, + peer_buf, + limit, + root_path, + fed_cfg, + &push_transport, + &report); + + if (status == AMDUATD_FED_PUSH_APPLY_ERR_DISABLED) { + http_status = 503; + http_reason = "Service Unavailable"; + } else if (status == AMDUATD_FED_PUSH_APPLY_ERR_UNSUPPORTED) { + http_status = 501; + http_reason = "Not Implemented"; + } else if (status == AMDUATD_FED_PUSH_APPLY_ERR_INVALID) { + http_status = 400; + http_reason = "Bad Request"; + } else if (status == AMDUATD_FED_PUSH_APPLY_ERR_REMOTE) { + http_status = 502; + http_reason = "Bad Gateway"; + } else if (status == AMDUATD_FED_PUSH_APPLY_ERR_CONFLICT) { + http_status = 409; + http_reason = "Conflict"; + } else if (status != AMDUATD_FED_PUSH_APPLY_OK) { + http_status = 500; + http_reason = "Internal Server Error"; + } + + if (report.cursor_ref_set) { + (void)amduat_asl_ref_encode_hex(report.cursor_ref, &cursor_ref_hex); + } + if (report.cursor_after_ref_set) { + (void)amduat_asl_ref_encode_hex(report.cursor_after_ref, + &cursor_after_hex); + } + if (report.plan_candidate.has_ref) { + (void)amduat_asl_ref_encode_hex(report.plan_candidate.ref, + &candidate_ref_hex); + } + + memset(&b, 0, sizeof(b)); + if (!amduatd_strbuf_append_cstr(&b, "{")) { + goto fed_push_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"peer\":\"") || + !amduatd_strbuf_append_cstr(&b, peer_buf) || + !amduatd_strbuf_append_cstr(&b, "\",")) { + goto fed_push_cleanup; + } + { + char tmp[64]; + snprintf(tmp, sizeof(tmp), "%u", (unsigned int)domain_id); + if (!amduatd_strbuf_append_cstr(&b, "\"domain_id\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_push_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "\"effective_space\":{")) { + goto fed_push_cleanup; + } + if (req->effective_space != NULL && req->effective_space->enabled && + req->effective_space->space_id.data != NULL) { + const char *space_id = (const char *)req->effective_space->space_id.data; + if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"scoped\",") || + !amduatd_strbuf_append_cstr(&b, "\"space_id\":\"") || + !amduatd_strbuf_append_cstr(&b, space_id) || + !amduatd_strbuf_append_cstr(&b, "\"")) { + goto fed_push_cleanup; + } + } else { + if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"unscoped\",") || + !amduatd_strbuf_append_cstr(&b, "\"space_id\":null")) { + goto fed_push_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "},")) { + goto fed_push_cleanup; + } + { + char tmp[64]; + snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)limit); + if (!amduatd_strbuf_append_cstr(&b, "\"limit\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_push_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "\"cursor_before\":{")) { + goto fed_push_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"present\":") || + !amduatd_strbuf_append_cstr(&b, + report.cursor_present ? "true" : "false")) { + goto fed_push_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, ",\"last_logseq\":")) { + goto fed_push_cleanup; + } + if (report.cursor_present && report.cursor_has_logseq) { + char tmp[64]; + snprintf(tmp, sizeof(tmp), "%llu", + (unsigned long long)report.cursor_logseq); + if (!amduatd_strbuf_append_cstr(&b, tmp)) { + goto fed_push_cleanup; + } + } else { + if (!amduatd_strbuf_append_cstr(&b, "null")) { + goto fed_push_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, ",\"ref\":")) { + goto fed_push_cleanup; + } + if (cursor_ref_hex != NULL) { + if (!amduatd_strbuf_append_cstr(&b, "\"") || + !amduatd_strbuf_append_cstr(&b, cursor_ref_hex) || + !amduatd_strbuf_append_cstr(&b, "\"")) { + goto fed_push_cleanup; + } + } else { + if (!amduatd_strbuf_append_cstr(&b, "null")) { + goto fed_push_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "},")) { + goto fed_push_cleanup; + } + + if (!amduatd_strbuf_append_cstr(&b, "\"plan_summary\":{")) { + goto fed_push_cleanup; + } + { + char tmp[64]; + snprintf(tmp, sizeof(tmp), "%zu", report.plan_record_count); + if (!amduatd_strbuf_append_cstr(&b, "\"record_count\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",\"next_cursor_candidate\":{")) { + goto fed_push_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "\"last_logseq\":")) { + goto fed_push_cleanup; + } + if (report.plan_candidate.has_logseq) { + char tmp[64]; + snprintf(tmp, sizeof(tmp), "%llu", + (unsigned long long)report.plan_candidate.logseq); + if (!amduatd_strbuf_append_cstr(&b, tmp)) { + goto fed_push_cleanup; + } + } else { + if (!amduatd_strbuf_append_cstr(&b, "null")) { + goto fed_push_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, ",\"ref\":")) { + goto fed_push_cleanup; + } + if (candidate_ref_hex != NULL) { + if (!amduatd_strbuf_append_cstr(&b, "\"") || + !amduatd_strbuf_append_cstr(&b, candidate_ref_hex) || + !amduatd_strbuf_append_cstr(&b, "\"")) { + goto fed_push_cleanup; + } + } else { + if (!amduatd_strbuf_append_cstr(&b, "null")) { + goto fed_push_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "}},")) { + goto fed_push_cleanup; + } + + if (!amduatd_strbuf_append_cstr(&b, "\"sent\":{")) { + goto fed_push_cleanup; + } + { + char tmp[64]; + snprintf(tmp, sizeof(tmp), "%zu", report.sent_record_count); + if (!amduatd_strbuf_append_cstr(&b, "\"record_count\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_push_cleanup; + } + snprintf(tmp, sizeof(tmp), "%llu", + (unsigned long long)report.sent_bytes_total); + if (!amduatd_strbuf_append_cstr(&b, "\"bytes_total\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",\"by_type\":{")) { + goto fed_push_cleanup; + } + snprintf(tmp, sizeof(tmp), "%zu", report.sent_artifact_count); + if (!amduatd_strbuf_append_cstr(&b, "\"artifact\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_push_cleanup; + } + snprintf(tmp, sizeof(tmp), "%zu", report.sent_per_count); + if (!amduatd_strbuf_append_cstr(&b, "\"per\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_push_cleanup; + } + snprintf(tmp, sizeof(tmp), "%zu", report.sent_tgk_edge_count); + if (!amduatd_strbuf_append_cstr(&b, "\"tgk_edge\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_push_cleanup; + } + snprintf(tmp, sizeof(tmp), "%zu", report.sent_tombstone_count); + if (!amduatd_strbuf_append_cstr(&b, "\"tombstone\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, "},")) { + goto fed_push_cleanup; + } + snprintf(tmp, sizeof(tmp), "%zu", report.peer_ok_count); + if (!amduatd_strbuf_append_cstr(&b, "\"peer_ok_count\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_push_cleanup; + } + snprintf(tmp, sizeof(tmp), "%zu", report.peer_already_present_count); + if (!amduatd_strbuf_append_cstr(&b, "\"peer_already_present_count\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, "},")) { + goto fed_push_cleanup; + } + } + + if (!amduatd_strbuf_append_cstr(&b, "\"cursor_after\":{")) { + goto fed_push_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"advanced\":") || + !amduatd_strbuf_append_cstr(&b, + report.cursor_advanced ? "true" : "false")) { + goto fed_push_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, ",\"last_logseq\":")) { + goto fed_push_cleanup; + } + if (report.cursor_advanced && report.cursor_after_has_logseq) { + char tmp[64]; + snprintf(tmp, sizeof(tmp), "%llu", + (unsigned long long)report.cursor_after_logseq); + if (!amduatd_strbuf_append_cstr(&b, tmp)) { + goto fed_push_cleanup; + } + } else { + if (!amduatd_strbuf_append_cstr(&b, "null")) { + goto fed_push_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, ",\"ref\":")) { + goto fed_push_cleanup; + } + if (report.cursor_advanced && cursor_after_hex != NULL) { + if (!amduatd_strbuf_append_cstr(&b, "\"") || + !amduatd_strbuf_append_cstr(&b, cursor_after_hex) || + !amduatd_strbuf_append_cstr(&b, "\"")) { + goto fed_push_cleanup; + } + } else { + if (!amduatd_strbuf_append_cstr(&b, "null")) { + goto fed_push_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "},")) { + goto fed_push_cleanup; + } + + if (!amduatd_strbuf_append_cstr(&b, "\"errors\":[")) { + goto fed_push_cleanup; + } + if (status == AMDUATD_FED_PUSH_APPLY_OK) { + if (!amduatd_strbuf_append_cstr(&b, "]}\n")) { + goto fed_push_cleanup; + } + } else { + char tmp[64]; + if (!amduatd_strbuf_append_cstr(&b, "{\"message\":\"") || + !amduatd_strbuf_append_cstr(&b, + report.error[0] != '\0' + ? report.error + : "error") || + !amduatd_strbuf_append_cstr(&b, "\"")) { + goto fed_push_cleanup; + } + if (report.remote_status != 0) { + snprintf(tmp, sizeof(tmp), "%d", report.remote_status); + if (!amduatd_strbuf_append_cstr(&b, ",\"remote_status\":") || + !amduatd_strbuf_append_cstr(&b, tmp)) { + goto fed_push_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "}]}\n")) { + goto fed_push_cleanup; + } + } + + ok = amduatd_http_send_json(fd, http_status, http_reason, b.data, false); + +fed_push_cleanup: + amduatd_strbuf_free(&b); + free(cursor_ref_hex); + free(cursor_after_hex); + free(candidate_ref_hex); + amduatd_fed_push_apply_report_free(&report); + if (!ok) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "encode error"); + } + return true; +} static bool amduatd_handle_post_pel_programs(int fd, amduat_asl_store_t *store, const amduatd_http_req_t *req) { @@ -6078,6 +6646,17 @@ static bool amduatd_handle_conn(int fd, root_path); goto conn_cleanup; } + if (strcmp(req.method, "GET") == 0 && + strcmp(no_query, "/v1/fed/push/plan") == 0) { + ok = amduatd_handle_get_fed_push_plan(fd, + store, + fed_cfg, + caps, + effective_cfg, + &req, + root_path); + goto conn_cleanup; + } if (strcmp(req.method, "GET") == 0 && strcmp(no_query, "/v1/fed/status") == 0) { ok = amduatd_handle_get_fed_status(fd, coord, fed_cfg, &req); @@ -6099,6 +6678,17 @@ static bool amduatd_handle_conn(int fd, root_path); goto conn_cleanup; } + if (strcmp(req.method, "POST") == 0 && + strcmp(no_query, "/v1/fed/push") == 0) { + ok = amduatd_handle_post_fed_push(fd, + store, + fed_cfg, + caps, + effective_cfg, + &req, + root_path); + goto conn_cleanup; + } if (strcmp(req.method, "POST") == 0 && strcmp(no_query, "/v1/fed/ingest") == 0) { ok = amduatd_handle_post_fed_ingest(fd, diff --git a/src/amduatd_fed.h b/src/amduatd_fed.h index 7ef3ded..98fba48 100644 --- a/src/amduatd_fed.h +++ b/src/amduatd_fed.h @@ -19,6 +19,11 @@ typedef enum { AMDUATD_FED_TRANSPORT_UNIX = 1 } amduatd_fed_transport_kind_t; +enum { + AMDUATD_FED_LOG_KIND_ARTIFACT = 1u, + AMDUATD_FED_LOG_KIND_TOMBSTONE = 2u +}; + typedef struct { bool enabled; bool require_space; diff --git a/src/amduatd_fed_cursor.c b/src/amduatd_fed_cursor.c index ecd45d9..428ec61 100644 --- a/src/amduatd_fed_cursor.c +++ b/src/amduatd_fed_cursor.c @@ -110,6 +110,45 @@ static bool amduatd_fed_cursor_peer_key_is_valid(const char *peer_key) { return amduat_asl_pointer_name_is_valid(peer_key); } +static bool amduatd_fed_cursor_pointer_name_with_prefix( + const amduatd_space_t *space, + const char *peer_key, + const char *prefix, + amduat_octets_t *out_name) { + const char suffix[] = "/head"; + size_t peer_len; + size_t total_len; + size_t prefix_len; + char *base = NULL; + bool ok; + + if (out_name != NULL) { + *out_name = amduat_octets(NULL, 0u); + } + if (out_name == NULL || prefix == NULL || + !amduatd_fed_cursor_peer_key_is_valid(peer_key)) { + return false; + } + peer_len = strlen(peer_key); + prefix_len = strlen(prefix); + if (peer_len > SIZE_MAX - prefix_len - (sizeof(suffix) - 1u)) { + return false; + } + total_len = prefix_len + peer_len + (sizeof(suffix) - 1u); + base = (char *)malloc(total_len + 1u); + if (base == NULL) { + return false; + } + memcpy(base, prefix, prefix_len); + memcpy(base + prefix_len, peer_key, peer_len); + memcpy(base + prefix_len + peer_len, suffix, sizeof(suffix) - 1u); + base[total_len] = '\0'; + + ok = amduatd_space_scope_name(space, base, out_name); + free(base); + return ok; +} + static bool amduatd_fed_cursor_record_encode( const amduatd_fed_cursor_record_t *record, amduat_octets_t *out_payload) { @@ -358,36 +397,19 @@ void amduatd_fed_cursor_record_free(amduatd_fed_cursor_record_t *record) { bool amduatd_fed_cursor_pointer_name(const amduatd_space_t *space, const char *peer_key, amduat_octets_t *out_name) { - const char prefix[] = "fed/cursor/"; - const char suffix[] = "/head"; - size_t peer_len; - size_t total_len; - char *base = NULL; - bool ok; + return amduatd_fed_cursor_pointer_name_with_prefix(space, + peer_key, + "fed/cursor/", + out_name); +} - if (out_name != NULL) { - *out_name = amduat_octets(NULL, 0u); - } - if (out_name == NULL || !amduatd_fed_cursor_peer_key_is_valid(peer_key)) { - return false; - } - peer_len = strlen(peer_key); - if (peer_len > SIZE_MAX - (sizeof(prefix) - 1u) - (sizeof(suffix) - 1u)) { - return false; - } - total_len = (sizeof(prefix) - 1u) + peer_len + (sizeof(suffix) - 1u); - base = (char *)malloc(total_len + 1u); - if (base == NULL) { - return false; - } - memcpy(base, prefix, sizeof(prefix) - 1u); - memcpy(base + sizeof(prefix) - 1u, peer_key, peer_len); - memcpy(base + sizeof(prefix) - 1u + peer_len, suffix, sizeof(suffix) - 1u); - base[total_len] = '\0'; - - ok = amduatd_space_scope_name(space, base, out_name); - free(base); - return ok; +bool amduatd_fed_push_cursor_pointer_name(const amduatd_space_t *space, + const char *peer_key, + amduat_octets_t *out_name) { + return amduatd_fed_cursor_pointer_name_with_prefix(space, + peer_key, + "fed/push_cursor/", + out_name); } amduatd_fed_cursor_status_t amduatd_fed_cursor_check_enabled( @@ -401,11 +423,12 @@ amduatd_fed_cursor_status_t amduatd_fed_cursor_check_enabled( return AMDUATD_FED_CURSOR_OK; } -amduatd_fed_cursor_status_t amduatd_fed_cursor_get( +static amduatd_fed_cursor_status_t amduatd_fed_cursor_get_with_prefix( amduat_asl_store_t *store, amduat_asl_pointer_store_t *pointer_store, const amduatd_space_t *effective_space, const char *peer_key, + const char *prefix, amduatd_fed_cursor_record_t *out_cursor, amduat_reference_t *out_ref) { amduat_octets_t pointer_name = amduat_octets(NULL, 0u); @@ -423,9 +446,10 @@ amduatd_fed_cursor_status_t amduatd_fed_cursor_get( return AMDUATD_FED_CURSOR_ERR_INVALID; } - if (!amduatd_fed_cursor_pointer_name(effective_space, - peer_key, - &pointer_name)) { + if (!amduatd_fed_cursor_pointer_name_with_prefix(effective_space, + peer_key, + prefix, + &pointer_name)) { return AMDUATD_FED_CURSOR_ERR_INVALID; } @@ -494,11 +518,44 @@ amduatd_fed_cursor_status_t amduatd_fed_cursor_get( return AMDUATD_FED_CURSOR_OK; } -amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set( +amduatd_fed_cursor_status_t amduatd_fed_cursor_get( amduat_asl_store_t *store, amduat_asl_pointer_store_t *pointer_store, const amduatd_space_t *effective_space, const char *peer_key, + amduatd_fed_cursor_record_t *out_cursor, + amduat_reference_t *out_ref) { + return amduatd_fed_cursor_get_with_prefix(store, + pointer_store, + effective_space, + peer_key, + "fed/cursor/", + out_cursor, + out_ref); +} + +amduatd_fed_cursor_status_t amduatd_fed_push_cursor_get( + amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *effective_space, + const char *peer_key, + amduatd_fed_cursor_record_t *out_cursor, + amduat_reference_t *out_ref) { + return amduatd_fed_cursor_get_with_prefix(store, + pointer_store, + effective_space, + peer_key, + "fed/push_cursor/", + out_cursor, + out_ref); +} + +static amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set_with_prefix( + amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *effective_space, + const char *peer_key, + const char *prefix, const amduat_reference_t *expected_ref, const amduatd_fed_cursor_record_t *new_cursor, amduat_reference_t *out_new_ref) { @@ -534,9 +591,10 @@ amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set( return AMDUATD_FED_CURSOR_ERR_INVALID; } - if (!amduatd_fed_cursor_pointer_name(effective_space, - peer_key, - &pointer_name)) { + if (!amduatd_fed_cursor_pointer_name_with_prefix(effective_space, + peer_key, + prefix, + &pointer_name)) { return AMDUATD_FED_CURSOR_ERR_INVALID; } @@ -582,3 +640,39 @@ amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set( amduat_reference_free(&record_ref); return AMDUATD_FED_CURSOR_OK; } + +amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set( + amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *effective_space, + const char *peer_key, + const amduat_reference_t *expected_ref, + const amduatd_fed_cursor_record_t *new_cursor, + amduat_reference_t *out_new_ref) { + return amduatd_fed_cursor_cas_set_with_prefix(store, + pointer_store, + effective_space, + peer_key, + "fed/cursor/", + expected_ref, + new_cursor, + out_new_ref); +} + +amduatd_fed_cursor_status_t amduatd_fed_push_cursor_cas_set( + amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *effective_space, + const char *peer_key, + const amduat_reference_t *expected_ref, + const amduatd_fed_cursor_record_t *new_cursor, + amduat_reference_t *out_new_ref) { + return amduatd_fed_cursor_cas_set_with_prefix(store, + pointer_store, + effective_space, + peer_key, + "fed/push_cursor/", + expected_ref, + new_cursor, + out_new_ref); +} diff --git a/src/amduatd_fed_cursor.h b/src/amduatd_fed_cursor.h index 2f0c11c..44b53d1 100644 --- a/src/amduatd_fed_cursor.h +++ b/src/amduatd_fed_cursor.h @@ -42,6 +42,10 @@ bool amduatd_fed_cursor_pointer_name(const amduatd_space_t *space, const char *peer_key, amduat_octets_t *out_name); +bool amduatd_fed_push_cursor_pointer_name(const amduatd_space_t *space, + const char *peer_key, + amduat_octets_t *out_name); + amduatd_fed_cursor_status_t amduatd_fed_cursor_check_enabled( const amduatd_fed_cfg_t *cfg); @@ -53,6 +57,14 @@ amduatd_fed_cursor_status_t amduatd_fed_cursor_get( amduatd_fed_cursor_record_t *out_cursor, amduat_reference_t *out_ref); +amduatd_fed_cursor_status_t amduatd_fed_push_cursor_get( + amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *effective_space, + const char *peer_key, + amduatd_fed_cursor_record_t *out_cursor, + amduat_reference_t *out_ref); + amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set( amduat_asl_store_t *store, amduat_asl_pointer_store_t *pointer_store, @@ -62,6 +74,15 @@ amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set( const amduatd_fed_cursor_record_t *new_cursor, amduat_reference_t *out_new_ref); +amduatd_fed_cursor_status_t amduatd_fed_push_cursor_cas_set( + amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *effective_space, + const char *peer_key, + const amduat_reference_t *expected_ref, + const amduatd_fed_cursor_record_t *new_cursor, + amduat_reference_t *out_new_ref); + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/src/amduatd_fed_push_apply.c b/src/amduatd_fed_push_apply.c new file mode 100644 index 0000000..cd173bc --- /dev/null +++ b/src/amduatd_fed_push_apply.c @@ -0,0 +1,342 @@ +#include "amduatd_fed_push_apply.h" + +#include "amduat/asl/artifact_io.h" +#include "amduat/asl/store.h" + +#include +#include +#include + +static bool amduatd_fed_push_parse_u32(const char *s, uint32_t *out) { + char *end = NULL; + unsigned long val; + if (s == NULL || out == NULL || s[0] == '\0') { + return false; + } + val = strtoul(s, &end, 10); + if (end == s || *end != '\0' || val > UINT32_MAX) { + return false; + } + *out = (uint32_t)val; + return true; +} + +static bool amduatd_fed_push_strdup(const char *s, char **out) { + size_t len; + char *buf; + if (out == NULL) { + return false; + } + *out = NULL; + if (s == NULL) { + return false; + } + len = strlen(s); + if (len > SIZE_MAX - 1u) { + return false; + } + buf = (char *)malloc(len + 1u); + if (buf == NULL) { + return false; + } + if (len != 0u) { + memcpy(buf, s, len); + } + buf[len] = '\0'; + *out = buf; + return true; +} + +static void amduatd_fed_push_report_error( + amduatd_fed_push_apply_report_t *report, + const char *msg) { + if (report == NULL || msg == NULL) { + return; + } + memset(report->error, 0, sizeof(report->error)); + strncpy(report->error, msg, sizeof(report->error) - 1u); +} + +static bool amduatd_fed_push_body_is_already_present(const char *body) { + if (body == NULL) { + return false; + } + return strstr(body, "\"status\":\"already_present\"") != NULL; +} + +void amduatd_fed_push_apply_report_init( + amduatd_fed_push_apply_report_t *report) { + if (report == NULL) { + return; + } + memset(report, 0, sizeof(*report)); + report->cursor_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); + report->cursor_after_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); + amduatd_fed_push_plan_candidate_init(&report->plan_candidate); +} + +void amduatd_fed_push_apply_report_free( + amduatd_fed_push_apply_report_t *report) { + if (report == NULL) { + return; + } + if (report->cursor_ref_set) { + amduat_reference_free(&report->cursor_ref); + } + if (report->cursor_after_ref_set) { + amduat_reference_free(&report->cursor_after_ref); + } + amduatd_fed_push_plan_candidate_free(&report->plan_candidate); + memset(report, 0, sizeof(*report)); +} + +amduatd_fed_push_apply_status_t amduatd_fed_push_apply( + amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *effective_space, + const char *peer_key, + uint64_t limit, + const char *root_path, + const amduatd_fed_cfg_t *fed_cfg, + const amduatd_fed_push_transport_t *transport, + amduatd_fed_push_apply_report_t *out_report) { + amduatd_fed_push_plan_scan_t scan; + amduatd_fed_push_cursor_candidate_t candidate; + amduatd_fed_cursor_record_t next_cursor; + amduat_reference_t next_ref; + size_t i; + + if (out_report == NULL) { + return AMDUATD_FED_PUSH_APPLY_ERR_INVALID; + } + amduatd_fed_push_apply_report_init(out_report); + out_report->peer_key = peer_key; + out_report->effective_space = effective_space; + out_report->limit = limit; + + if (store == NULL || pointer_store == NULL || peer_key == NULL || + fed_cfg == NULL || transport == NULL || root_path == NULL) { + amduatd_fed_push_report_error(out_report, "invalid inputs"); + return AMDUATD_FED_PUSH_APPLY_ERR_INVALID; + } + if (!fed_cfg->enabled) { + amduatd_fed_push_report_error(out_report, "federation disabled"); + return AMDUATD_FED_PUSH_APPLY_ERR_DISABLED; + } + if (store->ops.log_scan == NULL || store->ops.current_state == NULL) { + amduatd_fed_push_report_error(out_report, "requires index backend"); + return AMDUATD_FED_PUSH_APPLY_ERR_UNSUPPORTED; + } + if (transport->post_ingest == NULL) { + amduatd_fed_push_report_error(out_report, "transport unavailable"); + return AMDUATD_FED_PUSH_APPLY_ERR_UNSUPPORTED; + } + { + amduat_octets_t scoped = amduat_octets(NULL, 0u); + if (!amduatd_fed_push_cursor_pointer_name(effective_space, + peer_key, + &scoped)) { + amduatd_fed_push_report_error(out_report, "invalid peer"); + return AMDUATD_FED_PUSH_APPLY_ERR_INVALID; + } + amduat_octets_free(&scoped); + } + { + uint32_t domain_id = 0u; + if (!amduatd_fed_push_parse_u32(peer_key, &domain_id)) { + amduatd_fed_push_report_error(out_report, "invalid peer"); + return AMDUATD_FED_PUSH_APPLY_ERR_INVALID; + } + } + + if (amduatd_fed_push_plan_scan(store, + pointer_store, + effective_space, + peer_key, + limit, + root_path, + &scan) != AMDUATD_FED_PUSH_PLAN_OK) { + amduatd_fed_push_plan_scan_free(&scan); + amduatd_fed_push_report_error(out_report, "plan scan failed"); + return AMDUATD_FED_PUSH_APPLY_ERR_STORE; + } + + out_report->cursor_present = scan.cursor_present; + if (scan.cursor_present) { + out_report->cursor_has_logseq = scan.cursor.has_logseq; + out_report->cursor_logseq = scan.cursor.last_logseq; + if (scan.cursor_ref.digest.data != NULL) { + if (!amduat_reference_clone(scan.cursor_ref, &out_report->cursor_ref)) { + amduatd_fed_push_plan_scan_free(&scan); + amduatd_fed_push_report_error(out_report, "oom"); + return AMDUATD_FED_PUSH_APPLY_ERR_OOM; + } + out_report->cursor_ref_set = true; + } + } + + if (!amduatd_fed_push_plan_next_cursor_candidate(scan.cursor_present + ? &scan.cursor + : NULL, + scan.records, + scan.record_count, + &candidate)) { + amduatd_fed_push_plan_scan_free(&scan); + amduatd_fed_push_report_error(out_report, "oom"); + return AMDUATD_FED_PUSH_APPLY_ERR_OOM; + } + out_report->plan_record_count = scan.record_count; + out_report->plan_candidate = candidate; + + if (scan.record_count == 0u) { + amduatd_fed_push_plan_scan_free(&scan); + return AMDUATD_FED_PUSH_APPLY_OK; + } + + for (i = 0; i < scan.record_count; ++i) { + const amduat_fed_record_t *rec = &scan.records[i]; + amduat_octets_t bytes = amduat_octets(NULL, 0u); + amduat_artifact_t artifact; + amduat_asl_store_error_t store_err; + int status = 0; + char *body = NULL; + bool already_present = false; + + if (rec->id.type != AMDUAT_FED_REC_TOMBSTONE) { + memset(&artifact, 0, sizeof(artifact)); + store_err = amduat_asl_store_get(store, rec->id.ref, &artifact); + if (store_err != AMDUAT_ASL_STORE_OK) { + amduatd_fed_push_plan_scan_free(&scan); + amduatd_fed_push_report_error(out_report, "artifact missing"); + return AMDUATD_FED_PUSH_APPLY_ERR_STORE; + } + bytes = artifact.bytes; + out_report->sent_bytes_total += bytes.len; + } + + if (!transport->post_ingest(transport->ctx, + rec->id.type, + rec->id.ref, + bytes, + &status, + &body)) { + if (rec->id.type != AMDUAT_FED_REC_TOMBSTONE) { + amduat_asl_artifact_free(&artifact); + } + free(body); + amduatd_fed_push_plan_scan_free(&scan); + amduatd_fed_push_report_error(out_report, "ingest failed"); + return AMDUATD_FED_PUSH_APPLY_ERR_REMOTE; + } + if (rec->id.type != AMDUAT_FED_REC_TOMBSTONE) { + amduat_asl_artifact_free(&artifact); + } + + out_report->sent_record_count++; + if (rec->id.type == AMDUAT_FED_REC_ARTIFACT) { + out_report->sent_artifact_count++; + } else if (rec->id.type == AMDUAT_FED_REC_PER) { + out_report->sent_per_count++; + } else if (rec->id.type == AMDUAT_FED_REC_TGK_EDGE) { + out_report->sent_tgk_edge_count++; + } else if (rec->id.type == AMDUAT_FED_REC_TOMBSTONE) { + out_report->sent_tombstone_count++; + } + + if (status != 200) { + out_report->remote_status = status; + free(body); + amduatd_fed_push_plan_scan_free(&scan); + amduatd_fed_push_report_error(out_report, "peer error"); + return AMDUATD_FED_PUSH_APPLY_ERR_REMOTE; + } + + already_present = amduatd_fed_push_body_is_already_present(body); + free(body); + if (already_present) { + out_report->peer_already_present_count++; + } else { + out_report->peer_ok_count++; + } + } + + amduatd_fed_cursor_record_init(&next_cursor); + if (!amduatd_fed_push_strdup(peer_key, &next_cursor.peer_key)) { + amduatd_fed_cursor_record_free(&next_cursor); + amduatd_fed_push_plan_scan_free(&scan); + amduatd_fed_push_report_error(out_report, "oom"); + return AMDUATD_FED_PUSH_APPLY_ERR_OOM; + } + if (effective_space != NULL && effective_space->enabled && + effective_space->space_id.data != NULL) { + const char *space_id = (const char *)effective_space->space_id.data; + if (!amduatd_fed_push_strdup(space_id, &next_cursor.space_id)) { + amduatd_fed_cursor_record_free(&next_cursor); + amduatd_fed_push_plan_scan_free(&scan); + amduatd_fed_push_report_error(out_report, "oom"); + return AMDUATD_FED_PUSH_APPLY_ERR_OOM; + } + } else { + next_cursor.space_id = NULL; + } + if (out_report->plan_candidate.has_logseq) { + next_cursor.has_logseq = true; + next_cursor.last_logseq = out_report->plan_candidate.logseq; + } + if (out_report->plan_candidate.has_ref) { + next_cursor.has_record_ref = true; + if (!amduat_reference_clone(out_report->plan_candidate.ref, + &next_cursor.last_record_ref)) { + amduatd_fed_cursor_record_free(&next_cursor); + amduatd_fed_push_plan_scan_free(&scan); + amduatd_fed_push_report_error(out_report, "oom"); + return AMDUATD_FED_PUSH_APPLY_ERR_OOM; + } + } + if (!next_cursor.has_logseq && !next_cursor.has_record_ref) { + amduatd_fed_cursor_record_free(&next_cursor); + amduatd_fed_push_plan_scan_free(&scan); + amduatd_fed_push_report_error(out_report, "invalid cursor"); + return AMDUATD_FED_PUSH_APPLY_ERR_INVALID; + } + + memset(&next_ref, 0, sizeof(next_ref)); + { + amduatd_fed_cursor_status_t st; + st = amduatd_fed_push_cursor_cas_set(store, + pointer_store, + effective_space, + peer_key, + scan.cursor_present + ? &scan.cursor_ref + : NULL, + &next_cursor, + &next_ref); + amduatd_fed_cursor_record_free(&next_cursor); + if (st == AMDUATD_FED_CURSOR_ERR_CONFLICT) { + amduatd_fed_push_plan_scan_free(&scan); + amduatd_fed_push_report_error(out_report, "cursor conflict"); + return AMDUATD_FED_PUSH_APPLY_ERR_CONFLICT; + } + if (st != AMDUATD_FED_CURSOR_OK) { + amduatd_fed_push_plan_scan_free(&scan); + amduatd_fed_push_report_error(out_report, "cursor update failed"); + return AMDUATD_FED_PUSH_APPLY_ERR_STORE; + } + } + + out_report->cursor_advanced = true; + if (out_report->plan_candidate.has_logseq) { + out_report->cursor_after_has_logseq = true; + out_report->cursor_after_logseq = out_report->plan_candidate.logseq; + } + if (next_ref.digest.data != NULL) { + out_report->cursor_after_ref_set = true; + out_report->cursor_after_ref = next_ref; + } else { + amduat_reference_free(&next_ref); + } + + amduatd_fed_push_plan_scan_free(&scan); + return AMDUATD_FED_PUSH_APPLY_OK; +} diff --git a/src/amduatd_fed_push_apply.h b/src/amduatd_fed_push_apply.h new file mode 100644 index 0000000..0a25dca --- /dev/null +++ b/src/amduatd_fed_push_apply.h @@ -0,0 +1,89 @@ +#ifndef AMDUATD_FED_PUSH_APPLY_H +#define AMDUATD_FED_PUSH_APPLY_H + +#include "amduat/asl/core.h" +#include "amduat/fed/replay.h" +#include "amduatd_fed.h" +#include "amduatd_fed_cursor.h" +#include "amduatd_fed_push_plan.h" +#include "amduatd_space.h" + +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + void *ctx; + bool (*post_ingest)(void *ctx, + amduat_fed_record_type_t record_type, + amduat_reference_t ref, + amduat_octets_t bytes, + int *out_status, + char **out_body); +} amduatd_fed_push_transport_t; + +typedef enum { + AMDUATD_FED_PUSH_APPLY_OK = 0, + AMDUATD_FED_PUSH_APPLY_ERR_INVALID = 1, + AMDUATD_FED_PUSH_APPLY_ERR_DISABLED = 2, + AMDUATD_FED_PUSH_APPLY_ERR_UNSUPPORTED = 3, + AMDUATD_FED_PUSH_APPLY_ERR_REMOTE = 4, + AMDUATD_FED_PUSH_APPLY_ERR_STORE = 5, + AMDUATD_FED_PUSH_APPLY_ERR_CONFLICT = 6, + AMDUATD_FED_PUSH_APPLY_ERR_OOM = 7 +} amduatd_fed_push_apply_status_t; + +typedef struct { + const char *peer_key; + const amduatd_space_t *effective_space; + uint64_t limit; + bool cursor_present; + bool cursor_has_logseq; + uint64_t cursor_logseq; + bool cursor_ref_set; + amduat_reference_t cursor_ref; + size_t plan_record_count; + amduatd_fed_push_cursor_candidate_t plan_candidate; + size_t sent_record_count; + uint64_t sent_bytes_total; + size_t sent_artifact_count; + size_t sent_per_count; + size_t sent_tgk_edge_count; + size_t sent_tombstone_count; + size_t peer_ok_count; + size_t peer_already_present_count; + bool cursor_advanced; + bool cursor_after_has_logseq; + uint64_t cursor_after_logseq; + bool cursor_after_ref_set; + amduat_reference_t cursor_after_ref; + int remote_status; + char error[256]; +} amduatd_fed_push_apply_report_t; + +void amduatd_fed_push_apply_report_init( + amduatd_fed_push_apply_report_t *report); + +void amduatd_fed_push_apply_report_free( + amduatd_fed_push_apply_report_t *report); + +amduatd_fed_push_apply_status_t amduatd_fed_push_apply( + amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *effective_space, + const char *peer_key, + uint64_t limit, + const char *root_path, + const amduatd_fed_cfg_t *fed_cfg, + const amduatd_fed_push_transport_t *transport, + amduatd_fed_push_apply_report_t *out_report); + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* AMDUATD_FED_PUSH_APPLY_H */ diff --git a/src/amduatd_fed_push_plan.c b/src/amduatd_fed_push_plan.c new file mode 100644 index 0000000..0ff5590 --- /dev/null +++ b/src/amduatd_fed_push_plan.c @@ -0,0 +1,645 @@ +#include "amduatd_fed_push_plan.h" + +#include "amduat/asl/log_store.h" +#include "amduat/asl/artifact_io.h" +#include "amduat/asl/ref_text.h" +#include "amduat/asl/store.h" +#include "amduat/enc/fer1_receipt.h" +#include "amduat/enc/tgk1_edge.h" + +#include +#include +#include + +typedef struct { + char *data; + size_t len; + size_t cap; +} amduatd_fed_push_plan_strbuf_t; + +static void amduatd_fed_push_plan_strbuf_free( + amduatd_fed_push_plan_strbuf_t *b) { + if (b == NULL) { + return; + } + free(b->data); + b->data = NULL; + b->len = 0; + b->cap = 0; +} + +static bool amduatd_fed_push_plan_strbuf_reserve( + amduatd_fed_push_plan_strbuf_t *b, + size_t extra) { + size_t need; + size_t next_cap; + char *next; + + if (b == NULL) { + return false; + } + if (extra > (SIZE_MAX - b->len)) { + return false; + } + need = b->len + extra; + if (need <= b->cap) { + return true; + } + next_cap = b->cap != 0 ? b->cap : 256u; + while (next_cap < need) { + if (next_cap > (SIZE_MAX / 2u)) { + next_cap = need; + break; + } + next_cap *= 2u; + } + next = (char *)realloc(b->data, next_cap); + if (next == NULL) { + return false; + } + b->data = next; + b->cap = next_cap; + return true; +} + +static bool amduatd_fed_push_plan_strbuf_append( + amduatd_fed_push_plan_strbuf_t *b, + const char *s, + size_t n) { + if (b == NULL) { + return false; + } + if (n == 0u) { + return true; + } + if (s == NULL) { + return false; + } + if (!amduatd_fed_push_plan_strbuf_reserve(b, n + 1u)) { + return false; + } + memcpy(b->data + b->len, s, n); + b->len += n; + b->data[b->len] = '\0'; + return true; +} + +static bool amduatd_fed_push_plan_strbuf_append_cstr( + amduatd_fed_push_plan_strbuf_t *b, + const char *s) { + return amduatd_fed_push_plan_strbuf_append( + b, s != NULL ? s : "", s != NULL ? strlen(s) : 0u); +} + +static const char *amduatd_fed_push_plan_record_type_name( + amduat_fed_record_type_t type) { + switch (type) { + case AMDUAT_FED_REC_ARTIFACT: + return "artifact"; + case AMDUAT_FED_REC_PER: + return "per"; + case AMDUAT_FED_REC_TGK_EDGE: + return "tgk_edge"; + case AMDUAT_FED_REC_TOMBSTONE: + return "tombstone"; + default: + return "unknown"; + } +} + +void amduatd_fed_push_plan_candidate_init( + amduatd_fed_push_cursor_candidate_t *candidate) { + if (candidate == NULL) { + return; + } + memset(candidate, 0, sizeof(*candidate)); + candidate->ref = amduat_reference(0u, amduat_octets(NULL, 0u)); +} + +void amduatd_fed_push_plan_candidate_free( + amduatd_fed_push_cursor_candidate_t *candidate) { + if (candidate == NULL) { + return; + } + if (candidate->has_ref) { + amduat_reference_free(&candidate->ref); + } + memset(candidate, 0, sizeof(*candidate)); +} + +bool amduatd_fed_push_plan_next_cursor_candidate( + const amduatd_fed_cursor_record_t *cursor, + const amduat_fed_record_t *records, + size_t record_count, + amduatd_fed_push_cursor_candidate_t *out_candidate) { + if (out_candidate == NULL) { + return false; + } + amduatd_fed_push_plan_candidate_init(out_candidate); + + if (record_count > 0u && records != NULL) { + const amduat_fed_record_t *last = &records[record_count - 1u]; + out_candidate->has_logseq = true; + out_candidate->logseq = last->logseq; + out_candidate->has_ref = true; + if (!amduat_reference_clone(last->id.ref, &out_candidate->ref)) { + amduatd_fed_push_plan_candidate_free(out_candidate); + return false; + } + return true; + } + (void)cursor; + return true; +} + +amduatd_fed_push_plan_status_t amduatd_fed_push_plan_check( + const amduatd_fed_cfg_t *cfg, + const amduat_asl_store_t *store) { + if (cfg == NULL || store == NULL) { + return AMDUATD_FED_PUSH_PLAN_ERR_INVALID; + } + if (!cfg->enabled) { + return AMDUATD_FED_PUSH_PLAN_ERR_DISABLED; + } + if (store->ops.log_scan == NULL || store->ops.current_state == NULL) { + return AMDUATD_FED_PUSH_PLAN_ERR_UNSUPPORTED; + } + return AMDUATD_FED_PUSH_PLAN_OK; +} + +static void amduatd_fed_push_plan_records_free(amduat_fed_record_t *records, + size_t record_count) { + size_t i; + if (records == NULL) { + return; + } + for (i = 0; i < record_count; ++i) { + amduat_reference_free(&records[i].id.ref); + } + free(records); +} + +static bool amduatd_fed_push_entry_record_type( + amduat_asl_store_t *store, + const amduat_asl_log_entry_t *entry, + amduat_fed_record_type_t *out_type) { + amduat_fed_record_type_t rec_type = AMDUAT_FED_REC_ARTIFACT; + if (store == NULL || entry == NULL || out_type == NULL) { + return false; + } + if (entry->kind == AMDUATD_FED_LOG_KIND_ARTIFACT) { + amduat_artifact_t artifact; + amduat_asl_store_error_t store_err; + memset(&artifact, 0, sizeof(artifact)); + store_err = amduat_asl_store_get(store, entry->payload_ref, &artifact); + if (store_err == AMDUAT_ASL_STORE_OK && artifact.has_type_tag) { + if (artifact.type_tag.tag_id == AMDUAT_TYPE_TAG_TGK1_EDGE_V1) { + rec_type = AMDUAT_FED_REC_TGK_EDGE; + } else if (artifact.type_tag.tag_id == AMDUAT_TYPE_TAG_FER1_RECEIPT_1) { + rec_type = AMDUAT_FED_REC_PER; + } + } + if (store_err == AMDUAT_ASL_STORE_OK) { + amduat_asl_artifact_free(&artifact); + } + } else if (entry->kind == AMDUATD_FED_LOG_KIND_TOMBSTONE) { + rec_type = AMDUAT_FED_REC_TOMBSTONE; + } else { + return false; + } + *out_type = rec_type; + return true; +} + +void amduatd_fed_push_plan_scan_init(amduatd_fed_push_plan_scan_t *scan) { + if (scan == NULL) { + return; + } + memset(scan, 0, sizeof(*scan)); + amduatd_fed_cursor_record_init(&scan->cursor); + scan->cursor_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); +} + +void amduatd_fed_push_plan_scan_free(amduatd_fed_push_plan_scan_t *scan) { + if (scan == NULL) { + return; + } + if (scan->cursor_present) { + amduatd_fed_cursor_record_free(&scan->cursor); + amduat_reference_free(&scan->cursor_ref); + } + amduatd_fed_push_plan_records_free(scan->records, scan->record_count); + memset(scan, 0, sizeof(*scan)); +} + +amduatd_fed_push_plan_status_t amduatd_fed_push_plan_scan( + amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *effective_space, + const char *peer_key, + uint64_t limit, + const char *root_path, + amduatd_fed_push_plan_scan_t *out_scan) { + amduat_asl_log_store_t log_store; + amduat_asl_log_entry_t *entries = NULL; + size_t entry_count = 0u; + uint64_t next_offset = 0u; + bool end = false; + amduat_octets_t log_name = amduat_octets(NULL, 0u); + uint64_t from_logseq = 0u; + + if (store == NULL || pointer_store == NULL || peer_key == NULL || + root_path == NULL || out_scan == NULL) { + return AMDUATD_FED_PUSH_PLAN_ERR_INVALID; + } + amduatd_fed_push_plan_scan_init(out_scan); + + { + amduatd_fed_cursor_status_t cursor_status; + cursor_status = amduatd_fed_push_cursor_get(store, + pointer_store, + effective_space, + peer_key, + &out_scan->cursor, + &out_scan->cursor_ref); + if (cursor_status == AMDUATD_FED_CURSOR_ERR_NOT_FOUND) { + out_scan->cursor_present = false; + } else if (cursor_status == AMDUATD_FED_CURSOR_OK) { + out_scan->cursor_present = true; + if (out_scan->cursor.has_logseq) { + if (out_scan->cursor.last_logseq == UINT64_MAX) { + amduatd_fed_push_plan_scan_free(out_scan); + return AMDUATD_FED_PUSH_PLAN_ERR_INVALID; + } + from_logseq = out_scan->cursor.last_logseq + 1u; + } + } else { + amduatd_fed_push_plan_scan_free(out_scan); + return AMDUATD_FED_PUSH_PLAN_ERR_INVALID; + } + } + + if (!amduat_asl_log_store_init(&log_store, root_path, store, + pointer_store)) { + amduatd_fed_push_plan_scan_free(out_scan); + return AMDUATD_FED_PUSH_PLAN_ERR_INVALID; + } + if (!amduatd_space_scope_name(effective_space, "fed/records", &log_name)) { + amduatd_fed_push_plan_scan_free(out_scan); + return AMDUATD_FED_PUSH_PLAN_ERR_INVALID; + } + + { + amduat_asl_store_error_t read_err = + amduat_asl_log_read(&log_store, + (const char *)log_name.data, + from_logseq, + (size_t)limit, + &entries, + &entry_count, + &next_offset, + &end); + amduat_octets_free(&log_name); + if (read_err != AMDUAT_ASL_STORE_OK) { + amduatd_fed_push_plan_scan_free(out_scan); + return AMDUATD_FED_PUSH_PLAN_ERR_INVALID; + } + } + (void)next_offset; + (void)end; + + if (entry_count != 0u) { + out_scan->records = + (amduat_fed_record_t *)calloc(entry_count, sizeof(*out_scan->records)); + if (out_scan->records == NULL) { + amduat_asl_log_entries_free(entries, entry_count); + amduatd_fed_push_plan_scan_free(out_scan); + return AMDUATD_FED_PUSH_PLAN_ERR_OOM; + } + } + + { + size_t i; + for (i = 0; i < entry_count; ++i) { + const amduat_asl_log_entry_t *entry = &entries[i]; + amduat_fed_record_type_t rec_type = AMDUAT_FED_REC_ARTIFACT; + uint64_t logseq; + + if (entry->payload_ref.digest.data == NULL || + entry->payload_ref.digest.len == 0u) { + continue; + } + if (from_logseq > UINT64_MAX - (uint64_t)i) { + amduat_asl_log_entries_free(entries, entry_count); + amduatd_fed_push_plan_scan_free(out_scan); + return AMDUATD_FED_PUSH_PLAN_ERR_INVALID; + } + logseq = from_logseq + (uint64_t)i; + if (!amduatd_fed_push_entry_record_type(store, entry, &rec_type)) { + continue; + } + memset(&out_scan->records[out_scan->record_count], 0, + sizeof(out_scan->records[out_scan->record_count])); + out_scan->records[out_scan->record_count].id.type = rec_type; + out_scan->records[out_scan->record_count].logseq = logseq; + if (!amduat_reference_clone(entry->payload_ref, + &out_scan->records[out_scan->record_count] + .id.ref)) { + amduat_asl_log_entries_free(entries, entry_count); + amduatd_fed_push_plan_scan_free(out_scan); + return AMDUATD_FED_PUSH_PLAN_ERR_OOM; + } + out_scan->record_count++; + } + } + + amduat_asl_log_entries_free(entries, entry_count); + return AMDUATD_FED_PUSH_PLAN_OK; +} + +amduatd_fed_push_plan_status_t amduatd_fed_push_plan_json( + const amduatd_fed_push_plan_input_t *input, + char **out_json) { + amduatd_fed_push_plan_strbuf_t b; + size_t i; + const amduat_fed_record_t *first = NULL; + const amduat_fed_record_t *last = NULL; + amduatd_fed_push_cursor_candidate_t candidate; + char *ref_hex = NULL; + char *cursor_ref_hex = NULL; + char tmp[64]; + + if (out_json != NULL) { + *out_json = NULL; + } + if (input == NULL || out_json == NULL || input->peer_key == NULL) { + return AMDUATD_FED_PUSH_PLAN_ERR_INVALID; + } + if (input->record_count > 0u && input->records == NULL) { + return AMDUATD_FED_PUSH_PLAN_ERR_INVALID; + } + if (input->cursor_present && input->cursor == NULL) { + return AMDUATD_FED_PUSH_PLAN_ERR_INVALID; + } + + if (input->record_count > 0u && input->records != NULL) { + first = &input->records[0]; + last = &input->records[input->record_count - 1u]; + } + if (!amduatd_fed_push_plan_next_cursor_candidate( + input->cursor_present ? input->cursor : NULL, + input->records, + input->record_count, + &candidate)) { + return AMDUATD_FED_PUSH_PLAN_ERR_OOM; + } + + if (input->cursor_present && + input->cursor_ref != NULL && + input->cursor_ref->digest.data != NULL) { + if (!amduat_asl_ref_encode_hex(*input->cursor_ref, &cursor_ref_hex)) { + amduatd_fed_push_plan_candidate_free(&candidate); + return AMDUATD_FED_PUSH_PLAN_ERR_OOM; + } + } + + memset(&b, 0, sizeof(b)); + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "{")) { + goto plan_oom; + } + + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"peer\":\"") || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, input->peer_key) || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, "\",")) { + goto plan_oom; + } + + snprintf(tmp, sizeof(tmp), "%u", (unsigned int)input->domain_id); + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"domain_id\":") || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, tmp) || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, ",")) { + goto plan_oom; + } + + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"effective_space\":{")) { + goto plan_oom; + } + if (input->effective_space != NULL && + input->effective_space->enabled && + input->effective_space->space_id.data != NULL) { + const char *space_id = (const char *)input->effective_space->space_id.data; + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"mode\":\"scoped\",") || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"space_id\":\"") || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, space_id) || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"")) { + goto plan_oom; + } + } else { + if (!amduatd_fed_push_plan_strbuf_append_cstr( + &b, "\"mode\":\"unscoped\",") || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"space_id\":null")) { + goto plan_oom; + } + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "},")) { + goto plan_oom; + } + + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"cursor\":{")) { + goto plan_oom; + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"present\":") || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, + input->cursor_present ? "true" + : "false")) { + goto plan_oom; + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",\"last_logseq\":")) { + goto plan_oom; + } + if (input->cursor_present && input->cursor != NULL && + input->cursor->has_logseq) { + snprintf(tmp, sizeof(tmp), "%llu", + (unsigned long long)input->cursor->last_logseq); + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, tmp)) { + goto plan_oom; + } + } else { + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "null")) { + goto plan_oom; + } + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",\"ref\":")) { + goto plan_oom; + } + if (cursor_ref_hex != NULL) { + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"") || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, cursor_ref_hex) || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"")) { + goto plan_oom; + } + } else { + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "null")) { + goto plan_oom; + } + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "},")) { + goto plan_oom; + } + + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"scan\":{")) { + goto plan_oom; + } + snprintf(tmp, sizeof(tmp), "%zu", input->record_count); + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"record_count\":") || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, tmp)) { + goto plan_oom; + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",\"first_logseq\":")) { + goto plan_oom; + } + if (first != NULL) { + snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)first->logseq); + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, tmp)) { + goto plan_oom; + } + } else { + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "null")) { + goto plan_oom; + } + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",\"last_logseq\":")) { + goto plan_oom; + } + if (last != NULL) { + snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)last->logseq); + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, tmp)) { + goto plan_oom; + } + } else { + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "null")) { + goto plan_oom; + } + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "},")) { + goto plan_oom; + } + + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"records\":[")) { + goto plan_oom; + } + for (i = 0; i < input->record_count; ++i) { + const amduat_fed_record_t *rec = &input->records[i]; + const char *type_name = amduatd_fed_push_plan_record_type_name(rec->id.type); + if (i > 0) { + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",")) { + goto plan_oom; + } + } + if (!amduat_asl_ref_encode_hex(rec->id.ref, &ref_hex)) { + goto plan_oom; + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "{\"logseq\":")) { + goto plan_oom; + } + snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)rec->logseq); + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, tmp) || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, ",\"record_type\":\"") || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, type_name) || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, "\",\"ref\":\"") || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, ref_hex) || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"}")) { + goto plan_oom; + } + free(ref_hex); + ref_hex = NULL; + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "],")) { + goto plan_oom; + } + + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, + "\"required_artifacts\":[")) { + goto plan_oom; + } + { + bool first_artifact = true; + for (i = 0; i < input->record_count; ++i) { + const amduat_fed_record_t *rec = &input->records[i]; + if (rec->id.type == AMDUAT_FED_REC_TOMBSTONE) { + continue; + } + if (!amduat_asl_ref_encode_hex(rec->id.ref, &ref_hex)) { + goto plan_oom; + } + if (!first_artifact) { + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",")) { + goto plan_oom; + } + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"") || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, ref_hex) || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"")) { + goto plan_oom; + } + first_artifact = false; + free(ref_hex); + ref_hex = NULL; + } + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "],")) { + goto plan_oom; + } + + if (!amduatd_fed_push_plan_strbuf_append_cstr( + &b, "\"next_cursor_candidate\":{")) { + goto plan_oom; + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"last_logseq\":")) { + goto plan_oom; + } + if (candidate.has_logseq) { + snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)candidate.logseq); + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, tmp)) { + goto plan_oom; + } + } else { + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "null")) { + goto plan_oom; + } + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",\"ref\":")) { + goto plan_oom; + } + if (candidate.has_ref) { + if (!amduat_asl_ref_encode_hex(candidate.ref, &ref_hex)) { + goto plan_oom; + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"") || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, ref_hex) || + !amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"")) { + goto plan_oom; + } + free(ref_hex); + ref_hex = NULL; + } else { + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "null")) { + goto plan_oom; + } + } + if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "}}\n")) { + goto plan_oom; + } + + amduatd_fed_push_plan_candidate_free(&candidate); + free(cursor_ref_hex); + *out_json = b.data; + return AMDUATD_FED_PUSH_PLAN_OK; + +plan_oom: + free(ref_hex); + amduatd_fed_push_plan_candidate_free(&candidate); + free(cursor_ref_hex); + amduatd_fed_push_plan_strbuf_free(&b); + return AMDUATD_FED_PUSH_PLAN_ERR_OOM; +} diff --git a/src/amduatd_fed_push_plan.h b/src/amduatd_fed_push_plan.h new file mode 100644 index 0000000..00b48b1 --- /dev/null +++ b/src/amduatd_fed_push_plan.h @@ -0,0 +1,88 @@ +#ifndef AMDUATD_FED_PUSH_PLAN_H +#define AMDUATD_FED_PUSH_PLAN_H + +#include "amduat/fed/replay.h" +#include "amduatd_fed.h" +#include "amduatd_fed_cursor.h" +#include "amduatd_space.h" + +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef enum { + AMDUATD_FED_PUSH_PLAN_OK = 0, + AMDUATD_FED_PUSH_PLAN_ERR_INVALID = 1, + AMDUATD_FED_PUSH_PLAN_ERR_DISABLED = 2, + AMDUATD_FED_PUSH_PLAN_ERR_UNSUPPORTED = 3, + AMDUATD_FED_PUSH_PLAN_ERR_OOM = 4 +} amduatd_fed_push_plan_status_t; + +typedef struct { + const char *peer_key; + uint32_t domain_id; + const amduatd_space_t *effective_space; + bool cursor_present; + const amduatd_fed_cursor_record_t *cursor; + const amduat_reference_t *cursor_ref; + const amduat_fed_record_t *records; + size_t record_count; +} amduatd_fed_push_plan_input_t; + +typedef struct { + bool has_logseq; + uint64_t logseq; + bool has_ref; + amduat_reference_t ref; +} amduatd_fed_push_cursor_candidate_t; + +typedef struct { + bool cursor_present; + amduatd_fed_cursor_record_t cursor; + amduat_reference_t cursor_ref; + amduat_fed_record_t *records; + size_t record_count; +} amduatd_fed_push_plan_scan_t; + +amduatd_fed_push_plan_status_t amduatd_fed_push_plan_check( + const amduatd_fed_cfg_t *cfg, + const amduat_asl_store_t *store); + +void amduatd_fed_push_plan_scan_init(amduatd_fed_push_plan_scan_t *scan); + +void amduatd_fed_push_plan_scan_free(amduatd_fed_push_plan_scan_t *scan); + +amduatd_fed_push_plan_status_t amduatd_fed_push_plan_scan( + amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *effective_space, + const char *peer_key, + uint64_t limit, + const char *root_path, + amduatd_fed_push_plan_scan_t *out_scan); + +void amduatd_fed_push_plan_candidate_init( + amduatd_fed_push_cursor_candidate_t *candidate); + +void amduatd_fed_push_plan_candidate_free( + amduatd_fed_push_cursor_candidate_t *candidate); + +bool amduatd_fed_push_plan_next_cursor_candidate( + const amduatd_fed_cursor_record_t *cursor, + const amduat_fed_record_t *records, + size_t record_count, + amduatd_fed_push_cursor_candidate_t *out_candidate); + +amduatd_fed_push_plan_status_t amduatd_fed_push_plan_json( + const amduatd_fed_push_plan_input_t *input, + char **out_json); + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* AMDUATD_FED_PUSH_PLAN_H */ diff --git a/tests/test_amduatd_fed_push_apply.c b/tests/test_amduatd_fed_push_apply.c new file mode 100644 index 0000000..654dad6 --- /dev/null +++ b/tests/test_amduatd_fed_push_apply.c @@ -0,0 +1,277 @@ +#ifndef _POSIX_C_SOURCE +#define _POSIX_C_SOURCE 200809L +#endif + +#include "amduatd_fed_push_apply.h" +#include "amduatd_fed_cursor.h" +#include "amduatd_space.h" +#include "amduatd_store.h" + +#include "amduat/asl/asl_store_fs_meta.h" +#include "amduat/asl/artifact_io.h" +#include "amduat/asl/log_store.h" +#include "amduat/hash/asl1.h" + +#include +#include +#include +#include + +typedef struct { + size_t call_count; + size_t already_present_at; +} amduatd_test_push_transport_t; + +static int failures = 0; + +static void expect(bool cond, const char *msg) { + if (!cond) { + fprintf(stderr, "FAIL: %s\n", msg); + failures++; + } +} + +static char *amduatd_test_make_temp_dir(void) { + char tmpl[] = "/tmp/amduatd-fed-push-XXXXXX"; + char *dir = mkdtemp(tmpl); + size_t len; + char *copy; + if (dir == NULL) { + perror("mkdtemp"); + return NULL; + } + len = strlen(dir); + copy = (char *)malloc(len + 1u); + if (copy == NULL) { + fprintf(stderr, "failed to allocate temp dir copy\n"); + return NULL; + } + memcpy(copy, dir, len + 1u); + return copy; +} + +static bool amduatd_test_store_artifact(amduat_asl_store_t *store, + const char *payload, + amduat_reference_t *out_ref) { + amduat_artifact_t artifact; + amduat_octets_t payload_bytes = amduat_octets(NULL, 0u); + amduat_asl_index_state_t state; + amduat_asl_store_error_t err; + + if (store == NULL || payload == NULL || out_ref == NULL) { + return false; + } + if (!amduat_octets_clone(amduat_octets(payload, strlen(payload)), + &payload_bytes)) { + return false; + } + if (!amduat_asl_artifact_from_bytes(payload_bytes, + AMDUAT_ASL_IO_RAW, + false, + amduat_type_tag(0u), + &artifact)) { + amduat_octets_free(&payload_bytes); + return false; + } + err = amduat_asl_store_put_indexed(store, artifact, out_ref, &state); + amduat_asl_artifact_free(&artifact); + return err == AMDUAT_ASL_STORE_OK; +} + +static bool amduatd_test_append_fed_log(amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *space, + const char *root_path, + amduat_reference_t ref) { + amduat_asl_log_store_t log_store; + amduat_octets_t log_name = amduat_octets(NULL, 0u); + amduat_asl_log_entry_t entry; + uint64_t offset = 0u; + amduat_asl_store_error_t err; + + if (!amduat_asl_log_store_init(&log_store, + root_path, + store, + pointer_store)) { + return false; + } + if (!amduatd_space_scope_name(space, "fed/records", &log_name)) { + return false; + } + memset(&entry, 0, sizeof(entry)); + entry.kind = AMDUATD_FED_LOG_KIND_ARTIFACT; + entry.has_timestamp = false; + entry.timestamp = 0u; + entry.payload_ref = ref; + entry.has_actor = false; + entry.actor = amduat_octets(NULL, 0u); + err = amduat_asl_log_append(&log_store, + (const char *)log_name.data, + &entry, + 1u, + &offset); + amduat_octets_free(&log_name); + return err == AMDUAT_ASL_STORE_OK; +} + +static bool amduatd_test_push_post_ingest(void *ctx, + amduat_fed_record_type_t record_type, + amduat_reference_t ref, + amduat_octets_t bytes, + int *out_status, + char **out_body) { + amduatd_test_push_transport_t *t = (amduatd_test_push_transport_t *)ctx; + const char *status = "ok"; + const char *applied = "true"; + char buf[256]; + int n; + + (void)record_type; + (void)ref; + (void)bytes; + + if (out_status == NULL || out_body == NULL || t == NULL) { + return false; + } + t->call_count++; + if (t->already_present_at != 0u && t->call_count == t->already_present_at) { + status = "already_present"; + applied = "false"; + } + n = snprintf(buf, + sizeof(buf), + "{\"status\":\"%s\",\"applied\":%s," + "\"ref\":null,\"effective_space\":{" + "\"mode\":\"unscoped\",\"space_id\":null}}", + status, + applied); + if (n <= 0 || (size_t)n >= sizeof(buf)) { + return false; + } + *out_status = 200; + *out_body = strdup(buf); + return *out_body != NULL; +} + +int main(void) { + char *root = amduatd_test_make_temp_dir(); + amduat_asl_store_fs_config_t cfg; + amduatd_store_ctx_t store_ctx; + amduat_asl_store_t store; + amduat_asl_pointer_store_t pointer_store; + amduatd_space_t space; + amduatd_fed_cfg_t fed_cfg; + amduat_reference_t ref0; + amduat_reference_t ref1; + amduatd_fed_push_transport_t transport; + amduatd_test_push_transport_t stub; + amduatd_fed_push_apply_report_t report; + amduatd_fed_push_apply_status_t status; + amduatd_fed_cursor_record_t cursor; + amduat_reference_t cursor_ref; + + if (root == NULL) { + return 1; + } + + memset(&cfg, 0, sizeof(cfg)); + if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) { + fprintf(stderr, "failed to init store root\n"); + free(root); + return 1; + } + memset(&store_ctx, 0, sizeof(store_ctx)); + memset(&store, 0, sizeof(store)); + if (!amduatd_store_init(&store, + &cfg, + &store_ctx, + root, + AMDUATD_STORE_BACKEND_INDEX)) { + fprintf(stderr, "failed to init store\n"); + free(root); + return 1; + } + if (!amduat_asl_pointer_store_init(&pointer_store, root)) { + fprintf(stderr, "failed to init pointer store\n"); + free(root); + return 1; + } + if (!amduatd_space_init(&space, "demo", false)) { + fprintf(stderr, "failed to init space\n"); + free(root); + return 1; + } + + if (!amduatd_test_store_artifact(&store, "alpha", &ref0) || + !amduatd_test_store_artifact(&store, "beta", &ref1)) { + fprintf(stderr, "failed to store artifacts\n"); + free(root); + return 1; + } + if (!amduatd_test_append_fed_log(&store, + &pointer_store, + &space, + root, + ref0) || + !amduatd_test_append_fed_log(&store, + &pointer_store, + &space, + root, + ref1)) { + fprintf(stderr, "failed to append fed log\n"); + amduat_reference_free(&ref0); + amduat_reference_free(&ref1); + free(root); + return 1; + } + + amduatd_fed_cfg_init(&fed_cfg); + fed_cfg.enabled = true; + + memset(&stub, 0, sizeof(stub)); + stub.already_present_at = 1u; + memset(&transport, 0, sizeof(transport)); + transport.ctx = &stub; + transport.post_ingest = amduatd_test_push_post_ingest; + + status = amduatd_fed_push_apply(&store, + &pointer_store, + &space, + "2", + 16u, + root, + &fed_cfg, + &transport, + &report); + expect(status == AMDUATD_FED_PUSH_APPLY_OK, "push apply ok"); + expect(report.sent_record_count == 2u, "sent record count"); + expect(report.peer_ok_count == 1u, "peer ok count"); + expect(report.peer_already_present_count == 1u, "peer already present"); + expect(report.cursor_advanced, "cursor advanced"); + expect(report.cursor_after_has_logseq && + report.cursor_after_logseq == 1u, + "cursor after logseq"); + + amduatd_fed_cursor_record_init(&cursor); + memset(&cursor_ref, 0, sizeof(cursor_ref)); + { + amduatd_fed_cursor_status_t st; + st = amduatd_fed_push_cursor_get(&store, + &pointer_store, + &space, + "2", + &cursor, + &cursor_ref); + expect(st == AMDUATD_FED_CURSOR_OK, "push cursor stored"); + expect(cursor.has_logseq && cursor.last_logseq == 1u, + "push cursor logseq"); + } + + amduatd_fed_cursor_record_free(&cursor); + amduat_reference_free(&cursor_ref); + amduatd_fed_push_apply_report_free(&report); + amduat_reference_free(&ref0); + amduat_reference_free(&ref1); + free(root); + return failures == 0 ? 0 : 1; +} diff --git a/tests/test_amduatd_fed_push_plan.c b/tests/test_amduatd_fed_push_plan.c new file mode 100644 index 0000000..9dfd18c --- /dev/null +++ b/tests/test_amduatd_fed_push_plan.c @@ -0,0 +1,175 @@ +#ifndef _POSIX_C_SOURCE +#define _POSIX_C_SOURCE 200809L +#endif + +#include "amduatd_fed_push_plan.h" +#include "amduatd_fed_cursor.h" + +#include "amduat/asl/ref_text.h" +#include "amduat/hash/asl1.h" + +#include +#include +#include +#include + +static int failures = 0; + +static void expect(bool cond, const char *msg) { + if (!cond) { + fprintf(stderr, "FAIL: %s\n", msg); + failures++; + } +} + +static bool amduatd_make_test_ref(uint8_t fill, amduat_reference_t *out_ref) { + uint8_t digest_bytes[32]; + amduat_octets_t digest; + if (out_ref == NULL) { + return false; + } + memset(digest_bytes, fill, sizeof(digest_bytes)); + if (!amduat_octets_clone(amduat_octets(digest_bytes, sizeof(digest_bytes)), + &digest)) { + return false; + } + *out_ref = amduat_reference(AMDUAT_HASH_ASL1_ID_SHA256, digest); + return true; +} + +int main(void) { + amduatd_fed_cfg_t cfg; + amduat_asl_store_t store; + amduatd_fed_push_plan_status_t status; + + amduatd_fed_cfg_init(&cfg); + memset(&store, 0, sizeof(store)); + status = amduatd_fed_push_plan_check(&cfg, &store); + expect(status == AMDUATD_FED_PUSH_PLAN_ERR_DISABLED, + "disabled federation check"); + + cfg.enabled = true; + status = amduatd_fed_push_plan_check(&cfg, &store); + expect(status == AMDUATD_FED_PUSH_PLAN_ERR_UNSUPPORTED, + "unsupported backend check"); + + { + amduatd_fed_push_plan_input_t input; + amduat_fed_record_t records[2]; + amduat_reference_t ref0; + amduat_reference_t ref1; + char *json = NULL; + char *ref0_hex = NULL; + + if (!amduatd_make_test_ref(0x01, &ref0) || + !amduatd_make_test_ref(0x02, &ref1)) { + fprintf(stderr, "FAIL: make refs\n"); + return 1; + } + + memset(records, 0, sizeof(records)); + records[0].id.type = AMDUAT_FED_REC_ARTIFACT; + records[0].id.ref = ref0; + records[0].logseq = 1u; + records[1].id.type = AMDUAT_FED_REC_PER; + records[1].id.ref = ref1; + records[1].logseq = 2u; + + memset(&input, 0, sizeof(input)); + input.peer_key = "1"; + input.domain_id = 42u; + input.cursor_present = false; + input.records = records; + input.record_count = 2u; + + status = amduatd_fed_push_plan_json(&input, &json); + expect(status == AMDUATD_FED_PUSH_PLAN_OK, "plan json missing cursor"); + expect(json != NULL && strstr(json, "\"present\":false") != NULL, + "cursor present false"); + expect(json != NULL && strstr(json, "\"record_count\":2") != NULL, + "record count"); + expect(json != NULL && strstr(json, "\"last_logseq\":2") != NULL, + "next cursor candidate"); + expect(json != NULL && strstr(json, "\"domain_id\":42") != NULL, + "domain id"); + expect(json != NULL && strstr(json, "\"record_type\":\"per\"") != NULL, + "record type per"); + + if (amduat_asl_ref_encode_hex(ref0, &ref0_hex)) { + expect(json != NULL && strstr(json, ref0_hex) != NULL, + "required artifacts include ref"); + } else { + fprintf(stderr, "FAIL: encode ref\n"); + failures++; + } + free(ref0_hex); + free(json); + amduat_reference_free(&ref0); + amduat_reference_free(&ref1); + } + + { + amduatd_fed_push_plan_input_t input; + amduatd_fed_cursor_record_t cursor; + amduat_reference_t cursor_ref; + amduat_reference_t record_ref; + char *json = NULL; + char *cursor_ref_hex = NULL; + + amduatd_fed_cursor_record_init(&cursor); + cursor.peer_key = strdup("7"); + cursor.space_id = NULL; + if (cursor.peer_key == NULL) { + fprintf(stderr, "FAIL: cursor peer allocation\n"); + return 1; + } + cursor.has_logseq = true; + cursor.last_logseq = 5u; + if (!amduatd_make_test_ref(0x03, &record_ref)) { + fprintf(stderr, "FAIL: make cursor record ref\n"); + return 1; + } + cursor.has_record_ref = true; + cursor.last_record_ref = record_ref; + if (!amduatd_make_test_ref(0x04, &cursor_ref)) { + fprintf(stderr, "FAIL: make cursor ref\n"); + amduatd_fed_cursor_record_free(&cursor); + return 1; + } + + if (!amduat_asl_ref_encode_hex(cursor_ref, &cursor_ref_hex)) { + fprintf(stderr, "FAIL: encode cursor ref\n"); + amduat_reference_free(&cursor_ref); + amduatd_fed_cursor_record_free(&cursor); + return 1; + } + + memset(&input, 0, sizeof(input)); + input.peer_key = "7"; + input.domain_id = 7u; + input.cursor_present = true; + input.cursor = &cursor; + input.cursor_ref = &cursor_ref; + input.records = NULL; + input.record_count = 0u; + + status = amduatd_fed_push_plan_json(&input, &json); + expect(status == AMDUATD_FED_PUSH_PLAN_OK, "plan json with cursor"); + expect(json != NULL && strstr(json, "\"present\":true") != NULL, + "cursor present true"); + expect(json != NULL && strstr(json, "\"last_logseq\":5") != NULL, + "cursor logseq echoed"); + expect(json != NULL && + strstr(json, "\"next_cursor_candidate\":{\"last_logseq\":null") != + NULL, + "next cursor candidate empty"); + expect(json != NULL && strstr(json, cursor_ref_hex) != NULL, + "cursor ref echoed"); + free(cursor_ref_hex); + free(json); + amduat_reference_free(&cursor_ref); + amduatd_fed_cursor_record_free(&cursor); + } + + return failures == 0 ? 0 : 1; +}