amduat-api/tests/test_amduatd_fed_pull_apply.c

582 lines
19 KiB
C
Raw Normal View History

#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include "amduatd_fed_pull_apply.h"
#include "amduatd_fed_cursor.h"
#include "amduatd_store.h"
#include "amduat/asl/artifact_io.h"
#include "amduat/asl/asl_store_fs_meta.h"
#include "amduat/asl/ref_derive.h"
#include "amduat/hash/asl1.h"
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct {
amduat_fed_record_t *records;
size_t record_count;
amduat_octets_t *artifact_bytes;
bool fail_artifact;
size_t fail_index;
int fail_status;
bool mutate_cursor;
amduat_asl_store_t *store;
amduat_asl_pointer_store_t *pointer_store;
const amduatd_space_t *space;
const char *peer_key;
} amduatd_test_pull_transport_t;
static int failures = 0;
static void expect(bool cond, const char *msg) {
if (!cond) {
fprintf(stderr, "FAIL: %s\n", msg);
failures++;
}
}
static char *amduatd_test_make_temp_dir(void) {
char tmpl[] = "/tmp/amduatd-fed-pull-XXXXXX";
char *dir = mkdtemp(tmpl);
size_t len;
char *copy;
if (dir == NULL) {
perror("mkdtemp");
return NULL;
}
len = strlen(dir);
copy = (char *)malloc(len + 1u);
if (copy == NULL) {
fprintf(stderr, "failed to allocate temp dir copy\n");
return NULL;
}
memcpy(copy, dir, len + 1u);
return copy;
}
static bool amduatd_test_clone_record(const amduat_fed_record_t *src,
amduat_fed_record_t *dst) {
if (src == NULL || dst == NULL) {
return false;
}
*dst = *src;
if (!amduat_reference_clone(src->id.ref, &dst->id.ref)) {
return false;
}
return true;
}
static bool amduatd_test_pull_get_records(void *ctx,
uint32_t domain_id,
uint64_t from_logseq,
uint64_t limit,
int *out_status,
amduat_fed_record_t **out_records,
size_t *out_len,
char **out_body) {
amduatd_test_pull_transport_t *t = (amduatd_test_pull_transport_t *)ctx;
amduat_fed_record_t *records = NULL;
size_t i;
(void)domain_id;
(void)from_logseq;
(void)limit;
if (out_status == NULL || out_records == NULL || out_len == NULL) {
return false;
}
*out_status = 200;
*out_records = NULL;
*out_len = 0;
if (out_body != NULL) {
*out_body = NULL;
}
if (t == NULL || t->record_count == 0u) {
return true;
}
records = (amduat_fed_record_t *)calloc(t->record_count, sizeof(*records));
if (records == NULL) {
return false;
}
for (i = 0; i < t->record_count; ++i) {
if (!amduatd_test_clone_record(&t->records[i], &records[i])) {
free(records);
return false;
}
}
*out_records = records;
*out_len = t->record_count;
if (t->mutate_cursor && t->store != NULL && t->pointer_store != NULL &&
t->space != NULL && t->peer_key != NULL) {
amduatd_fed_cursor_record_t cursor;
amduat_reference_t cursor_ref;
amduatd_fed_cursor_record_t current;
amduat_reference_t current_ref;
const amduat_reference_t *expected_ref = NULL;
amduatd_fed_cursor_record_init(&cursor);
cursor.peer_key = strdup(t->peer_key);
cursor.space_id = strdup((const char *)t->space->space_id.data);
cursor.has_logseq = true;
cursor.last_logseq = 99u;
cursor.has_record_ref = true;
if (cursor.peer_key != NULL && cursor.space_id != NULL &&
amduat_reference_clone(t->records[0].id.ref,
&cursor.last_record_ref)) {
amduatd_fed_cursor_record_init(&current);
memset(&current_ref, 0, sizeof(current_ref));
if (amduatd_fed_cursor_get(t->store,
t->pointer_store,
t->space,
t->peer_key,
&current,
&current_ref) == AMDUATD_FED_CURSOR_OK) {
expected_ref = &current_ref;
}
(void)amduatd_fed_cursor_cas_set(t->store,
t->pointer_store,
t->space,
t->peer_key,
expected_ref,
&cursor,
&cursor_ref);
if (expected_ref != NULL) {
amduat_reference_free(&current_ref);
}
amduatd_fed_cursor_record_free(&current);
amduat_reference_free(&cursor_ref);
}
amduatd_fed_cursor_record_free(&cursor);
}
return true;
}
static void amduatd_test_pull_free_records(void *ctx,
amduat_fed_record_t *records,
size_t len) {
size_t i;
(void)ctx;
if (records == NULL) {
return;
}
for (i = 0; i < len; ++i) {
amduat_reference_free(&records[i].id.ref);
}
free(records);
}
static bool amduatd_test_pull_get_artifact(void *ctx,
amduat_reference_t ref,
int *out_status,
amduat_octets_t *out_bytes,
char **out_body) {
amduatd_test_pull_transport_t *t = (amduatd_test_pull_transport_t *)ctx;
size_t i;
if (out_status == NULL || out_bytes == NULL) {
return false;
}
*out_status = 404;
*out_bytes = amduat_octets(NULL, 0u);
if (out_body != NULL) {
*out_body = NULL;
}
if (t == NULL) {
return false;
}
for (i = 0; i < t->record_count; ++i) {
if (!amduat_reference_eq(ref, t->records[i].id.ref)) {
continue;
}
if (t->fail_artifact && i == t->fail_index) {
*out_status = t->fail_status;
return true;
}
if (!amduat_octets_clone(t->artifact_bytes[i], out_bytes)) {
return false;
}
*out_status = 200;
return true;
}
return true;
}
static bool amduatd_test_make_record(amduat_asl_store_t *store,
const char *payload,
uint64_t logseq,
uint32_t domain_id,
amduat_fed_record_t *out_record,
amduat_octets_t *out_bytes) {
amduat_artifact_t artifact;
amduat_reference_t ref;
amduat_octets_t artifact_bytes = amduat_octets(NULL, 0u);
amduat_octets_t payload_bytes = amduat_octets(NULL, 0u);
if (store == NULL || payload == NULL || out_record == NULL ||
out_bytes == NULL) {
return false;
}
if (!amduat_octets_clone(amduat_octets(payload, strlen(payload)),
&payload_bytes)) {
return false;
}
if (!amduat_asl_artifact_from_bytes(payload_bytes,
AMDUAT_ASL_IO_RAW,
false,
amduat_type_tag(0u),
&artifact)) {
amduat_octets_free(&payload_bytes);
return false;
}
if (!amduat_asl_ref_derive(artifact,
store->config.encoding_profile_id,
store->config.hash_id,
&ref,
&artifact_bytes)) {
amduat_asl_artifact_free(&artifact);
return false;
}
amduat_asl_artifact_free(&artifact);
amduat_octets_free(&artifact_bytes);
memset(out_record, 0, sizeof(*out_record));
out_record->id.type = AMDUAT_FED_REC_ARTIFACT;
out_record->id.ref = ref;
out_record->logseq = logseq;
out_record->snapshot_id = 0u;
out_record->log_prefix = 0u;
out_record->meta.domain_id = domain_id;
out_record->meta.visibility = 1u;
out_record->meta.has_source = 0u;
out_record->meta.source_domain = 0u;
if (!amduat_octets_clone(amduat_octets(payload, strlen(payload)),
out_bytes)) {
amduat_reference_free(&ref);
return false;
}
return true;
}
static void amduatd_test_free_transport(amduatd_test_pull_transport_t *t) {
size_t i;
if (t == NULL) {
return;
}
if (t->records != NULL) {
for (i = 0; i < t->record_count; ++i) {
amduat_reference_free(&t->records[i].id.ref);
}
free(t->records);
}
if (t->artifact_bytes != NULL) {
for (i = 0; i < t->record_count; ++i) {
amduat_octets_free(&t->artifact_bytes[i]);
}
free(t->artifact_bytes);
}
memset(t, 0, sizeof(*t));
}
int main(void) {
char *root = amduatd_test_make_temp_dir();
amduat_asl_store_fs_config_t cfg;
amduatd_store_ctx_t store_ctx;
amduat_asl_store_t store;
amduat_asl_pointer_store_t pointer_store;
amduatd_space_t space;
amduatd_fed_cfg_t fed_cfg;
if (root == NULL) {
return 1;
}
memset(&cfg, 0, sizeof(cfg));
if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) {
fprintf(stderr, "failed to init store root\n");
free(root);
return 1;
}
memset(&store_ctx, 0, sizeof(store_ctx));
memset(&store, 0, sizeof(store));
if (!amduatd_store_init(&store,
&cfg,
&store_ctx,
root,
AMDUATD_STORE_BACKEND_INDEX)) {
fprintf(stderr, "failed to init store\n");
free(root);
return 1;
}
if (!amduat_asl_pointer_store_init(&pointer_store, root)) {
fprintf(stderr, "failed to init pointer store\n");
free(root);
return 1;
}
if (!amduatd_space_init(&space, "alpha", false)) {
fprintf(stderr, "failed to init space\n");
free(root);
return 1;
}
amduatd_fed_cfg_init(&fed_cfg);
fed_cfg.enabled = true;
{
amduatd_test_pull_transport_t t;
amduatd_fed_pull_transport_t transport;
amduat_fed_record_t *records = NULL;
amduat_octets_t *bytes = NULL;
amduatd_fed_pull_apply_report_t report;
amduatd_fed_pull_apply_status_t rc;
amduat_reference_t cursor_ref;
amduatd_fed_cursor_record_t cursor;
size_t i;
memset(&t, 0, sizeof(t));
records = (amduat_fed_record_t *)calloc(2u, sizeof(*records));
bytes = (amduat_octets_t *)calloc(2u, sizeof(*bytes));
if (records == NULL || bytes == NULL) {
fprintf(stderr, "failed to alloc records\n");
free(root);
return 1;
}
if (!amduatd_test_make_record(&store,
"hello",
1u,
1u,
&records[0],
&bytes[0]) ||
!amduatd_test_make_record(&store,
"world",
2u,
1u,
&records[1],
&bytes[1])) {
fprintf(stderr, "failed to make records\n");
free(root);
return 1;
}
t.records = records;
t.record_count = 2u;
t.artifact_bytes = bytes;
memset(&transport, 0, sizeof(transport));
transport.ctx = &t;
transport.get_records = amduatd_test_pull_get_records;
transport.free_records = amduatd_test_pull_free_records;
transport.get_artifact = amduatd_test_pull_get_artifact;
amduatd_fed_pull_apply_report_init(&report);
rc = amduatd_fed_pull_apply(&store,
&pointer_store,
&space,
"1",
NULL,
2u,
&fed_cfg,
&transport,
&report);
expect(rc == AMDUATD_FED_PULL_APPLY_OK, "apply success");
expect(report.applied_record_count == 2u, "applied record count");
expect(report.cursor_advanced, "cursor advanced");
for (i = 0; i < 2u; ++i) {
amduat_artifact_t artifact;
memset(&artifact, 0, sizeof(artifact));
expect(amduat_asl_store_get(&store,
records[i].id.ref,
&artifact) == AMDUAT_ASL_STORE_OK,
"artifact stored");
amduat_asl_artifact_free(&artifact);
}
amduatd_fed_cursor_record_init(&cursor);
memset(&cursor_ref, 0, sizeof(cursor_ref));
expect(amduatd_fed_cursor_get(&store,
&pointer_store,
&space,
"1",
&cursor,
&cursor_ref) == AMDUATD_FED_CURSOR_OK,
"cursor get after apply");
expect(cursor.has_logseq && cursor.last_logseq == 2u,
"cursor advanced logseq");
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&cursor);
amduatd_fed_pull_apply_report_free(&report);
amduatd_test_free_transport(&t);
}
{
amduatd_test_pull_transport_t t;
amduatd_fed_pull_transport_t transport;
amduat_fed_record_t *records = NULL;
amduat_octets_t *bytes = NULL;
amduatd_fed_pull_apply_report_t report;
amduatd_fed_pull_apply_status_t rc;
amduatd_fed_cursor_record_t cursor;
amduat_reference_t cursor_ref;
uint64_t before_logseq = 0u;
bool before_has_logseq = false;
memset(&t, 0, sizeof(t));
records = (amduat_fed_record_t *)calloc(2u, sizeof(*records));
bytes = (amduat_octets_t *)calloc(2u, sizeof(*bytes));
if (records == NULL || bytes == NULL) {
fprintf(stderr, "failed to alloc records\n");
free(root);
return 1;
}
if (!amduatd_test_make_record(&store,
"alpha",
3u,
1u,
&records[0],
&bytes[0]) ||
!amduatd_test_make_record(&store,
"beta",
4u,
1u,
&records[1],
&bytes[1])) {
fprintf(stderr, "failed to make records\n");
free(root);
return 1;
}
t.records = records;
t.record_count = 2u;
t.artifact_bytes = bytes;
t.fail_artifact = true;
t.fail_index = 1u;
t.fail_status = 503;
memset(&transport, 0, sizeof(transport));
transport.ctx = &t;
transport.get_records = amduatd_test_pull_get_records;
transport.free_records = amduatd_test_pull_free_records;
transport.get_artifact = amduatd_test_pull_get_artifact;
amduatd_fed_cursor_record_init(&cursor);
memset(&cursor_ref, 0, sizeof(cursor_ref));
expect(amduatd_fed_cursor_get(&store,
&pointer_store,
&space,
"1",
&cursor,
&cursor_ref) == AMDUATD_FED_CURSOR_OK,
"cursor present before partial");
before_has_logseq = cursor.has_logseq;
before_logseq = cursor.last_logseq;
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&cursor);
amduatd_fed_pull_apply_report_init(&report);
rc = amduatd_fed_pull_apply(&store,
&pointer_store,
&space,
"1",
NULL,
2u,
&fed_cfg,
&transport,
&report);
expect(rc == AMDUATD_FED_PULL_APPLY_ERR_STORE, "apply partial failure");
expect(report.applied_record_count == 1u, "partial applied count");
amduatd_fed_cursor_record_init(&cursor);
memset(&cursor_ref, 0, sizeof(cursor_ref));
expect(amduatd_fed_cursor_get(&store,
&pointer_store,
&space,
"1",
&cursor,
&cursor_ref) == AMDUATD_FED_CURSOR_OK,
"cursor present (from previous test)");
expect(cursor.has_logseq == before_has_logseq, "cursor unchanged flag");
expect(cursor.last_logseq == before_logseq, "cursor unchanged logseq");
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&cursor);
amduatd_fed_pull_apply_report_free(&report);
amduatd_test_free_transport(&t);
}
{
amduatd_test_pull_transport_t t;
amduatd_fed_pull_transport_t transport;
amduat_fed_record_t *records = NULL;
amduat_octets_t *bytes = NULL;
amduatd_fed_pull_apply_report_t report;
amduatd_fed_pull_apply_status_t rc;
amduatd_fed_cursor_record_t cursor;
amduat_reference_t cursor_ref;
memset(&t, 0, sizeof(t));
records = (amduat_fed_record_t *)calloc(1u, sizeof(*records));
bytes = (amduat_octets_t *)calloc(1u, sizeof(*bytes));
if (records == NULL || bytes == NULL) {
fprintf(stderr, "failed to alloc records\n");
free(root);
return 1;
}
if (!amduatd_test_make_record(&store,
"gamma",
5u,
1u,
&records[0],
&bytes[0])) {
fprintf(stderr, "failed to make record\n");
free(root);
return 1;
}
t.records = records;
t.record_count = 1u;
t.artifact_bytes = bytes;
t.mutate_cursor = true;
t.store = &store;
t.pointer_store = &pointer_store;
t.space = &space;
t.peer_key = "1";
memset(&transport, 0, sizeof(transport));
transport.ctx = &t;
transport.get_records = amduatd_test_pull_get_records;
transport.free_records = amduatd_test_pull_free_records;
transport.get_artifact = amduatd_test_pull_get_artifact;
amduatd_fed_pull_apply_report_init(&report);
rc = amduatd_fed_pull_apply(&store,
&pointer_store,
&space,
"1",
NULL,
1u,
&fed_cfg,
&transport,
&report);
expect(rc == AMDUATD_FED_PULL_APPLY_ERR_CONFLICT, "cursor conflict");
amduatd_fed_cursor_record_init(&cursor);
memset(&cursor_ref, 0, sizeof(cursor_ref));
expect(amduatd_fed_cursor_get(&store,
&pointer_store,
&space,
"1",
&cursor,
&cursor_ref) == AMDUATD_FED_CURSOR_OK,
"cursor present after conflict");
expect(cursor.has_logseq && cursor.last_logseq == 99u,
"cursor unchanged on conflict");
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&cursor);
amduatd_fed_pull_apply_report_free(&report);
amduatd_test_free_transport(&t);
}
free(root);
return failures == 0 ? 0 : 1;
}