#ifndef _POSIX_C_SOURCE #define _POSIX_C_SOURCE 200809L #endif #include "amduatd_fed_pull_apply.h" #include "amduatd_fed_cursor.h" #include "amduatd_store.h" #include "amduat/asl/artifact_io.h" #include "amduat/asl/asl_store_fs_meta.h" #include "amduat/asl/ref_derive.h" #include "amduat/hash/asl1.h" #include #include #include #include typedef struct { amduat_fed_record_t *records; size_t record_count; amduat_octets_t *artifact_bytes; bool fail_artifact; size_t fail_index; int fail_status; bool mutate_cursor; amduat_asl_store_t *store; amduat_asl_pointer_store_t *pointer_store; const amduatd_space_t *space; const char *peer_key; } amduatd_test_pull_transport_t; static int failures = 0; static void expect(bool cond, const char *msg) { if (!cond) { fprintf(stderr, "FAIL: %s\n", msg); failures++; } } static char *amduatd_test_make_temp_dir(void) { char tmpl[] = "/tmp/amduatd-fed-pull-XXXXXX"; char *dir = mkdtemp(tmpl); size_t len; char *copy; if (dir == NULL) { perror("mkdtemp"); return NULL; } len = strlen(dir); copy = (char *)malloc(len + 1u); if (copy == NULL) { fprintf(stderr, "failed to allocate temp dir copy\n"); return NULL; } memcpy(copy, dir, len + 1u); return copy; } static bool amduatd_test_clone_record(const amduat_fed_record_t *src, amduat_fed_record_t *dst) { if (src == NULL || dst == NULL) { return false; } *dst = *src; if (!amduat_reference_clone(src->id.ref, &dst->id.ref)) { return false; } return true; } static bool amduatd_test_pull_get_records(void *ctx, uint32_t domain_id, uint64_t from_logseq, uint64_t limit, int *out_status, amduat_fed_record_t **out_records, size_t *out_len, char **out_body) { amduatd_test_pull_transport_t *t = (amduatd_test_pull_transport_t *)ctx; amduat_fed_record_t *records = NULL; size_t i; (void)domain_id; (void)from_logseq; (void)limit; if (out_status == NULL || out_records == NULL || out_len == NULL) { return false; } *out_status = 200; *out_records = NULL; *out_len = 0; if (out_body != NULL) { *out_body = NULL; } if (t == NULL || t->record_count == 0u) { return true; } records = (amduat_fed_record_t *)calloc(t->record_count, sizeof(*records)); if (records == NULL) { return false; } for (i = 0; i < t->record_count; ++i) { if (!amduatd_test_clone_record(&t->records[i], &records[i])) { free(records); return false; } } *out_records = records; *out_len = t->record_count; if (t->mutate_cursor && t->store != NULL && t->pointer_store != NULL && t->space != NULL && t->peer_key != NULL) { amduatd_fed_cursor_record_t cursor; amduat_reference_t cursor_ref; amduatd_fed_cursor_record_t current; amduat_reference_t current_ref; const amduat_reference_t *expected_ref = NULL; amduatd_fed_cursor_record_init(&cursor); cursor.peer_key = strdup(t->peer_key); cursor.space_id = strdup((const char *)t->space->space_id.data); cursor.has_logseq = true; cursor.last_logseq = 99u; cursor.has_record_ref = true; if (cursor.peer_key != NULL && cursor.space_id != NULL && amduat_reference_clone(t->records[0].id.ref, &cursor.last_record_ref)) { amduatd_fed_cursor_record_init(¤t); memset(¤t_ref, 0, sizeof(current_ref)); if (amduatd_fed_cursor_get(t->store, t->pointer_store, t->space, t->peer_key, ¤t, ¤t_ref) == AMDUATD_FED_CURSOR_OK) { expected_ref = ¤t_ref; } (void)amduatd_fed_cursor_cas_set(t->store, t->pointer_store, t->space, t->peer_key, expected_ref, &cursor, &cursor_ref); if (expected_ref != NULL) { amduat_reference_free(¤t_ref); } amduatd_fed_cursor_record_free(¤t); amduat_reference_free(&cursor_ref); } amduatd_fed_cursor_record_free(&cursor); } return true; } static void amduatd_test_pull_free_records(void *ctx, amduat_fed_record_t *records, size_t len) { size_t i; (void)ctx; if (records == NULL) { return; } for (i = 0; i < len; ++i) { amduat_reference_free(&records[i].id.ref); } free(records); } static bool amduatd_test_pull_get_artifact(void *ctx, amduat_reference_t ref, int *out_status, amduat_octets_t *out_bytes, char **out_body) { amduatd_test_pull_transport_t *t = (amduatd_test_pull_transport_t *)ctx; size_t i; if (out_status == NULL || out_bytes == NULL) { return false; } *out_status = 404; *out_bytes = amduat_octets(NULL, 0u); if (out_body != NULL) { *out_body = NULL; } if (t == NULL) { return false; } for (i = 0; i < t->record_count; ++i) { if (!amduat_reference_eq(ref, t->records[i].id.ref)) { continue; } if (t->fail_artifact && i == t->fail_index) { *out_status = t->fail_status; return true; } if (!amduat_octets_clone(t->artifact_bytes[i], out_bytes)) { return false; } *out_status = 200; return true; } return true; } static bool amduatd_test_make_record(amduat_asl_store_t *store, const char *payload, uint64_t logseq, uint32_t domain_id, amduat_fed_record_t *out_record, amduat_octets_t *out_bytes) { amduat_artifact_t artifact; amduat_reference_t ref; amduat_octets_t artifact_bytes = amduat_octets(NULL, 0u); amduat_octets_t payload_bytes = amduat_octets(NULL, 0u); if (store == NULL || payload == NULL || out_record == NULL || out_bytes == NULL) { return false; } if (!amduat_octets_clone(amduat_octets(payload, strlen(payload)), &payload_bytes)) { return false; } if (!amduat_asl_artifact_from_bytes(payload_bytes, AMDUAT_ASL_IO_RAW, false, amduat_type_tag(0u), &artifact)) { amduat_octets_free(&payload_bytes); return false; } if (!amduat_asl_ref_derive(artifact, store->config.encoding_profile_id, store->config.hash_id, &ref, &artifact_bytes)) { amduat_asl_artifact_free(&artifact); return false; } amduat_asl_artifact_free(&artifact); amduat_octets_free(&artifact_bytes); memset(out_record, 0, sizeof(*out_record)); out_record->id.type = AMDUAT_FED_REC_ARTIFACT; out_record->id.ref = ref; out_record->logseq = logseq; out_record->snapshot_id = 0u; out_record->log_prefix = 0u; out_record->meta.domain_id = domain_id; out_record->meta.visibility = 1u; out_record->meta.has_source = 0u; out_record->meta.source_domain = 0u; if (!amduat_octets_clone(amduat_octets(payload, strlen(payload)), out_bytes)) { amduat_reference_free(&ref); return false; } return true; } static void amduatd_test_free_transport(amduatd_test_pull_transport_t *t) { size_t i; if (t == NULL) { return; } if (t->records != NULL) { for (i = 0; i < t->record_count; ++i) { amduat_reference_free(&t->records[i].id.ref); } free(t->records); } if (t->artifact_bytes != NULL) { for (i = 0; i < t->record_count; ++i) { amduat_octets_free(&t->artifact_bytes[i]); } free(t->artifact_bytes); } memset(t, 0, sizeof(*t)); } int main(void) { char *root = amduatd_test_make_temp_dir(); amduat_asl_store_fs_config_t cfg; amduatd_store_ctx_t store_ctx; amduat_asl_store_t store; amduat_asl_pointer_store_t pointer_store; amduatd_space_t space; amduatd_fed_cfg_t fed_cfg; if (root == NULL) { return 1; } memset(&cfg, 0, sizeof(cfg)); if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) { fprintf(stderr, "failed to init store root\n"); free(root); return 1; } memset(&store_ctx, 0, sizeof(store_ctx)); memset(&store, 0, sizeof(store)); if (!amduatd_store_init(&store, &cfg, &store_ctx, root, AMDUATD_STORE_BACKEND_INDEX)) { fprintf(stderr, "failed to init store\n"); free(root); return 1; } if (!amduat_asl_pointer_store_init(&pointer_store, root)) { fprintf(stderr, "failed to init pointer store\n"); free(root); return 1; } if (!amduatd_space_init(&space, "alpha", false)) { fprintf(stderr, "failed to init space\n"); free(root); return 1; } amduatd_fed_cfg_init(&fed_cfg); fed_cfg.enabled = true; { amduatd_test_pull_transport_t t; amduatd_fed_pull_transport_t transport; amduat_fed_record_t *records = NULL; amduat_octets_t *bytes = NULL; amduatd_fed_pull_apply_report_t report; amduatd_fed_pull_apply_status_t rc; amduat_reference_t cursor_ref; amduatd_fed_cursor_record_t cursor; size_t i; memset(&t, 0, sizeof(t)); records = (amduat_fed_record_t *)calloc(2u, sizeof(*records)); bytes = (amduat_octets_t *)calloc(2u, sizeof(*bytes)); if (records == NULL || bytes == NULL) { fprintf(stderr, "failed to alloc records\n"); free(root); return 1; } if (!amduatd_test_make_record(&store, "hello", 1u, 1u, &records[0], &bytes[0]) || !amduatd_test_make_record(&store, "world", 2u, 1u, &records[1], &bytes[1])) { fprintf(stderr, "failed to make records\n"); free(root); return 1; } t.records = records; t.record_count = 2u; t.artifact_bytes = bytes; memset(&transport, 0, sizeof(transport)); transport.ctx = &t; transport.get_records = amduatd_test_pull_get_records; transport.free_records = amduatd_test_pull_free_records; transport.get_artifact = amduatd_test_pull_get_artifact; amduatd_fed_pull_apply_report_init(&report); rc = amduatd_fed_pull_apply(&store, &pointer_store, &space, "1", NULL, 2u, &fed_cfg, &transport, &report); expect(rc == AMDUATD_FED_PULL_APPLY_OK, "apply success"); expect(report.applied_record_count == 2u, "applied record count"); expect(report.cursor_advanced, "cursor advanced"); for (i = 0; i < 2u; ++i) { amduat_artifact_t artifact; memset(&artifact, 0, sizeof(artifact)); expect(amduat_asl_store_get(&store, records[i].id.ref, &artifact) == AMDUAT_ASL_STORE_OK, "artifact stored"); amduat_asl_artifact_free(&artifact); } amduatd_fed_cursor_record_init(&cursor); memset(&cursor_ref, 0, sizeof(cursor_ref)); expect(amduatd_fed_cursor_get(&store, &pointer_store, &space, "1", &cursor, &cursor_ref) == AMDUATD_FED_CURSOR_OK, "cursor get after apply"); expect(cursor.has_logseq && cursor.last_logseq == 2u, "cursor advanced logseq"); amduat_reference_free(&cursor_ref); amduatd_fed_cursor_record_free(&cursor); amduatd_fed_pull_apply_report_free(&report); amduatd_test_free_transport(&t); } { amduatd_test_pull_transport_t t; amduatd_fed_pull_transport_t transport; amduat_fed_record_t *records = NULL; amduat_octets_t *bytes = NULL; amduatd_fed_pull_apply_report_t report; amduatd_fed_pull_apply_status_t rc; amduatd_fed_cursor_record_t cursor; amduat_reference_t cursor_ref; uint64_t before_logseq = 0u; bool before_has_logseq = false; memset(&t, 0, sizeof(t)); records = (amduat_fed_record_t *)calloc(2u, sizeof(*records)); bytes = (amduat_octets_t *)calloc(2u, sizeof(*bytes)); if (records == NULL || bytes == NULL) { fprintf(stderr, "failed to alloc records\n"); free(root); return 1; } if (!amduatd_test_make_record(&store, "alpha", 3u, 1u, &records[0], &bytes[0]) || !amduatd_test_make_record(&store, "beta", 4u, 1u, &records[1], &bytes[1])) { fprintf(stderr, "failed to make records\n"); free(root); return 1; } t.records = records; t.record_count = 2u; t.artifact_bytes = bytes; t.fail_artifact = true; t.fail_index = 1u; t.fail_status = 503; memset(&transport, 0, sizeof(transport)); transport.ctx = &t; transport.get_records = amduatd_test_pull_get_records; transport.free_records = amduatd_test_pull_free_records; transport.get_artifact = amduatd_test_pull_get_artifact; amduatd_fed_cursor_record_init(&cursor); memset(&cursor_ref, 0, sizeof(cursor_ref)); expect(amduatd_fed_cursor_get(&store, &pointer_store, &space, "1", &cursor, &cursor_ref) == AMDUATD_FED_CURSOR_OK, "cursor present before partial"); before_has_logseq = cursor.has_logseq; before_logseq = cursor.last_logseq; amduat_reference_free(&cursor_ref); amduatd_fed_cursor_record_free(&cursor); amduatd_fed_pull_apply_report_init(&report); rc = amduatd_fed_pull_apply(&store, &pointer_store, &space, "1", NULL, 2u, &fed_cfg, &transport, &report); expect(rc == AMDUATD_FED_PULL_APPLY_ERR_STORE, "apply partial failure"); expect(report.applied_record_count == 1u, "partial applied count"); amduatd_fed_cursor_record_init(&cursor); memset(&cursor_ref, 0, sizeof(cursor_ref)); expect(amduatd_fed_cursor_get(&store, &pointer_store, &space, "1", &cursor, &cursor_ref) == AMDUATD_FED_CURSOR_OK, "cursor present (from previous test)"); expect(cursor.has_logseq == before_has_logseq, "cursor unchanged flag"); expect(cursor.last_logseq == before_logseq, "cursor unchanged logseq"); amduat_reference_free(&cursor_ref); amduatd_fed_cursor_record_free(&cursor); amduatd_fed_pull_apply_report_free(&report); amduatd_test_free_transport(&t); } { amduatd_test_pull_transport_t t; amduatd_fed_pull_transport_t transport; amduat_fed_record_t *records = NULL; amduat_octets_t *bytes = NULL; amduatd_fed_pull_apply_report_t report; amduatd_fed_pull_apply_status_t rc; amduatd_fed_cursor_record_t cursor; amduat_reference_t cursor_ref; memset(&t, 0, sizeof(t)); records = (amduat_fed_record_t *)calloc(1u, sizeof(*records)); bytes = (amduat_octets_t *)calloc(1u, sizeof(*bytes)); if (records == NULL || bytes == NULL) { fprintf(stderr, "failed to alloc records\n"); free(root); return 1; } if (!amduatd_test_make_record(&store, "gamma", 5u, 1u, &records[0], &bytes[0])) { fprintf(stderr, "failed to make record\n"); free(root); return 1; } t.records = records; t.record_count = 1u; t.artifact_bytes = bytes; t.mutate_cursor = true; t.store = &store; t.pointer_store = &pointer_store; t.space = &space; t.peer_key = "1"; memset(&transport, 0, sizeof(transport)); transport.ctx = &t; transport.get_records = amduatd_test_pull_get_records; transport.free_records = amduatd_test_pull_free_records; transport.get_artifact = amduatd_test_pull_get_artifact; amduatd_fed_pull_apply_report_init(&report); rc = amduatd_fed_pull_apply(&store, &pointer_store, &space, "1", NULL, 1u, &fed_cfg, &transport, &report); expect(rc == AMDUATD_FED_PULL_APPLY_ERR_CONFLICT, "cursor conflict"); amduatd_fed_cursor_record_init(&cursor); memset(&cursor_ref, 0, sizeof(cursor_ref)); expect(amduatd_fed_cursor_get(&store, &pointer_store, &space, "1", &cursor, &cursor_ref) == AMDUATD_FED_CURSOR_OK, "cursor present after conflict"); expect(cursor.has_logseq && cursor.last_logseq == 99u, "cursor unchanged on conflict"); amduat_reference_free(&cursor_ref); amduatd_fed_cursor_record_free(&cursor); amduatd_fed_pull_apply_report_free(&report); amduatd_test_free_transport(&t); } free(root); return failures == 0 ? 0 : 1; }