#ifndef _POSIX_C_SOURCE #define _POSIX_C_SOURCE 200809L #endif #include "amduatd_fed_until.h" #include "amduatd_fed_cursor.h" #include "amduatd_store.h" #include "amduat/asl/artifact_io.h" #include "amduat/asl/core.h" #include "amduat/asl/asl_store_fs_meta.h" #include "amduat/asl/ref_derive.h" #include #include #include #include typedef struct { amduat_fed_record_t *records; size_t record_count; amduat_octets_t *artifact_bytes; } amduatd_test_pull_round_t; typedef struct { amduatd_test_pull_round_t *rounds; size_t round_count; size_t call_index; size_t fail_round; int fail_status; } amduatd_test_pull_transport_t; static int failures = 0; static void expect(bool cond, const char *msg) { if (!cond) { fprintf(stderr, "FAIL: %s\n", msg); failures++; } } static char *amduatd_test_make_temp_dir(void) { char tmpl[] = "/tmp/amduatd-fed-pull-until-XXXXXX"; char *dir = mkdtemp(tmpl); size_t len; char *copy; if (dir == NULL) { perror("mkdtemp"); return NULL; } len = strlen(dir); copy = (char *)malloc(len + 1u); if (copy == NULL) { fprintf(stderr, "failed to allocate temp dir copy\n"); return NULL; } memcpy(copy, dir, len + 1u); return copy; } static bool amduatd_test_make_record(amduat_asl_store_t *store, const char *payload, uint64_t logseq, amduat_fed_record_t *out_record, amduat_octets_t *out_bytes) { amduat_artifact_t artifact; amduat_reference_t ref; amduat_octets_t artifact_bytes = amduat_octets(NULL, 0u); amduat_octets_t payload_bytes = amduat_octets(NULL, 0u); if (store == NULL || payload == NULL || out_record == NULL || out_bytes == NULL) { return false; } if (!amduat_octets_clone(amduat_octets(payload, strlen(payload)), &payload_bytes)) { return false; } if (!amduat_asl_artifact_from_bytes(payload_bytes, AMDUAT_ASL_IO_RAW, false, amduat_type_tag(0u), &artifact)) { amduat_octets_free(&payload_bytes); return false; } if (!amduat_asl_ref_derive(artifact, store->config.encoding_profile_id, store->config.hash_id, &ref, &artifact_bytes)) { amduat_asl_artifact_free(&artifact); return false; } amduat_asl_artifact_free(&artifact); memset(out_record, 0, sizeof(*out_record)); out_record->logseq = logseq; out_record->id.type = AMDUAT_FED_REC_ARTIFACT; out_record->id.ref = ref; *out_bytes = artifact_bytes; return true; } static bool amduatd_test_pull_get_records(void *ctx, uint32_t domain_id, uint64_t from_logseq, uint64_t limit, int *out_status, amduat_fed_record_t **out_records, size_t *out_len, char **out_body) { amduatd_test_pull_transport_t *t = (amduatd_test_pull_transport_t *)ctx; (void)domain_id; (void)from_logseq; (void)limit; if (out_status == NULL || out_records == NULL || out_len == NULL) { return false; } *out_status = 200; *out_records = NULL; *out_len = 0; if (out_body != NULL) { *out_body = NULL; } if (t == NULL) { return false; } if (t->fail_round != 0u && t->call_index + 1u == t->fail_round) { *out_status = t->fail_status; if (out_body != NULL) { *out_body = strdup("fail"); } t->call_index++; return true; } if (t->call_index >= t->round_count) { t->call_index++; return true; } *out_records = t->rounds[t->call_index].records; *out_len = t->rounds[t->call_index].record_count; t->call_index++; return true; } static void amduatd_test_pull_free_records(void *ctx, amduat_fed_record_t *records, size_t len) { (void)ctx; (void)records; (void)len; } static bool amduatd_test_pull_get_artifact(void *ctx, amduat_reference_t ref, int *out_status, amduat_octets_t *out_bytes, char **out_body) { amduatd_test_pull_transport_t *t = (amduatd_test_pull_transport_t *)ctx; size_t i; if (out_status == NULL || out_bytes == NULL) { return false; } *out_status = 404; *out_bytes = amduat_octets(NULL, 0u); if (out_body != NULL) { *out_body = NULL; } if (t == NULL) { return false; } for (i = 0; i < t->round_count; ++i) { size_t j; for (j = 0; j < t->rounds[i].record_count; ++j) { if (!amduat_reference_eq(ref, t->rounds[i].records[j].id.ref)) { continue; } if (!amduat_octets_clone(t->rounds[i].artifact_bytes[j], out_bytes)) { return false; } *out_status = 200; return true; } } return true; } static void amduatd_test_pull_round_free(amduatd_test_pull_round_t *round) { size_t i; if (round == NULL) { return; } for (i = 0; i < round->record_count; ++i) { amduat_reference_free(&round->records[i].id.ref); amduat_octets_free(&round->artifact_bytes[i]); } free(round->records); free(round->artifact_bytes); round->records = NULL; round->artifact_bytes = NULL; round->record_count = 0u; } static bool amduatd_test_pull_round_init(amduat_asl_store_t *store, amduatd_test_pull_round_t *round, const char **payloads, size_t payloads_len, uint64_t base_logseq) { size_t i; if (store == NULL || round == NULL) { return false; } memset(round, 0, sizeof(*round)); if (payloads_len == 0u) { return true; } round->records = (amduat_fed_record_t *)calloc(payloads_len, sizeof(*round->records)); round->artifact_bytes = (amduat_octets_t *)calloc(payloads_len, sizeof(*round->artifact_bytes)); if (round->records == NULL || round->artifact_bytes == NULL) { amduatd_test_pull_round_free(round); return false; } for (i = 0; i < payloads_len; ++i) { if (!amduatd_test_make_record(store, payloads[i], base_logseq + i, &round->records[i], &round->artifact_bytes[i])) { amduatd_test_pull_round_free(round); return false; } } round->record_count = payloads_len; return true; } static int amduatd_test_pull_until_zero(void) { char *root = amduatd_test_make_temp_dir(); amduat_asl_store_fs_config_t cfg; amduatd_store_ctx_t store_ctx; amduat_asl_store_t store; amduat_asl_pointer_store_t pointer_store; amduatd_space_t space; amduatd_fed_cfg_t fed_cfg; amduatd_test_pull_transport_t stub; amduatd_fed_pull_transport_t transport; amduatd_fed_until_report_t report; amduatd_fed_pull_apply_status_t status; if (root == NULL) { return 1; } memset(&cfg, 0, sizeof(cfg)); if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) { fprintf(stderr, "failed to init store root\n"); free(root); return 1; } memset(&store_ctx, 0, sizeof(store_ctx)); memset(&store, 0, sizeof(store)); if (!amduatd_store_init(&store, &cfg, &store_ctx, root, AMDUATD_STORE_BACKEND_INDEX)) { fprintf(stderr, "failed to init store\n"); free(root); return 1; } if (!amduat_asl_pointer_store_init(&pointer_store, root)) { fprintf(stderr, "failed to init pointer store\n"); free(root); return 1; } if (!amduatd_space_init(&space, "demo", false)) { fprintf(stderr, "failed to init space\n"); free(root); return 1; } amduatd_fed_cfg_init(&fed_cfg); fed_cfg.enabled = true; memset(&stub, 0, sizeof(stub)); memset(&transport, 0, sizeof(transport)); transport.ctx = &stub; transport.get_records = amduatd_test_pull_get_records; transport.free_records = amduatd_test_pull_free_records; transport.get_artifact = amduatd_test_pull_get_artifact; status = amduatd_fed_pull_until(&store, &pointer_store, &space, "1", NULL, 16u, 3u, &fed_cfg, &transport, &report); expect(status == AMDUATD_FED_PULL_APPLY_OK, "pull until zero ok"); expect(report.caught_up, "pull until caught up"); expect(report.rounds_executed == 1u, "pull until rounds executed"); expect(report.total_records == 0u, "pull until records"); expect(report.total_artifacts == 0u, "pull until artifacts"); amduatd_fed_until_report_free(&report); free(root); return failures == 0 ? 0 : 1; } static int amduatd_test_pull_until_multi(void) { char *root = amduatd_test_make_temp_dir(); amduat_asl_store_fs_config_t cfg; amduatd_store_ctx_t store_ctx; amduat_asl_store_t store; amduat_asl_pointer_store_t pointer_store; amduatd_space_t space; amduatd_fed_cfg_t fed_cfg; amduatd_test_pull_round_t rounds[2]; amduatd_test_pull_transport_t stub; amduatd_fed_pull_transport_t transport; amduatd_fed_until_report_t report; amduatd_fed_pull_apply_status_t status; const char *round0_payloads[] = {"a", "b"}; const char *round1_payloads[] = {"c"}; if (root == NULL) { return 1; } memset(&cfg, 0, sizeof(cfg)); if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) { fprintf(stderr, "failed to init store root\n"); free(root); return 1; } memset(&store_ctx, 0, sizeof(store_ctx)); memset(&store, 0, sizeof(store)); if (!amduatd_store_init(&store, &cfg, &store_ctx, root, AMDUATD_STORE_BACKEND_INDEX)) { fprintf(stderr, "failed to init store\n"); free(root); return 1; } if (!amduat_asl_pointer_store_init(&pointer_store, root)) { fprintf(stderr, "failed to init pointer store\n"); free(root); return 1; } if (!amduatd_space_init(&space, "demo", false)) { fprintf(stderr, "failed to init space\n"); free(root); return 1; } amduatd_fed_cfg_init(&fed_cfg); fed_cfg.enabled = true; if (!amduatd_test_pull_round_init(&store, &rounds[0], round0_payloads, 2u, 0u) || !amduatd_test_pull_round_init(&store, &rounds[1], round1_payloads, 1u, 2u)) { fprintf(stderr, "failed to init rounds\n"); free(root); return 1; } memset(&stub, 0, sizeof(stub)); stub.rounds = rounds; stub.round_count = 2u; memset(&transport, 0, sizeof(transport)); transport.ctx = &stub; transport.get_records = amduatd_test_pull_get_records; transport.free_records = amduatd_test_pull_free_records; transport.get_artifact = amduatd_test_pull_get_artifact; status = amduatd_fed_pull_until(&store, &pointer_store, &space, "1", NULL, 2u, 5u, &fed_cfg, &transport, &report); expect(status == AMDUATD_FED_PULL_APPLY_OK, "pull until multi ok"); expect(report.caught_up, "pull until multi caught up"); expect(report.rounds_executed == 3u, "pull until multi rounds"); expect(report.total_records == 3u, "pull until multi records"); expect(report.total_artifacts == 3u, "pull until multi artifacts"); amduatd_fed_until_report_free(&report); amduatd_test_pull_round_free(&rounds[0]); amduatd_test_pull_round_free(&rounds[1]); free(root); return failures == 0 ? 0 : 1; } static int amduatd_test_pull_until_error(void) { char *root = amduatd_test_make_temp_dir(); amduat_asl_store_fs_config_t cfg; amduatd_store_ctx_t store_ctx; amduat_asl_store_t store; amduat_asl_pointer_store_t pointer_store; amduatd_space_t space; amduatd_fed_cfg_t fed_cfg; amduatd_test_pull_round_t rounds[1]; amduatd_test_pull_transport_t stub; amduatd_fed_pull_transport_t transport; amduatd_fed_until_report_t report; amduatd_fed_pull_apply_status_t status; const char *round0_payloads[] = {"a"}; if (root == NULL) { return 1; } memset(&cfg, 0, sizeof(cfg)); if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) { fprintf(stderr, "failed to init store root\n"); free(root); return 1; } memset(&store_ctx, 0, sizeof(store_ctx)); memset(&store, 0, sizeof(store)); if (!amduatd_store_init(&store, &cfg, &store_ctx, root, AMDUATD_STORE_BACKEND_INDEX)) { fprintf(stderr, "failed to init store\n"); free(root); return 1; } if (!amduat_asl_pointer_store_init(&pointer_store, root)) { fprintf(stderr, "failed to init pointer store\n"); free(root); return 1; } if (!amduatd_space_init(&space, "demo", false)) { fprintf(stderr, "failed to init space\n"); free(root); return 1; } amduatd_fed_cfg_init(&fed_cfg); fed_cfg.enabled = true; if (!amduatd_test_pull_round_init(&store, &rounds[0], round0_payloads, 1u, 0u)) { fprintf(stderr, "failed to init rounds\n"); free(root); return 1; } memset(&stub, 0, sizeof(stub)); stub.rounds = rounds; stub.round_count = 1u; stub.fail_round = 2u; stub.fail_status = 500; memset(&transport, 0, sizeof(transport)); transport.ctx = &stub; transport.get_records = amduatd_test_pull_get_records; transport.free_records = amduatd_test_pull_free_records; transport.get_artifact = amduatd_test_pull_get_artifact; status = amduatd_fed_pull_until(&store, &pointer_store, &space, "1", NULL, 1u, 4u, &fed_cfg, &transport, &report); expect(status == AMDUATD_FED_PULL_APPLY_ERR_REMOTE, "pull until error"); expect(report.rounds_executed == 2u, "pull until error rounds"); expect(report.total_records == 1u, "pull until error records"); amduatd_fed_until_report_free(&report); amduatd_test_pull_round_free(&rounds[0]); free(root); return failures == 0 ? 0 : 1; } int main(void) { if (amduatd_test_pull_until_zero() != 0) { return 1; } if (amduatd_test_pull_until_multi() != 0) { return 1; } if (amduatd_test_pull_until_error() != 0) { return 1; } return failures == 0 ? 0 : 1; }