From f99ec3ee895453545177c5d7f4f0bd9e2e0d61d0 Mon Sep 17 00:00:00 2001 From: Carl Niklas Rydberg Date: Sat, 24 Jan 2026 18:11:44 +0100 Subject: [PATCH] Add bounded fed pull/push until endpoints with tests --- CMakeLists.txt | 57 +++ README.md | 17 + registry/amduatd-api-contract.v1.json | 2 + src/amduatd.c | 630 ++++++++++++++++++++++++++ src/amduatd_fed_until.c | 236 ++++++++++ src/amduatd_fed_until.h | 63 +++ tests/test_amduatd_fed_pull_until.c | 512 +++++++++++++++++++++ tests/test_amduatd_fed_push_until.c | 441 ++++++++++++++++++ 8 files changed, 1958 insertions(+) create mode 100644 src/amduatd_fed_until.c create mode 100644 src/amduatd_fed_until.h create mode 100644 tests/test_amduatd_fed_pull_until.c create mode 100644 tests/test_amduatd_fed_push_until.c diff --git a/CMakeLists.txt b/CMakeLists.txt index b8254e5..f992dc4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,6 +28,7 @@ set(amduatd_sources src/amduatd.c src/amduatd_http.c src/amduatd_caps.c src/amduatd_space.c src/amduatd_concepts.c src/amduatd_store.c src/amduatd_derivation_index.c src/amduatd_fed.c src/amduatd_fed_cursor.c + src/amduatd_fed_until.c src/amduatd_fed_pull_plan.c src/amduatd_fed_push_plan.c src/amduatd_fed_pull_apply.c src/amduatd_fed_push_apply.c src/amduatd_space_doctor.c src/amduatd_space_roots.c) @@ -228,6 +229,62 @@ target_link_libraries(amduatd_test_fed_push_apply add_test(NAME amduatd_fed_push_apply COMMAND amduatd_test_fed_push_apply) +add_executable(amduatd_test_fed_pull_until + tests/test_amduatd_fed_pull_until.c + src/amduatd_fed_until.c + src/amduatd_fed_pull_apply.c + src/amduatd_fed_pull_plan.c + src/amduatd_fed_push_apply.c + src/amduatd_fed_push_plan.c + src/amduatd_fed_cursor.c + src/amduatd_fed.c + src/amduatd_space.c + src/amduatd_store.c +) + +target_include_directories(amduatd_test_fed_pull_until + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/vendor/amduat/include +) + +target_link_libraries(amduatd_test_fed_pull_until + PRIVATE amduat_asl_store_fs amduat_asl_store_index_fs amduat_asl_record + amduat_asl amduat_enc amduat_asl_pointer_fs amduat_util + amduat_asl_log_store + amduat_hash_asl1 amduat_fed +) + +add_test(NAME amduatd_fed_pull_until COMMAND amduatd_test_fed_pull_until) + +add_executable(amduatd_test_fed_push_until + tests/test_amduatd_fed_push_until.c + src/amduatd_fed_until.c + src/amduatd_fed_pull_apply.c + src/amduatd_fed_pull_plan.c + src/amduatd_fed_push_apply.c + src/amduatd_fed_push_plan.c + src/amduatd_fed_cursor.c + src/amduatd_fed.c + src/amduatd_space.c + src/amduatd_store.c +) + +target_include_directories(amduatd_test_fed_push_until + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/vendor/amduat/include +) + +target_link_libraries(amduatd_test_fed_push_until + PRIVATE amduat_asl_store_fs amduat_asl_store_index_fs amduat_asl_record + amduat_asl amduat_enc amduat_asl_pointer_fs amduat_util + amduat_asl_log_store + amduat_hash_asl1 amduat_fed +) + +add_test(NAME amduatd_fed_push_until COMMAND amduatd_test_fed_push_until) + add_executable(amduatd_test_fed_pull_apply tests/test_amduatd_fed_pull_apply.c src/amduatd_fed_pull_apply.c diff --git a/README.md b/README.md index c96670d..f94bd14 100644 --- a/README.md +++ b/README.md @@ -160,6 +160,23 @@ curl --unix-socket amduatd.sock -X POST \ `/v1/fed/push` uses `/v1/fed/ingest` on the peer and only advances the push cursor after the batch completes. It requires the index backend. +### Federation sync until caught up + +Use the bounded helpers to repeatedly apply pull/push until no work remains, +with a hard `max_rounds` cap to keep requests bounded: + +```sh +curl --unix-socket amduatd.sock -X POST \ + 'http://localhost/v1/fed/pull/until?peer=2&limit=128&max_rounds=10' \ + -H 'X-Amduat-Space: demo' +``` + +```sh +curl --unix-socket amduatd.sock -X POST \ + 'http://localhost/v1/fed/push/until?peer=2&limit=128&max_rounds=10' \ + -H 'X-Amduat-Space: demo' +``` + ### Federation ingest (receiver) `/v1/fed/ingest` applies a single incoming record (push receiver). The request diff --git a/registry/amduatd-api-contract.v1.json b/registry/amduatd-api-contract.v1.json index 1eccb38..1dbb6b8 100644 --- a/registry/amduatd-api-contract.v1.json +++ b/registry/amduatd-api-contract.v1.json @@ -14,6 +14,8 @@ {"method": "GET", "path": "/v1/fed/records"}, {"method": "GET", "path": "/v1/fed/artifacts/{ref}"}, {"method": "GET", "path": "/v1/fed/status"}, + {"method": "POST", "path": "/v1/fed/pull/until"}, + {"method": "POST", "path": "/v1/fed/push/until"}, {"method": "POST", "path": "/v1/concepts"}, {"method": "GET", "path": "/v1/concepts"}, {"method": "GET", "path": "/v1/concepts/{name}"}, diff --git a/src/amduatd.c b/src/amduatd.c index 15e9874..35ab098 100644 --- a/src/amduatd.c +++ b/src/amduatd.c @@ -46,6 +46,7 @@ #include "amduatd_fed_push_plan.h" #include "amduatd_fed_push_apply.h" #include "amduatd_fed_pull_apply.h" +#include "amduatd_fed_until.h" #include "amduatd_store.h" #include "amduatd_derivation_index.h" #include "amduatd_space_doctor.h" @@ -133,7 +134,9 @@ static const char k_amduatd_contract_v1_json[] = "{\"method\":\"GET\",\"path\":\"/v1/fed/pull/plan\"}," "{\"method\":\"GET\",\"path\":\"/v1/fed/push/plan\"}," "{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"}," + "{\"method\":\"POST\",\"path\":\"/v1/fed/pull/until\"}," "{\"method\":\"POST\",\"path\":\"/v1/fed/push\"}," + "{\"method\":\"POST\",\"path\":\"/v1/fed/push/until\"}," "{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"}," "{\"method\":\"POST\",\"path\":\"/v1/fed/ingest\"}," "{\"method\":\"GET\",\"path\":\"/v1/fed/artifacts/{ref}\"}," @@ -5250,6 +5253,345 @@ fed_pull_cleanup: return true; } +static void amduatd_fed_pull_apply_status_http( + amduatd_fed_pull_apply_status_t status, + int *out_status, + const char **out_reason) { + int http_status = 200; + const char *http_reason = "OK"; + + if (status == AMDUATD_FED_PULL_APPLY_ERR_DISABLED) { + http_status = 503; + http_reason = "Service Unavailable"; + } else if (status == AMDUATD_FED_PULL_APPLY_ERR_UNSUPPORTED) { + http_status = 501; + http_reason = "Not Implemented"; + } else if (status == AMDUATD_FED_PULL_APPLY_ERR_INVALID) { + http_status = 400; + http_reason = "Bad Request"; + } else if (status == AMDUATD_FED_PULL_APPLY_ERR_REMOTE) { + http_status = 502; + http_reason = "Bad Gateway"; + } else if (status == AMDUATD_FED_PULL_APPLY_ERR_CONFLICT) { + http_status = 409; + http_reason = "Conflict"; + } else if (status != AMDUATD_FED_PULL_APPLY_OK) { + http_status = 500; + http_reason = "Internal Server Error"; + } + + if (out_status != NULL) { + *out_status = http_status; + } + if (out_reason != NULL) { + *out_reason = http_reason; + } +} + +static void amduatd_fed_push_apply_status_http( + amduatd_fed_push_apply_status_t status, + int *out_status, + const char **out_reason) { + int http_status = 200; + const char *http_reason = "OK"; + + if (status == AMDUATD_FED_PUSH_APPLY_ERR_DISABLED) { + http_status = 503; + http_reason = "Service Unavailable"; + } else if (status == AMDUATD_FED_PUSH_APPLY_ERR_UNSUPPORTED) { + http_status = 501; + http_reason = "Not Implemented"; + } else if (status == AMDUATD_FED_PUSH_APPLY_ERR_INVALID) { + http_status = 400; + http_reason = "Bad Request"; + } else if (status == AMDUATD_FED_PUSH_APPLY_ERR_REMOTE) { + http_status = 502; + http_reason = "Bad Gateway"; + } else if (status == AMDUATD_FED_PUSH_APPLY_ERR_CONFLICT) { + http_status = 409; + http_reason = "Conflict"; + } else if (status != AMDUATD_FED_PUSH_APPLY_OK) { + http_status = 500; + http_reason = "Internal Server Error"; + } + + if (out_status != NULL) { + *out_status = http_status; + } + if (out_reason != NULL) { + *out_reason = http_reason; + } +} + +static bool amduatd_handle_post_fed_pull_until( + int fd, + amduat_asl_store_t *store, + const amduatd_fed_cfg_t *fed_cfg, + const amduatd_caps_t *caps, + const amduatd_cfg_t *dcfg, + const amduatd_http_req_t *req, + const char *root_path) { + char peer_buf[AMDUAT_ASL_POINTER_NAME_MAX + 1u]; + char limit_buf[32]; + char rounds_buf[32]; + uint64_t limit = 128u; + uint64_t max_rounds = 10u; + amduat_asl_pointer_store_t pointer_store; + amduat_fed_transport_unix_t transport; + amduat_fed_transport_t ops; + amduatd_fed_pull_transport_t pull_transport; + amduatd_fed_until_report_t report; + amduatd_fed_pull_apply_status_t status; + amduatd_strbuf_t b; + char *cursor_ref_hex = NULL; + bool ok = false; + int http_status = 200; + const char *http_reason = "OK"; + + if (store == NULL || fed_cfg == NULL || req == NULL || root_path == NULL || + dcfg == NULL) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "internal error"); + } + if (!amduatd_fed_require_space(fd, fed_cfg, req)) { + return false; + } + { + amduatd_fed_pull_plan_status_t check = + amduatd_fed_pull_plan_check(fed_cfg, store); + if (check == AMDUATD_FED_PULL_PLAN_ERR_DISABLED) { + return amduatd_send_json_error(fd, 503, "Service Unavailable", + "federation disabled"); + } + if (check == AMDUATD_FED_PULL_PLAN_ERR_UNSUPPORTED) { + return amduatd_send_json_error(fd, 501, "Not Implemented", + "requires index backend"); + } + if (check != AMDUATD_FED_PULL_PLAN_OK) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "plan unavailable"); + } + } + + if (caps != NULL && caps->enabled && req->x_capability[0] != '\0') { + const char *reason = NULL; + if (!amduatd_caps_check_space(caps, dcfg, req, &reason)) { + if (reason != NULL && strcmp(reason, "wrong-space") == 0) { + return amduatd_send_json_error(fd, 403, "Forbidden", + "space not permitted by capability"); + } + return amduatd_send_json_error(fd, 403, "Forbidden", + "invalid capability"); + } + } + + if (amduatd_query_param(req->path, "peer", + peer_buf, sizeof(peer_buf)) == NULL || + peer_buf[0] == '\0') { + return amduatd_send_json_error(fd, 400, "Bad Request", + "missing peer"); + } + { + amduat_octets_t scoped = amduat_octets(NULL, 0u); + if (!amduatd_fed_cursor_pointer_name(req->effective_space, + peer_buf, + &scoped)) { + return amduatd_send_json_error(fd, 400, "Bad Request", "invalid peer"); + } + amduat_octets_free(&scoped); + } + + if (amduatd_query_param(req->path, "limit", + limit_buf, sizeof(limit_buf)) != NULL) { + if (!amduatd_parse_u64_str(limit_buf, &limit) || limit == 0u || + limit > 10000u) { + return amduatd_send_json_error(fd, 400, "Bad Request", "invalid limit"); + } + } + + if (amduatd_query_param(req->path, "max_rounds", + rounds_buf, sizeof(rounds_buf)) != NULL) { + if (!amduatd_parse_u64_str(rounds_buf, &max_rounds) || max_rounds == 0u || + max_rounds > 100u) { + return amduatd_send_json_error(fd, 400, "Bad Request", + "invalid max_rounds"); + } + } + + if (!amduat_asl_pointer_store_init(&pointer_store, root_path)) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "pointer store error"); + } + + if (fed_cfg->transport_kind != AMDUATD_FED_TRANSPORT_UNIX || + !fed_cfg->unix_socket_set) { + return amduatd_send_json_error(fd, 501, "Not Implemented", + "federation transport unavailable"); + } + if (!amduat_fed_transport_unix_init(&transport, fed_cfg->unix_socket_path)) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "transport init failed"); + } + if (req->effective_space != NULL && req->effective_space->enabled && + req->effective_space->space_id.data != NULL) { + (void)amduat_fed_transport_unix_set_space( + &transport, (const char *)req->effective_space->space_id.data); + } + + ops = amduat_fed_transport_unix_ops(&transport); + memset(&pull_transport, 0, sizeof(pull_transport)); + pull_transport.ctx = &transport; + pull_transport.get_records = amduatd_fed_pull_unix_get_records; + pull_transport.free_records = ops.free_records; + pull_transport.get_artifact = amduatd_fed_pull_unix_get_artifact; + + status = amduatd_fed_pull_until(store, + &pointer_store, + req->effective_space, + peer_buf, + limit, + max_rounds, + fed_cfg, + &pull_transport, + &report); + amduatd_fed_pull_apply_status_http(status, &http_status, &http_reason); + if (status != AMDUATD_FED_PULL_APPLY_OK) { + const char *msg = report.error[0] != '\0' ? report.error : "error"; + amduatd_fed_until_report_free(&report); + return amduatd_send_json_error(fd, http_status, http_reason, msg); + } + + if (report.cursor_ref_set) { + (void)amduat_asl_ref_encode_hex(report.cursor_ref, &cursor_ref_hex); + } + + memset(&b, 0, sizeof(b)); + if (!amduatd_strbuf_append_cstr(&b, "{")) { + goto fed_pull_until_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"peer\":\"") || + !amduatd_strbuf_append_cstr(&b, peer_buf) || + !amduatd_strbuf_append_cstr(&b, "\",")) { + goto fed_pull_until_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"effective_space\":{")) { + goto fed_pull_until_cleanup; + } + if (req->effective_space != NULL && req->effective_space->enabled && + req->effective_space->space_id.data != NULL) { + const char *space_id = (const char *)req->effective_space->space_id.data; + if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"scoped\",") || + !amduatd_strbuf_append_cstr(&b, "\"space_id\":\"") || + !amduatd_strbuf_append_cstr(&b, space_id) || + !amduatd_strbuf_append_cstr(&b, "\"")) { + goto fed_pull_until_cleanup; + } + } else { + if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"unscoped\",") || + !amduatd_strbuf_append_cstr(&b, "\"space_id\":null")) { + goto fed_pull_until_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "},")) { + goto fed_pull_until_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"operation\":\"pull\",")) { + goto fed_pull_until_cleanup; + } + { + char tmp[64]; + snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)limit); + if (!amduatd_strbuf_append_cstr(&b, "\"limit\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_pull_until_cleanup; + } + snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)max_rounds); + if (!amduatd_strbuf_append_cstr(&b, "\"max_rounds\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_pull_until_cleanup; + } + snprintf(tmp, sizeof(tmp), "%llu", + (unsigned long long)report.rounds_executed); + if (!amduatd_strbuf_append_cstr(&b, "\"rounds_executed\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_pull_until_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "\"caught_up\":") || + !amduatd_strbuf_append_cstr(&b, report.caught_up ? "true" : "false") || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_pull_until_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"applied\":{")) { + goto fed_pull_until_cleanup; + } + { + char tmp[64]; + snprintf(tmp, sizeof(tmp), "%zu", report.total_records); + if (!amduatd_strbuf_append_cstr(&b, "\"records\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_pull_until_cleanup; + } + snprintf(tmp, sizeof(tmp), "%zu", report.total_artifacts); + if (!amduatd_strbuf_append_cstr(&b, "\"artifacts\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, "},")) { + goto fed_pull_until_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "\"cursor\":{")) { + goto fed_pull_until_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"last_logseq\":")) { + goto fed_pull_until_cleanup; + } + if (report.cursor_has_logseq) { + char tmp[64]; + snprintf(tmp, sizeof(tmp), "%llu", + (unsigned long long)report.cursor_logseq); + if (!amduatd_strbuf_append_cstr(&b, tmp)) { + goto fed_pull_until_cleanup; + } + } else { + if (!amduatd_strbuf_append_cstr(&b, "null")) { + goto fed_pull_until_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, ",\"ref\":")) { + goto fed_pull_until_cleanup; + } + if (cursor_ref_hex != NULL) { + if (!amduatd_strbuf_append_cstr(&b, "\"") || + !amduatd_strbuf_append_cstr(&b, cursor_ref_hex) || + !amduatd_strbuf_append_cstr(&b, "\"")) { + goto fed_pull_until_cleanup; + } + } else { + if (!amduatd_strbuf_append_cstr(&b, "null")) { + goto fed_pull_until_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "}}\n")) { + goto fed_pull_until_cleanup; + } + + ok = amduatd_http_send_json(fd, 200, "OK", b.data, false); + +fed_pull_until_cleanup: + amduatd_strbuf_free(&b); + free(cursor_ref_hex); + amduatd_fed_until_report_free(&report); + if (!ok) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "encode error"); + } + return true; +} + static bool amduatd_handle_post_fed_push(int fd, amduat_asl_store_t *store, const amduatd_fed_cfg_t *fed_cfg, @@ -5675,6 +6017,272 @@ fed_push_cleanup: } return true; } + +static bool amduatd_handle_post_fed_push_until( + int fd, + amduat_asl_store_t *store, + const amduatd_fed_cfg_t *fed_cfg, + const amduatd_caps_t *caps, + const amduatd_cfg_t *dcfg, + const amduatd_http_req_t *req, + const char *root_path) { + char peer_buf[AMDUAT_ASL_POINTER_NAME_MAX + 1u]; + char limit_buf[32]; + char rounds_buf[32]; + uint64_t limit = 128u; + uint64_t max_rounds = 10u; + amduat_asl_pointer_store_t pointer_store; + amduat_fed_transport_unix_t transport; + amduatd_fed_push_transport_t push_transport; + amduatd_fed_until_report_t report; + amduatd_fed_push_apply_status_t status; + amduatd_strbuf_t b; + char *cursor_ref_hex = NULL; + bool ok = false; + int http_status = 200; + const char *http_reason = "OK"; + + if (store == NULL || fed_cfg == NULL || req == NULL || root_path == NULL || + dcfg == NULL) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "internal error"); + } + if (!amduatd_fed_require_space(fd, fed_cfg, req)) { + return false; + } + { + amduatd_fed_push_plan_status_t check = + amduatd_fed_push_plan_check(fed_cfg, store); + if (check == AMDUATD_FED_PUSH_PLAN_ERR_DISABLED) { + return amduatd_send_json_error(fd, 503, "Service Unavailable", + "federation disabled"); + } + if (check == AMDUATD_FED_PUSH_PLAN_ERR_UNSUPPORTED) { + return amduatd_send_json_error(fd, 501, "Not Implemented", + "requires index backend"); + } + if (check != AMDUATD_FED_PUSH_PLAN_OK) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "plan unavailable"); + } + } + + if (caps != NULL && caps->enabled && req->x_capability[0] != '\0') { + const char *reason = NULL; + if (!amduatd_caps_check_space(caps, dcfg, req, &reason)) { + if (reason != NULL && strcmp(reason, "wrong-space") == 0) { + return amduatd_send_json_error(fd, 403, "Forbidden", + "space not permitted by capability"); + } + return amduatd_send_json_error(fd, 403, "Forbidden", + "invalid capability"); + } + } + + if (amduatd_query_param(req->path, "peer", + peer_buf, sizeof(peer_buf)) == NULL || + peer_buf[0] == '\0') { + return amduatd_send_json_error(fd, 400, "Bad Request", + "missing peer"); + } + { + amduat_octets_t scoped = amduat_octets(NULL, 0u); + if (!amduatd_fed_push_cursor_pointer_name(req->effective_space, + peer_buf, + &scoped)) { + return amduatd_send_json_error(fd, 400, "Bad Request", "invalid peer"); + } + amduat_octets_free(&scoped); + } + + if (amduatd_query_param(req->path, "limit", + limit_buf, sizeof(limit_buf)) != NULL) { + if (!amduatd_parse_u64_str(limit_buf, &limit) || limit == 0u || + limit > 2048u) { + return amduatd_send_json_error(fd, 400, "Bad Request", "invalid limit"); + } + } + + if (amduatd_query_param(req->path, "max_rounds", + rounds_buf, sizeof(rounds_buf)) != NULL) { + if (!amduatd_parse_u64_str(rounds_buf, &max_rounds) || max_rounds == 0u || + max_rounds > 100u) { + return amduatd_send_json_error(fd, 400, "Bad Request", + "invalid max_rounds"); + } + } + + if (!amduat_asl_pointer_store_init(&pointer_store, root_path)) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "pointer store error"); + } + + if (fed_cfg->transport_kind != AMDUATD_FED_TRANSPORT_UNIX || + !fed_cfg->unix_socket_set) { + return amduatd_send_json_error(fd, 501, "Not Implemented", + "federation transport unavailable"); + } + if (!amduat_fed_transport_unix_init(&transport, fed_cfg->unix_socket_path)) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "transport init failed"); + } + if (req->effective_space != NULL && req->effective_space->enabled && + req->effective_space->space_id.data != NULL) { + (void)amduat_fed_transport_unix_set_space( + &transport, (const char *)req->effective_space->space_id.data); + } + + memset(&push_transport, 0, sizeof(push_transport)); + push_transport.ctx = &transport; + push_transport.post_ingest = amduatd_fed_push_unix_post_ingest; + + status = amduatd_fed_push_until(store, + &pointer_store, + req->effective_space, + peer_buf, + limit, + max_rounds, + root_path, + fed_cfg, + &push_transport, + &report); + amduatd_fed_push_apply_status_http(status, &http_status, &http_reason); + if (status != AMDUATD_FED_PUSH_APPLY_OK) { + const char *msg = report.error[0] != '\0' ? report.error : "error"; + amduatd_fed_until_report_free(&report); + return amduatd_send_json_error(fd, http_status, http_reason, msg); + } + + if (report.cursor_ref_set) { + (void)amduat_asl_ref_encode_hex(report.cursor_ref, &cursor_ref_hex); + } + + memset(&b, 0, sizeof(b)); + if (!amduatd_strbuf_append_cstr(&b, "{")) { + goto fed_push_until_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"peer\":\"") || + !amduatd_strbuf_append_cstr(&b, peer_buf) || + !amduatd_strbuf_append_cstr(&b, "\",")) { + goto fed_push_until_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"effective_space\":{")) { + goto fed_push_until_cleanup; + } + if (req->effective_space != NULL && req->effective_space->enabled && + req->effective_space->space_id.data != NULL) { + const char *space_id = (const char *)req->effective_space->space_id.data; + if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"scoped\",") || + !amduatd_strbuf_append_cstr(&b, "\"space_id\":\"") || + !amduatd_strbuf_append_cstr(&b, space_id) || + !amduatd_strbuf_append_cstr(&b, "\"")) { + goto fed_push_until_cleanup; + } + } else { + if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"unscoped\",") || + !amduatd_strbuf_append_cstr(&b, "\"space_id\":null")) { + goto fed_push_until_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "},")) { + goto fed_push_until_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"operation\":\"push\",")) { + goto fed_push_until_cleanup; + } + { + char tmp[64]; + snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)limit); + if (!amduatd_strbuf_append_cstr(&b, "\"limit\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_push_until_cleanup; + } + snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)max_rounds); + if (!amduatd_strbuf_append_cstr(&b, "\"max_rounds\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_push_until_cleanup; + } + snprintf(tmp, sizeof(tmp), "%llu", + (unsigned long long)report.rounds_executed); + if (!amduatd_strbuf_append_cstr(&b, "\"rounds_executed\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_push_until_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "\"caught_up\":") || + !amduatd_strbuf_append_cstr(&b, report.caught_up ? "true" : "false") || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_push_until_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"applied\":{")) { + goto fed_push_until_cleanup; + } + { + char tmp[64]; + snprintf(tmp, sizeof(tmp), "%zu", report.total_records); + if (!amduatd_strbuf_append_cstr(&b, "\"records\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, ",")) { + goto fed_push_until_cleanup; + } + snprintf(tmp, sizeof(tmp), "%zu", report.total_artifacts); + if (!amduatd_strbuf_append_cstr(&b, "\"artifacts\":") || + !amduatd_strbuf_append_cstr(&b, tmp) || + !amduatd_strbuf_append_cstr(&b, "},")) { + goto fed_push_until_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "\"cursor\":{")) { + goto fed_push_until_cleanup; + } + if (!amduatd_strbuf_append_cstr(&b, "\"last_logseq\":")) { + goto fed_push_until_cleanup; + } + if (report.cursor_has_logseq) { + char tmp[64]; + snprintf(tmp, sizeof(tmp), "%llu", + (unsigned long long)report.cursor_logseq); + if (!amduatd_strbuf_append_cstr(&b, tmp)) { + goto fed_push_until_cleanup; + } + } else { + if (!amduatd_strbuf_append_cstr(&b, "null")) { + goto fed_push_until_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, ",\"ref\":")) { + goto fed_push_until_cleanup; + } + if (cursor_ref_hex != NULL) { + if (!amduatd_strbuf_append_cstr(&b, "\"") || + !amduatd_strbuf_append_cstr(&b, cursor_ref_hex) || + !amduatd_strbuf_append_cstr(&b, "\"")) { + goto fed_push_until_cleanup; + } + } else { + if (!amduatd_strbuf_append_cstr(&b, "null")) { + goto fed_push_until_cleanup; + } + } + if (!amduatd_strbuf_append_cstr(&b, "}}\n")) { + goto fed_push_until_cleanup; + } + + ok = amduatd_http_send_json(fd, 200, "OK", b.data, false); + +fed_push_until_cleanup: + amduatd_strbuf_free(&b); + free(cursor_ref_hex); + amduatd_fed_until_report_free(&report); + if (!ok) { + return amduatd_send_json_error(fd, 500, "Internal Server Error", + "encode error"); + } + return true; +} static bool amduatd_handle_post_pel_programs(int fd, amduat_asl_store_t *store, const amduatd_http_req_t *req) { @@ -7077,6 +7685,17 @@ static bool amduatd_handle_conn(int fd, root_path); goto conn_cleanup; } + if (strcmp(req.method, "POST") == 0 && + strcmp(no_query, "/v1/fed/pull/until") == 0) { + ok = amduatd_handle_post_fed_pull_until(fd, + store, + fed_cfg, + caps, + effective_cfg, + &req, + root_path); + goto conn_cleanup; + } if (strcmp(req.method, "POST") == 0 && strcmp(no_query, "/v1/fed/push") == 0) { ok = amduatd_handle_post_fed_push(fd, @@ -7088,6 +7707,17 @@ static bool amduatd_handle_conn(int fd, root_path); goto conn_cleanup; } + if (strcmp(req.method, "POST") == 0 && + strcmp(no_query, "/v1/fed/push/until") == 0) { + ok = amduatd_handle_post_fed_push_until(fd, + store, + fed_cfg, + caps, + effective_cfg, + &req, + root_path); + goto conn_cleanup; + } if (strcmp(req.method, "POST") == 0 && strcmp(no_query, "/v1/fed/ingest") == 0) { ok = amduatd_handle_post_fed_ingest(fd, diff --git a/src/amduatd_fed_until.c b/src/amduatd_fed_until.c new file mode 100644 index 0000000..e234d86 --- /dev/null +++ b/src/amduatd_fed_until.c @@ -0,0 +1,236 @@ +#include "amduatd_fed_until.h" + +#include +#include + +static void amduatd_fed_until_report_clear_cursor( + amduatd_fed_until_report_t *report) { + if (report == NULL) { + return; + } + if (report->cursor_ref_set) { + amduat_reference_free(&report->cursor_ref); + report->cursor_ref_set = false; + } + report->cursor_has_logseq = false; + report->cursor_logseq = 0u; +} + +static void amduatd_fed_until_report_set_cursor( + amduatd_fed_until_report_t *report, + bool has_logseq, + uint64_t logseq, + bool has_ref, + amduat_reference_t ref) { + if (report == NULL) { + return; + } + amduatd_fed_until_report_clear_cursor(report); + if (has_logseq) { + report->cursor_has_logseq = true; + report->cursor_logseq = logseq; + } + if (has_ref) { + if (amduat_reference_clone(ref, &report->cursor_ref)) { + report->cursor_ref_set = true; + } + } +} + +static void amduatd_fed_until_report_error(amduatd_fed_until_report_t *report, + const char *msg, + int remote_status) { + if (report == NULL || msg == NULL) { + return; + } + memset(report->error, 0, sizeof(report->error)); + strncpy(report->error, msg, sizeof(report->error) - 1u); + report->remote_status = remote_status; +} + +void amduatd_fed_until_report_init(amduatd_fed_until_report_t *report) { + if (report == NULL) { + return; + } + memset(report, 0, sizeof(*report)); + report->cursor_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); +} + +void amduatd_fed_until_report_free(amduatd_fed_until_report_t *report) { + if (report == NULL) { + return; + } + if (report->cursor_ref_set) { + amduat_reference_free(&report->cursor_ref); + } + memset(report, 0, sizeof(*report)); +} + +amduatd_fed_pull_apply_status_t amduatd_fed_pull_until( + amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *effective_space, + const char *peer_key, + uint64_t limit, + uint64_t max_rounds, + const amduatd_fed_cfg_t *fed_cfg, + const amduatd_fed_pull_transport_t *transport, + amduatd_fed_until_report_t *out_report) { + amduatd_fed_pull_apply_status_t status = AMDUATD_FED_PULL_APPLY_OK; + amduatd_fed_pull_apply_report_t round_report; + + if (out_report == NULL) { + return AMDUATD_FED_PULL_APPLY_ERR_INVALID; + } + amduatd_fed_until_report_init(out_report); + out_report->peer_key = peer_key; + out_report->effective_space = effective_space; + out_report->limit = limit; + out_report->max_rounds = max_rounds; + + if (store == NULL || pointer_store == NULL || peer_key == NULL || + fed_cfg == NULL || transport == NULL || max_rounds == 0u) { + amduatd_fed_until_report_error(out_report, "invalid inputs", 0); + return AMDUATD_FED_PULL_APPLY_ERR_INVALID; + } + + for (uint64_t round = 0u; round < max_rounds; ++round) { + amduatd_fed_pull_apply_report_init(&round_report); + status = amduatd_fed_pull_apply(store, + pointer_store, + effective_space, + peer_key, + limit, + fed_cfg, + transport, + &round_report); + out_report->rounds_executed++; + + if (status != AMDUATD_FED_PULL_APPLY_OK) { + amduatd_fed_until_report_error(out_report, + round_report.error[0] != '\0' + ? round_report.error + : "error", + round_report.remote_status); + amduatd_fed_pull_apply_report_free(&round_report); + return status; + } + + out_report->total_records += round_report.applied_record_count; + out_report->total_artifacts += round_report.applied_artifact_count; + if (round_report.cursor_advanced) { + amduatd_fed_until_report_set_cursor( + out_report, + round_report.cursor_after_has_logseq, + round_report.cursor_after_logseq, + round_report.cursor_after_ref_set, + round_report.cursor_after_ref); + } else if (round_report.cursor_present) { + amduatd_fed_until_report_set_cursor( + out_report, + round_report.cursor_has_logseq, + round_report.cursor_logseq, + round_report.cursor_ref_set, + round_report.cursor_ref); + } else { + amduatd_fed_until_report_clear_cursor(out_report); + } + + if (round_report.plan_record_count == 0u) { + out_report->caught_up = true; + amduatd_fed_pull_apply_report_free(&round_report); + return AMDUATD_FED_PULL_APPLY_OK; + } + + amduatd_fed_pull_apply_report_free(&round_report); + } + + out_report->caught_up = false; + return AMDUATD_FED_PULL_APPLY_OK; +} + +amduatd_fed_push_apply_status_t amduatd_fed_push_until( + amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *effective_space, + const char *peer_key, + uint64_t limit, + uint64_t max_rounds, + const char *root_path, + const amduatd_fed_cfg_t *fed_cfg, + const amduatd_fed_push_transport_t *transport, + amduatd_fed_until_report_t *out_report) { + amduatd_fed_push_apply_status_t status = AMDUATD_FED_PUSH_APPLY_OK; + amduatd_fed_push_apply_report_t round_report; + + if (out_report == NULL) { + return AMDUATD_FED_PUSH_APPLY_ERR_INVALID; + } + amduatd_fed_until_report_init(out_report); + out_report->peer_key = peer_key; + out_report->effective_space = effective_space; + out_report->limit = limit; + out_report->max_rounds = max_rounds; + + if (store == NULL || pointer_store == NULL || peer_key == NULL || + root_path == NULL || fed_cfg == NULL || transport == NULL || + max_rounds == 0u) { + amduatd_fed_until_report_error(out_report, "invalid inputs", 0); + return AMDUATD_FED_PUSH_APPLY_ERR_INVALID; + } + + for (uint64_t round = 0u; round < max_rounds; ++round) { + amduatd_fed_push_apply_report_init(&round_report); + status = amduatd_fed_push_apply(store, + pointer_store, + effective_space, + peer_key, + limit, + root_path, + fed_cfg, + transport, + &round_report); + out_report->rounds_executed++; + + if (status != AMDUATD_FED_PUSH_APPLY_OK) { + amduatd_fed_until_report_error(out_report, + round_report.error[0] != '\0' + ? round_report.error + : "error", + round_report.remote_status); + amduatd_fed_push_apply_report_free(&round_report); + return status; + } + + out_report->total_records += round_report.sent_record_count; + out_report->total_artifacts += round_report.sent_artifact_count; + if (round_report.cursor_advanced) { + amduatd_fed_until_report_set_cursor( + out_report, + round_report.cursor_after_has_logseq, + round_report.cursor_after_logseq, + round_report.cursor_after_ref_set, + round_report.cursor_after_ref); + } else if (round_report.cursor_present) { + amduatd_fed_until_report_set_cursor( + out_report, + round_report.cursor_has_logseq, + round_report.cursor_logseq, + round_report.cursor_ref_set, + round_report.cursor_ref); + } else { + amduatd_fed_until_report_clear_cursor(out_report); + } + + if (round_report.plan_record_count == 0u) { + out_report->caught_up = true; + amduatd_fed_push_apply_report_free(&round_report); + return AMDUATD_FED_PUSH_APPLY_OK; + } + + amduatd_fed_push_apply_report_free(&round_report); + } + + out_report->caught_up = false; + return AMDUATD_FED_PUSH_APPLY_OK; +} diff --git a/src/amduatd_fed_until.h b/src/amduatd_fed_until.h new file mode 100644 index 0000000..d1d76fc --- /dev/null +++ b/src/amduatd_fed_until.h @@ -0,0 +1,63 @@ +#ifndef AMDUATD_FED_UNTIL_H +#define AMDUATD_FED_UNTIL_H + +#include "amduatd_fed_pull_apply.h" +#include "amduatd_fed_push_apply.h" + +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + const char *peer_key; + const amduatd_space_t *effective_space; + uint64_t limit; + uint64_t max_rounds; + uint64_t rounds_executed; + bool caught_up; + size_t total_records; + size_t total_artifacts; + bool cursor_has_logseq; + uint64_t cursor_logseq; + bool cursor_ref_set; + amduat_reference_t cursor_ref; + int remote_status; + char error[256]; +} amduatd_fed_until_report_t; + +void amduatd_fed_until_report_init(amduatd_fed_until_report_t *report); + +void amduatd_fed_until_report_free(amduatd_fed_until_report_t *report); + +amduatd_fed_pull_apply_status_t amduatd_fed_pull_until( + amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *effective_space, + const char *peer_key, + uint64_t limit, + uint64_t max_rounds, + const amduatd_fed_cfg_t *fed_cfg, + const amduatd_fed_pull_transport_t *transport, + amduatd_fed_until_report_t *out_report); + +amduatd_fed_push_apply_status_t amduatd_fed_push_until( + amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *effective_space, + const char *peer_key, + uint64_t limit, + uint64_t max_rounds, + const char *root_path, + const amduatd_fed_cfg_t *fed_cfg, + const amduatd_fed_push_transport_t *transport, + amduatd_fed_until_report_t *out_report); + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* AMDUATD_FED_UNTIL_H */ diff --git a/tests/test_amduatd_fed_pull_until.c b/tests/test_amduatd_fed_pull_until.c new file mode 100644 index 0000000..4292b6f --- /dev/null +++ b/tests/test_amduatd_fed_pull_until.c @@ -0,0 +1,512 @@ +#ifndef _POSIX_C_SOURCE +#define _POSIX_C_SOURCE 200809L +#endif + +#include "amduatd_fed_until.h" +#include "amduatd_fed_cursor.h" +#include "amduatd_store.h" + +#include "amduat/asl/artifact_io.h" +#include "amduat/asl/core.h" +#include "amduat/asl/asl_store_fs_meta.h" +#include "amduat/asl/ref_derive.h" + +#include +#include +#include +#include + +typedef struct { + amduat_fed_record_t *records; + size_t record_count; + amduat_octets_t *artifact_bytes; +} amduatd_test_pull_round_t; + +typedef struct { + amduatd_test_pull_round_t *rounds; + size_t round_count; + size_t call_index; + size_t fail_round; + int fail_status; +} amduatd_test_pull_transport_t; + +static int failures = 0; + +static void expect(bool cond, const char *msg) { + if (!cond) { + fprintf(stderr, "FAIL: %s\n", msg); + failures++; + } +} + +static char *amduatd_test_make_temp_dir(void) { + char tmpl[] = "/tmp/amduatd-fed-pull-until-XXXXXX"; + char *dir = mkdtemp(tmpl); + size_t len; + char *copy; + if (dir == NULL) { + perror("mkdtemp"); + return NULL; + } + len = strlen(dir); + copy = (char *)malloc(len + 1u); + if (copy == NULL) { + fprintf(stderr, "failed to allocate temp dir copy\n"); + return NULL; + } + memcpy(copy, dir, len + 1u); + return copy; +} + +static bool amduatd_test_make_record(amduat_asl_store_t *store, + const char *payload, + uint64_t logseq, + amduat_fed_record_t *out_record, + amduat_octets_t *out_bytes) { + amduat_artifact_t artifact; + amduat_reference_t ref; + amduat_octets_t artifact_bytes = amduat_octets(NULL, 0u); + amduat_octets_t payload_bytes = amduat_octets(NULL, 0u); + + if (store == NULL || payload == NULL || out_record == NULL || + out_bytes == NULL) { + return false; + } + + if (!amduat_octets_clone(amduat_octets(payload, strlen(payload)), + &payload_bytes)) { + return false; + } + if (!amduat_asl_artifact_from_bytes(payload_bytes, + AMDUAT_ASL_IO_RAW, + false, + amduat_type_tag(0u), + &artifact)) { + amduat_octets_free(&payload_bytes); + return false; + } + if (!amduat_asl_ref_derive(artifact, + store->config.encoding_profile_id, + store->config.hash_id, + &ref, + &artifact_bytes)) { + amduat_asl_artifact_free(&artifact); + return false; + } + amduat_asl_artifact_free(&artifact); + + memset(out_record, 0, sizeof(*out_record)); + out_record->logseq = logseq; + out_record->id.type = AMDUAT_FED_REC_ARTIFACT; + out_record->id.ref = ref; + + *out_bytes = artifact_bytes; + return true; +} + +static bool amduatd_test_pull_get_records(void *ctx, + uint32_t domain_id, + uint64_t from_logseq, + uint64_t limit, + int *out_status, + amduat_fed_record_t **out_records, + size_t *out_len, + char **out_body) { + amduatd_test_pull_transport_t *t = (amduatd_test_pull_transport_t *)ctx; + (void)domain_id; + (void)from_logseq; + (void)limit; + if (out_status == NULL || out_records == NULL || out_len == NULL) { + return false; + } + *out_status = 200; + *out_records = NULL; + *out_len = 0; + if (out_body != NULL) { + *out_body = NULL; + } + if (t == NULL) { + return false; + } + if (t->fail_round != 0u && t->call_index + 1u == t->fail_round) { + *out_status = t->fail_status; + if (out_body != NULL) { + *out_body = strdup("fail"); + } + t->call_index++; + return true; + } + if (t->call_index >= t->round_count) { + t->call_index++; + return true; + } + *out_records = t->rounds[t->call_index].records; + *out_len = t->rounds[t->call_index].record_count; + t->call_index++; + return true; +} + +static void amduatd_test_pull_free_records(void *ctx, + amduat_fed_record_t *records, + size_t len) { + (void)ctx; + (void)records; + (void)len; +} + +static bool amduatd_test_pull_get_artifact(void *ctx, + amduat_reference_t ref, + int *out_status, + amduat_octets_t *out_bytes, + char **out_body) { + amduatd_test_pull_transport_t *t = (amduatd_test_pull_transport_t *)ctx; + size_t i; + if (out_status == NULL || out_bytes == NULL) { + return false; + } + *out_status = 404; + *out_bytes = amduat_octets(NULL, 0u); + if (out_body != NULL) { + *out_body = NULL; + } + if (t == NULL) { + return false; + } + for (i = 0; i < t->round_count; ++i) { + size_t j; + for (j = 0; j < t->rounds[i].record_count; ++j) { + if (!amduat_reference_eq(ref, t->rounds[i].records[j].id.ref)) { + continue; + } + if (!amduat_octets_clone(t->rounds[i].artifact_bytes[j], out_bytes)) { + return false; + } + *out_status = 200; + return true; + } + } + return true; +} + +static void amduatd_test_pull_round_free(amduatd_test_pull_round_t *round) { + size_t i; + if (round == NULL) { + return; + } + for (i = 0; i < round->record_count; ++i) { + amduat_reference_free(&round->records[i].id.ref); + amduat_octets_free(&round->artifact_bytes[i]); + } + free(round->records); + free(round->artifact_bytes); + round->records = NULL; + round->artifact_bytes = NULL; + round->record_count = 0u; +} + +static bool amduatd_test_pull_round_init(amduat_asl_store_t *store, + amduatd_test_pull_round_t *round, + const char **payloads, + size_t payloads_len, + uint64_t base_logseq) { + size_t i; + if (store == NULL || round == NULL) { + return false; + } + memset(round, 0, sizeof(*round)); + if (payloads_len == 0u) { + return true; + } + round->records = (amduat_fed_record_t *)calloc(payloads_len, + sizeof(*round->records)); + round->artifact_bytes = (amduat_octets_t *)calloc(payloads_len, + sizeof(*round->artifact_bytes)); + if (round->records == NULL || round->artifact_bytes == NULL) { + amduatd_test_pull_round_free(round); + return false; + } + for (i = 0; i < payloads_len; ++i) { + if (!amduatd_test_make_record(store, + payloads[i], + base_logseq + i, + &round->records[i], + &round->artifact_bytes[i])) { + amduatd_test_pull_round_free(round); + return false; + } + } + round->record_count = payloads_len; + return true; +} + +static int amduatd_test_pull_until_zero(void) { + char *root = amduatd_test_make_temp_dir(); + amduat_asl_store_fs_config_t cfg; + amduatd_store_ctx_t store_ctx; + amduat_asl_store_t store; + amduat_asl_pointer_store_t pointer_store; + amduatd_space_t space; + amduatd_fed_cfg_t fed_cfg; + amduatd_test_pull_transport_t stub; + amduatd_fed_pull_transport_t transport; + amduatd_fed_until_report_t report; + amduatd_fed_pull_apply_status_t status; + + if (root == NULL) { + return 1; + } + + memset(&cfg, 0, sizeof(cfg)); + if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) { + fprintf(stderr, "failed to init store root\n"); + free(root); + return 1; + } + memset(&store_ctx, 0, sizeof(store_ctx)); + memset(&store, 0, sizeof(store)); + if (!amduatd_store_init(&store, + &cfg, + &store_ctx, + root, + AMDUATD_STORE_BACKEND_INDEX)) { + fprintf(stderr, "failed to init store\n"); + free(root); + return 1; + } + if (!amduat_asl_pointer_store_init(&pointer_store, root)) { + fprintf(stderr, "failed to init pointer store\n"); + free(root); + return 1; + } + if (!amduatd_space_init(&space, "demo", false)) { + fprintf(stderr, "failed to init space\n"); + free(root); + return 1; + } + amduatd_fed_cfg_init(&fed_cfg); + fed_cfg.enabled = true; + + memset(&stub, 0, sizeof(stub)); + memset(&transport, 0, sizeof(transport)); + transport.ctx = &stub; + transport.get_records = amduatd_test_pull_get_records; + transport.free_records = amduatd_test_pull_free_records; + transport.get_artifact = amduatd_test_pull_get_artifact; + + status = amduatd_fed_pull_until(&store, + &pointer_store, + &space, + "1", + 16u, + 3u, + &fed_cfg, + &transport, + &report); + expect(status == AMDUATD_FED_PULL_APPLY_OK, "pull until zero ok"); + expect(report.caught_up, "pull until caught up"); + expect(report.rounds_executed == 1u, "pull until rounds executed"); + expect(report.total_records == 0u, "pull until records"); + expect(report.total_artifacts == 0u, "pull until artifacts"); + + amduatd_fed_until_report_free(&report); + free(root); + return failures == 0 ? 0 : 1; +} + +static int amduatd_test_pull_until_multi(void) { + char *root = amduatd_test_make_temp_dir(); + amduat_asl_store_fs_config_t cfg; + amduatd_store_ctx_t store_ctx; + amduat_asl_store_t store; + amduat_asl_pointer_store_t pointer_store; + amduatd_space_t space; + amduatd_fed_cfg_t fed_cfg; + amduatd_test_pull_round_t rounds[2]; + amduatd_test_pull_transport_t stub; + amduatd_fed_pull_transport_t transport; + amduatd_fed_until_report_t report; + amduatd_fed_pull_apply_status_t status; + const char *round0_payloads[] = {"a", "b"}; + const char *round1_payloads[] = {"c"}; + + if (root == NULL) { + return 1; + } + + memset(&cfg, 0, sizeof(cfg)); + if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) { + fprintf(stderr, "failed to init store root\n"); + free(root); + return 1; + } + memset(&store_ctx, 0, sizeof(store_ctx)); + memset(&store, 0, sizeof(store)); + if (!amduatd_store_init(&store, + &cfg, + &store_ctx, + root, + AMDUATD_STORE_BACKEND_INDEX)) { + fprintf(stderr, "failed to init store\n"); + free(root); + return 1; + } + if (!amduat_asl_pointer_store_init(&pointer_store, root)) { + fprintf(stderr, "failed to init pointer store\n"); + free(root); + return 1; + } + if (!amduatd_space_init(&space, "demo", false)) { + fprintf(stderr, "failed to init space\n"); + free(root); + return 1; + } + amduatd_fed_cfg_init(&fed_cfg); + fed_cfg.enabled = true; + + if (!amduatd_test_pull_round_init(&store, + &rounds[0], + round0_payloads, + 2u, + 0u) || + !amduatd_test_pull_round_init(&store, + &rounds[1], + round1_payloads, + 1u, + 2u)) { + fprintf(stderr, "failed to init rounds\n"); + free(root); + return 1; + } + + memset(&stub, 0, sizeof(stub)); + stub.rounds = rounds; + stub.round_count = 2u; + memset(&transport, 0, sizeof(transport)); + transport.ctx = &stub; + transport.get_records = amduatd_test_pull_get_records; + transport.free_records = amduatd_test_pull_free_records; + transport.get_artifact = amduatd_test_pull_get_artifact; + + status = amduatd_fed_pull_until(&store, + &pointer_store, + &space, + "1", + 2u, + 5u, + &fed_cfg, + &transport, + &report); + expect(status == AMDUATD_FED_PULL_APPLY_OK, "pull until multi ok"); + expect(report.caught_up, "pull until multi caught up"); + expect(report.rounds_executed == 3u, "pull until multi rounds"); + expect(report.total_records == 3u, "pull until multi records"); + expect(report.total_artifacts == 3u, "pull until multi artifacts"); + + amduatd_fed_until_report_free(&report); + amduatd_test_pull_round_free(&rounds[0]); + amduatd_test_pull_round_free(&rounds[1]); + free(root); + return failures == 0 ? 0 : 1; +} + +static int amduatd_test_pull_until_error(void) { + char *root = amduatd_test_make_temp_dir(); + amduat_asl_store_fs_config_t cfg; + amduatd_store_ctx_t store_ctx; + amduat_asl_store_t store; + amduat_asl_pointer_store_t pointer_store; + amduatd_space_t space; + amduatd_fed_cfg_t fed_cfg; + amduatd_test_pull_round_t rounds[1]; + amduatd_test_pull_transport_t stub; + amduatd_fed_pull_transport_t transport; + amduatd_fed_until_report_t report; + amduatd_fed_pull_apply_status_t status; + const char *round0_payloads[] = {"a"}; + + if (root == NULL) { + return 1; + } + + memset(&cfg, 0, sizeof(cfg)); + if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) { + fprintf(stderr, "failed to init store root\n"); + free(root); + return 1; + } + memset(&store_ctx, 0, sizeof(store_ctx)); + memset(&store, 0, sizeof(store)); + if (!amduatd_store_init(&store, + &cfg, + &store_ctx, + root, + AMDUATD_STORE_BACKEND_INDEX)) { + fprintf(stderr, "failed to init store\n"); + free(root); + return 1; + } + if (!amduat_asl_pointer_store_init(&pointer_store, root)) { + fprintf(stderr, "failed to init pointer store\n"); + free(root); + return 1; + } + if (!amduatd_space_init(&space, "demo", false)) { + fprintf(stderr, "failed to init space\n"); + free(root); + return 1; + } + amduatd_fed_cfg_init(&fed_cfg); + fed_cfg.enabled = true; + + if (!amduatd_test_pull_round_init(&store, + &rounds[0], + round0_payloads, + 1u, + 0u)) { + fprintf(stderr, "failed to init rounds\n"); + free(root); + return 1; + } + + memset(&stub, 0, sizeof(stub)); + stub.rounds = rounds; + stub.round_count = 1u; + stub.fail_round = 2u; + stub.fail_status = 500; + memset(&transport, 0, sizeof(transport)); + transport.ctx = &stub; + transport.get_records = amduatd_test_pull_get_records; + transport.free_records = amduatd_test_pull_free_records; + transport.get_artifact = amduatd_test_pull_get_artifact; + + status = amduatd_fed_pull_until(&store, + &pointer_store, + &space, + "1", + 1u, + 4u, + &fed_cfg, + &transport, + &report); + expect(status == AMDUATD_FED_PULL_APPLY_ERR_REMOTE, "pull until error"); + expect(report.rounds_executed == 2u, "pull until error rounds"); + expect(report.total_records == 1u, "pull until error records"); + + amduatd_fed_until_report_free(&report); + amduatd_test_pull_round_free(&rounds[0]); + free(root); + return failures == 0 ? 0 : 1; +} + +int main(void) { + if (amduatd_test_pull_until_zero() != 0) { + return 1; + } + if (amduatd_test_pull_until_multi() != 0) { + return 1; + } + if (amduatd_test_pull_until_error() != 0) { + return 1; + } + return failures == 0 ? 0 : 1; +} diff --git a/tests/test_amduatd_fed_push_until.c b/tests/test_amduatd_fed_push_until.c new file mode 100644 index 0000000..34ab5b7 --- /dev/null +++ b/tests/test_amduatd_fed_push_until.c @@ -0,0 +1,441 @@ +#ifndef _POSIX_C_SOURCE +#define _POSIX_C_SOURCE 200809L +#endif + +#include "amduatd_fed_until.h" +#include "amduatd_fed_cursor.h" +#include "amduatd_space.h" +#include "amduatd_store.h" + +#include "amduat/asl/artifact_io.h" +#include "amduat/asl/asl_store_fs_meta.h" +#include "amduat/asl/log_store.h" + +#include +#include +#include +#include + +typedef struct { + size_t call_count; + size_t fail_at; + int fail_status; +} amduatd_test_push_transport_t; + +static int failures = 0; + +static void expect(bool cond, const char *msg) { + if (!cond) { + fprintf(stderr, "FAIL: %s\n", msg); + failures++; + } +} + +static char *amduatd_test_make_temp_dir(void) { + char tmpl[] = "/tmp/amduatd-fed-push-until-XXXXXX"; + char *dir = mkdtemp(tmpl); + size_t len; + char *copy; + if (dir == NULL) { + perror("mkdtemp"); + return NULL; + } + len = strlen(dir); + copy = (char *)malloc(len + 1u); + if (copy == NULL) { + fprintf(stderr, "failed to allocate temp dir copy\n"); + return NULL; + } + memcpy(copy, dir, len + 1u); + return copy; +} + +static bool amduatd_test_store_artifact(amduat_asl_store_t *store, + const char *payload, + amduat_reference_t *out_ref) { + amduat_artifact_t artifact; + amduat_octets_t payload_bytes = amduat_octets(NULL, 0u); + amduat_asl_index_state_t state; + amduat_asl_store_error_t err; + + if (store == NULL || payload == NULL || out_ref == NULL) { + return false; + } + if (!amduat_octets_clone(amduat_octets(payload, strlen(payload)), + &payload_bytes)) { + return false; + } + if (!amduat_asl_artifact_from_bytes(payload_bytes, + AMDUAT_ASL_IO_RAW, + false, + amduat_type_tag(0u), + &artifact)) { + amduat_octets_free(&payload_bytes); + return false; + } + err = amduat_asl_store_put_indexed(store, artifact, out_ref, &state); + amduat_asl_artifact_free(&artifact); + return err == AMDUAT_ASL_STORE_OK; +} + +static bool amduatd_test_append_fed_log(amduat_asl_store_t *store, + amduat_asl_pointer_store_t *pointer_store, + const amduatd_space_t *space, + const char *root_path, + amduat_reference_t ref) { + amduat_asl_log_store_t log_store; + amduat_octets_t log_name = amduat_octets(NULL, 0u); + amduat_asl_log_entry_t entry; + uint64_t offset = 0u; + amduat_asl_store_error_t err; + + if (!amduat_asl_log_store_init(&log_store, + root_path, + store, + pointer_store)) { + return false; + } + if (!amduatd_space_scope_name(space, "fed/records", &log_name)) { + return false; + } + memset(&entry, 0, sizeof(entry)); + entry.kind = AMDUATD_FED_LOG_KIND_ARTIFACT; + entry.has_timestamp = false; + entry.timestamp = 0u; + entry.payload_ref = ref; + entry.has_actor = false; + entry.actor = amduat_octets(NULL, 0u); + err = amduat_asl_log_append(&log_store, + (const char *)log_name.data, + &entry, + 1u, + &offset); + amduat_octets_free(&log_name); + return err == AMDUAT_ASL_STORE_OK; +} + +static bool amduatd_test_push_post_ingest(void *ctx, + amduat_fed_record_type_t record_type, + amduat_reference_t ref, + amduat_octets_t bytes, + int *out_status, + char **out_body) { + amduatd_test_push_transport_t *t = (amduatd_test_push_transport_t *)ctx; + const char *status = "ok"; + const char *applied = "true"; + char buf[256]; + int n; + + (void)record_type; + (void)ref; + (void)bytes; + + if (out_status == NULL || out_body == NULL || t == NULL) { + return false; + } + t->call_count++; + if (t->fail_at != 0u && t->call_count == t->fail_at) { + *out_status = t->fail_status; + *out_body = strdup("{\"status\":\"error\"}"); + return *out_body != NULL; + } + n = snprintf(buf, + sizeof(buf), + "{\"status\":\"%s\",\"applied\":%s," + "\"ref\":null,\"effective_space\":{" + "\"mode\":\"unscoped\",\"space_id\":null}}", + status, + applied); + if (n <= 0 || (size_t)n >= sizeof(buf)) { + return false; + } + *out_status = 200; + *out_body = strdup(buf); + return *out_body != NULL; +} + +static int amduatd_test_push_until_zero(void) { + char *root = amduatd_test_make_temp_dir(); + amduat_asl_store_fs_config_t cfg; + amduatd_store_ctx_t store_ctx; + amduat_asl_store_t store; + amduat_asl_pointer_store_t pointer_store; + amduatd_space_t space; + amduatd_fed_cfg_t fed_cfg; + amduatd_test_push_transport_t stub; + amduatd_fed_push_transport_t transport; + amduatd_fed_until_report_t report; + amduatd_fed_push_apply_status_t status; + + if (root == NULL) { + return 1; + } + + memset(&cfg, 0, sizeof(cfg)); + if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) { + fprintf(stderr, "failed to init store root\n"); + free(root); + return 1; + } + memset(&store_ctx, 0, sizeof(store_ctx)); + memset(&store, 0, sizeof(store)); + if (!amduatd_store_init(&store, + &cfg, + &store_ctx, + root, + AMDUATD_STORE_BACKEND_INDEX)) { + fprintf(stderr, "failed to init store\n"); + free(root); + return 1; + } + if (!amduat_asl_pointer_store_init(&pointer_store, root)) { + fprintf(stderr, "failed to init pointer store\n"); + free(root); + return 1; + } + if (!amduatd_space_init(&space, "demo", false)) { + fprintf(stderr, "failed to init space\n"); + free(root); + return 1; + } + amduatd_fed_cfg_init(&fed_cfg); + fed_cfg.enabled = true; + + memset(&stub, 0, sizeof(stub)); + memset(&transport, 0, sizeof(transport)); + transport.ctx = &stub; + transport.post_ingest = amduatd_test_push_post_ingest; + + status = amduatd_fed_push_until(&store, + &pointer_store, + &space, + "2", + 8u, + 3u, + root, + &fed_cfg, + &transport, + &report); + expect(status == AMDUATD_FED_PUSH_APPLY_OK, "push until zero ok"); + expect(report.caught_up, "push until caught up"); + expect(report.rounds_executed == 1u, "push until rounds executed"); + expect(report.total_records == 0u, "push until records"); + expect(report.total_artifacts == 0u, "push until artifacts"); + + amduatd_fed_until_report_free(&report); + free(root); + return failures == 0 ? 0 : 1; +} + +static int amduatd_test_push_until_multi(void) { + char *root = amduatd_test_make_temp_dir(); + amduat_asl_store_fs_config_t cfg; + amduatd_store_ctx_t store_ctx; + amduat_asl_store_t store; + amduat_asl_pointer_store_t pointer_store; + amduatd_space_t space; + amduatd_fed_cfg_t fed_cfg; + amduat_reference_t ref0; + amduat_reference_t ref1; + amduatd_test_push_transport_t stub; + amduatd_fed_push_transport_t transport; + amduatd_fed_until_report_t report; + amduatd_fed_push_apply_status_t status; + + if (root == NULL) { + return 1; + } + + memset(&cfg, 0, sizeof(cfg)); + if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) { + fprintf(stderr, "failed to init store root\n"); + free(root); + return 1; + } + memset(&store_ctx, 0, sizeof(store_ctx)); + memset(&store, 0, sizeof(store)); + if (!amduatd_store_init(&store, + &cfg, + &store_ctx, + root, + AMDUATD_STORE_BACKEND_INDEX)) { + fprintf(stderr, "failed to init store\n"); + free(root); + return 1; + } + if (!amduat_asl_pointer_store_init(&pointer_store, root)) { + fprintf(stderr, "failed to init pointer store\n"); + free(root); + return 1; + } + if (!amduatd_space_init(&space, "demo", false)) { + fprintf(stderr, "failed to init space\n"); + free(root); + return 1; + } + amduatd_fed_cfg_init(&fed_cfg); + fed_cfg.enabled = true; + + if (!amduatd_test_store_artifact(&store, "alpha", &ref0) || + !amduatd_test_store_artifact(&store, "beta", &ref1)) { + fprintf(stderr, "failed to store artifacts\n"); + free(root); + return 1; + } + if (!amduatd_test_append_fed_log(&store, + &pointer_store, + &space, + root, + ref0) || + !amduatd_test_append_fed_log(&store, + &pointer_store, + &space, + root, + ref1)) { + fprintf(stderr, "failed to append fed log\n"); + amduat_reference_free(&ref0); + amduat_reference_free(&ref1); + free(root); + return 1; + } + + memset(&stub, 0, sizeof(stub)); + memset(&transport, 0, sizeof(transport)); + transport.ctx = &stub; + transport.post_ingest = amduatd_test_push_post_ingest; + + status = amduatd_fed_push_until(&store, + &pointer_store, + &space, + "2", + 1u, + 5u, + root, + &fed_cfg, + &transport, + &report); + expect(status == AMDUATD_FED_PUSH_APPLY_OK, "push until multi ok"); + expect(report.caught_up, "push until multi caught up"); + expect(report.rounds_executed == 3u, "push until multi rounds"); + expect(report.total_records == 2u, "push until multi records"); + expect(report.total_artifacts == 2u, "push until multi artifacts"); + + amduat_reference_free(&ref0); + amduat_reference_free(&ref1); + amduatd_fed_until_report_free(&report); + free(root); + return failures == 0 ? 0 : 1; +} + +static int amduatd_test_push_until_error(void) { + char *root = amduatd_test_make_temp_dir(); + amduat_asl_store_fs_config_t cfg; + amduatd_store_ctx_t store_ctx; + amduat_asl_store_t store; + amduat_asl_pointer_store_t pointer_store; + amduatd_space_t space; + amduatd_fed_cfg_t fed_cfg; + amduat_reference_t ref0; + amduat_reference_t ref1; + amduatd_test_push_transport_t stub; + amduatd_fed_push_transport_t transport; + amduatd_fed_until_report_t report; + amduatd_fed_push_apply_status_t status; + + if (root == NULL) { + return 1; + } + + memset(&cfg, 0, sizeof(cfg)); + if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) { + fprintf(stderr, "failed to init store root\n"); + free(root); + return 1; + } + memset(&store_ctx, 0, sizeof(store_ctx)); + memset(&store, 0, sizeof(store)); + if (!amduatd_store_init(&store, + &cfg, + &store_ctx, + root, + AMDUATD_STORE_BACKEND_INDEX)) { + fprintf(stderr, "failed to init store\n"); + free(root); + return 1; + } + if (!amduat_asl_pointer_store_init(&pointer_store, root)) { + fprintf(stderr, "failed to init pointer store\n"); + free(root); + return 1; + } + if (!amduatd_space_init(&space, "demo", false)) { + fprintf(stderr, "failed to init space\n"); + free(root); + return 1; + } + amduatd_fed_cfg_init(&fed_cfg); + fed_cfg.enabled = true; + + if (!amduatd_test_store_artifact(&store, "alpha", &ref0) || + !amduatd_test_store_artifact(&store, "beta", &ref1)) { + fprintf(stderr, "failed to store artifacts\n"); + free(root); + return 1; + } + if (!amduatd_test_append_fed_log(&store, + &pointer_store, + &space, + root, + ref0) || + !amduatd_test_append_fed_log(&store, + &pointer_store, + &space, + root, + ref1)) { + fprintf(stderr, "failed to append fed log\n"); + amduat_reference_free(&ref0); + amduat_reference_free(&ref1); + free(root); + return 1; + } + + memset(&stub, 0, sizeof(stub)); + stub.fail_at = 2u; + stub.fail_status = 500; + memset(&transport, 0, sizeof(transport)); + transport.ctx = &stub; + transport.post_ingest = amduatd_test_push_post_ingest; + + status = amduatd_fed_push_until(&store, + &pointer_store, + &space, + "2", + 1u, + 4u, + root, + &fed_cfg, + &transport, + &report); + expect(status == AMDUATD_FED_PUSH_APPLY_ERR_REMOTE, "push until error"); + expect(report.rounds_executed == 2u, "push until error rounds"); + expect(report.total_records == 1u, "push until error records"); + + amduat_reference_free(&ref0); + amduat_reference_free(&ref1); + amduatd_fed_until_report_free(&report); + free(root); + return failures == 0 ? 0 : 1; +} + +int main(void) { + if (amduatd_test_push_until_zero() != 0) { + return 1; + } + if (amduatd_test_push_until_multi() != 0) { + return 1; + } + if (amduatd_test_push_until_error() != 0) { + return 1; + } + return failures == 0 ? 0 : 1; +}