amduat-api/tests/test_amduatd_fed_push_apply.c

279 lines
8.7 KiB
C
Raw Permalink Normal View History

#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include "amduatd_fed_push_apply.h"
#include "amduatd_fed_cursor.h"
#include "amduatd_space.h"
#include "amduatd_store.h"
#include "amduat/asl/asl_store_fs_meta.h"
#include "amduat/asl/artifact_io.h"
#include "amduat/asl/log_store.h"
#include "amduat/hash/asl1.h"
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/*
 * State for the stub push transport. A pointer to this struct is installed
 * as the transport's ctx and handed back to amduatd_test_push_post_ingest().
 */
typedef struct {
/* Number of times the stub post_ingest callback has been invoked so far. */
size_t call_count;
/* 1-based call number on which the stub answers "already_present" with
 * applied=false; 0 means the stub never does and always answers "ok". */
size_t already_present_at;
} amduatd_test_push_transport_t;
/* Count of expectations that did not hold; main() derives the process exit
 * status from it. */
static int failures = 0;

/*
 * Soft assertion: when `cond` is false, bumps the failure counter and prints
 * "FAIL: <msg>" to stderr. Execution always continues so that one run can
 * report several failed expectations.
 */
static void expect(bool cond, const char *msg) {
  if (cond) {
    return;
  }
  failures += 1;
  fprintf(stderr, "FAIL: %s\n", msg);
}
/*
 * Creates a uniquely named directory from the template
 * "/tmp/amduatd-fed-push-XXXXXX" with mkdtemp() and returns its path as a
 * heap-allocated string.
 *
 * Returns NULL, after writing a diagnostic to stderr, when the directory
 * cannot be created or the path copy cannot be allocated. On success the
 * caller owns the returned string and releases it with free().
 */
static char *amduatd_test_make_temp_dir(void) {
  char path_template[] = "/tmp/amduatd-fed-push-XXXXXX";
  char *result;
  size_t size;

  /* mkdtemp() fills in the template in place. The buffer lives in this
   * stack frame, so the final path must be copied out before returning. */
  if (mkdtemp(path_template) == NULL) {
    perror("mkdtemp");
    return NULL;
  }

  size = strlen(path_template) + 1u; /* include the terminating NUL */
  result = malloc(size);
  if (result == NULL) {
    fprintf(stderr, "failed to allocate temp dir copy\n");
    return NULL;
  }
  memcpy(result, path_template, size);
  return result;
}
/*
 * Stores the bytes of the NUL-terminated string `payload` (terminator
 * excluded) as an artifact in `store` and writes the resulting reference to
 * `out_ref`.
 *
 * Returns false when any argument is NULL, when the payload cannot be cloned
 * or wrapped in an artifact, or when the store does not report
 * AMDUAT_ASL_STORE_OK; returns true otherwise.
 */
static bool amduatd_test_store_artifact(amduat_asl_store_t *store,
                                        const char *payload,
                                        amduat_reference_t *out_ref) {
  amduat_octets_t cloned = amduat_octets(NULL, 0u);
  amduat_artifact_t artifact;
  amduat_asl_index_state_t index_state; /* filled by the store; not inspected */
  amduat_asl_store_error_t put_result;
  bool built;

  if (store == NULL) {
    return false;
  }
  if (payload == NULL) {
    return false;
  }
  if (out_ref == NULL) {
    return false;
  }

  if (!amduat_octets_clone(amduat_octets(payload, strlen(payload)), &cloned)) {
    return false;
  }

  /* NOTE(review): the `false` / amduat_type_tag(0u) arguments presumably
   * mean "no type tag" -- confirm against artifact_io.h. */
  built = amduat_asl_artifact_from_bytes(cloned,
                                         AMDUAT_ASL_IO_RAW,
                                         false,
                                         amduat_type_tag(0u),
                                         &artifact);
  if (!built) {
    /* The clone is still ours to release when no artifact was produced. */
    amduat_octets_free(&cloned);
    return false;
  }

  /* From here on only the artifact is freed; presumably it has taken
   * ownership of the cloned bytes -- confirm. */
  put_result = amduat_asl_store_put_indexed(store, artifact, out_ref,
                                            &index_state);
  amduat_asl_artifact_free(&artifact);
  return put_result == AMDUAT_ASL_STORE_OK;
}
/*
 * Appends one entry of kind AMDUATD_FED_LOG_KIND_ARTIFACT, carrying `ref` as
 * its payload reference and no timestamp or actor, to the log whose name is
 * "fed/records" scoped to `space`.
 *
 * Returns true only when the log store initialises, the scoped name can be
 * built, and the append reports AMDUAT_ASL_STORE_OK.
 */
static bool amduatd_test_append_fed_log(amduat_asl_store_t *store,
                                        amduat_asl_pointer_store_t *pointer_store,
                                        const amduatd_space_t *space,
                                        const char *root_path,
                                        amduat_reference_t ref) {
  amduat_asl_log_store_t logs;
  amduat_asl_log_entry_t record;
  amduat_octets_t scoped_name = amduat_octets(NULL, 0u);
  uint64_t appended_offset = 0u; /* output of the append call; not inspected */
  amduat_asl_store_error_t append_result;
  bool ready;

  /* Short-circuit keeps the original order: the name is only resolved once
   * the log store has been initialised. */
  ready = amduat_asl_log_store_init(&logs, root_path, store, pointer_store) &&
          amduatd_space_scope_name(space, "fed/records", &scoped_name);
  if (!ready) {
    return false;
  }

  memset(&record, 0, sizeof(record));
  record.kind = AMDUATD_FED_LOG_KIND_ARTIFACT;
  record.payload_ref = ref;
  record.has_timestamp = false;
  record.timestamp = 0u;
  record.has_actor = false;
  record.actor = amduat_octets(NULL, 0u);

  /* The literal 1u is presumably the number of entries passed -- confirm
   * against log_store.h. */
  append_result = amduat_asl_log_append(&logs,
                                        (const char *)scoped_name.data,
                                        &record,
                                        1u,
                                        &appended_offset);
  amduat_octets_free(&scoped_name);
  return append_result == AMDUAT_ASL_STORE_OK;
}
/*
 * Stub post_ingest transport callback. It ignores the record it is given and
 * fabricates an HTTP-200 JSON response body instead of contacting a peer.
 *
 * `ctx` must point to an amduatd_test_push_transport_t. Every call increments
 * its call_count; on the call whose number equals already_present_at (when
 * that field is non-zero) the body reports status "already_present" with
 * applied=false, on every other call status "ok" with applied=true.
 *
 * On success *out_status is 200 and *out_body is a strdup()'d string.
 * Returns false when a required pointer is NULL, the body does not fit the
 * local buffer, or the duplication fails.
 */
static bool amduatd_test_push_post_ingest(void *ctx,
                                          amduat_fed_record_type_t record_type,
                                          amduat_reference_t ref,
                                          amduat_octets_t bytes,
                                          int *out_status,
                                          char **out_body) {
  amduatd_test_push_transport_t *stub = ctx;
  bool report_present;
  char json[256];
  int written;

  (void)record_type;
  (void)ref;
  (void)bytes;

  if (stub == NULL || out_status == NULL || out_body == NULL) {
    return false;
  }

  stub->call_count++;
  report_present = stub->already_present_at != 0u &&
                   stub->call_count == stub->already_present_at;

  written = snprintf(json,
                     sizeof(json),
                     "{\"status\":\"%s\",\"applied\":%s,"
                     "\"ref\":null,\"effective_space\":{"
                     "\"mode\":\"unscoped\",\"space_id\":null}}",
                     report_present ? "already_present" : "ok",
                     report_present ? "false" : "true");
  /* Reject encoding errors, empty output and truncation alike. */
  if (written <= 0 || (size_t)written >= sizeof(json)) {
    return false;
  }

  *out_status = 200;
  *out_body = strdup(json);
  return *out_body != NULL;
}
/*
 * Test driver for amduatd_fed_push_apply().
 *
 * Setup: creates a temp directory, initialises a store (backend
 * AMDUATD_STORE_BACKEND_INDEX), a pointer store and the space "demo" in it,
 * stores the artifacts "alpha" and "beta", and appends one federation log
 * entry for each.
 *
 * Exercise: runs push-apply through a stub transport whose first response is
 * "already_present" and whose later responses are "ok".
 *
 * Verify: two records sent, one counted as ok and one as already present,
 * cursor advanced to logseq 1, and the same logseq readable back through
 * amduatd_fed_push_cursor_get().
 *
 * Returns 0 when every expectation held; 1 when setup failed or at least one
 * expectation failed. The temp directory itself is not removed.
 */
int main(void) {
  char *root = amduatd_test_make_temp_dir();
  amduat_asl_store_fs_config_t cfg;
  amduatd_store_ctx_t store_ctx;
  amduat_asl_store_t store;
  amduat_asl_pointer_store_t pointer_store;
  amduatd_space_t space;
  amduatd_fed_cfg_t fed_cfg;
  amduat_reference_t ref0;
  amduat_reference_t ref1;
  amduatd_fed_push_transport_t transport;
  amduatd_test_push_transport_t stub;
  amduatd_fed_push_apply_report_t report;
  amduatd_fed_push_apply_status_t status;
  amduatd_fed_cursor_record_t cursor;
  amduat_reference_t cursor_ref;
  int rc = 1; /* pessimistic default: every early exit is a failure */

  if (root == NULL) {
    return 1;
  }

  /* Zero both references up front so the cleanup path may free them no
   * matter which of the two stores succeeded. This is the same
   * zero-then-free pattern this function already relies on for cursor_ref. */
  memset(&ref0, 0, sizeof(ref0));
  memset(&ref1, 0, sizeof(ref1));
  /* Give the report a determinate value in case push-apply returns before
   * filling it; the expectations below read it unconditionally.
   * NOTE(review): assumes amduatd_fed_push_apply_report_free() accepts a
   * zeroed report -- confirm. */
  memset(&report, 0, sizeof(report));

  memset(&cfg, 0, sizeof(cfg));
  if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) {
    fprintf(stderr, "failed to init store root\n");
    goto cleanup_root;
  }
  memset(&store_ctx, 0, sizeof(store_ctx));
  memset(&store, 0, sizeof(store));
  if (!amduatd_store_init(&store,
                          &cfg,
                          &store_ctx,
                          root,
                          AMDUATD_STORE_BACKEND_INDEX)) {
    fprintf(stderr, "failed to init store\n");
    goto cleanup_root;
  }
  if (!amduat_asl_pointer_store_init(&pointer_store, root)) {
    fprintf(stderr, "failed to init pointer store\n");
    goto cleanup_root;
  }
  if (!amduatd_space_init(&space, "demo", false)) {
    fprintf(stderr, "failed to init space\n");
    goto cleanup_root;
  }

  if (!amduatd_test_store_artifact(&store, "alpha", &ref0) ||
      !amduatd_test_store_artifact(&store, "beta", &ref1)) {
    fprintf(stderr, "failed to store artifacts\n");
    /* ref0 may already hold a live reference if only "beta" failed. */
    goto cleanup_refs;
  }
  if (!amduatd_test_append_fed_log(&store,
                                   &pointer_store,
                                   &space,
                                   root,
                                   ref0) ||
      !amduatd_test_append_fed_log(&store,
                                   &pointer_store,
                                   &space,
                                   root,
                                   ref1)) {
    fprintf(stderr, "failed to append fed log\n");
    goto cleanup_refs;
  }

  amduatd_fed_cfg_init(&fed_cfg);
  fed_cfg.enabled = true;

  /* Stub transport: the first post_ingest call answers "already_present",
   * every later call answers "ok". */
  memset(&stub, 0, sizeof(stub));
  stub.already_present_at = 1u;
  memset(&transport, 0, sizeof(transport));
  transport.ctx = &stub;
  transport.post_ingest = amduatd_test_push_post_ingest;

  /* NOTE(review): "2" is presumably the remote peer identifier and 16u a
   * record limit -- confirm against amduatd_fed_push_apply.h. */
  status = amduatd_fed_push_apply(&store,
                                  &pointer_store,
                                  &space,
                                  "2",
                                  NULL,
                                  16u,
                                  root,
                                  &fed_cfg,
                                  &transport,
                                  &report);
  expect(status == AMDUATD_FED_PUSH_APPLY_OK, "push apply ok");
  expect(report.sent_record_count == 2u, "sent record count");
  expect(report.peer_ok_count == 1u, "peer ok count");
  expect(report.peer_already_present_count == 1u, "peer already present");
  expect(report.cursor_advanced, "cursor advanced");
  expect(report.cursor_after_has_logseq &&
             report.cursor_after_logseq == 1u,
         "cursor after logseq");

  /* Read the cursor back to check that it was stored, not merely reported. */
  amduatd_fed_cursor_record_init(&cursor);
  memset(&cursor_ref, 0, sizeof(cursor_ref));
  {
    amduatd_fed_cursor_status_t st;
    st = amduatd_fed_push_cursor_get(&store,
                                     &pointer_store,
                                     &space,
                                     "2",
                                     &cursor,
                                     &cursor_ref);
    expect(st == AMDUATD_FED_CURSOR_OK, "push cursor stored");
    expect(cursor.has_logseq && cursor.last_logseq == 1u,
           "push cursor logseq");
  }

  amduatd_fed_cursor_record_free(&cursor);
  amduat_reference_free(&cursor_ref);
  amduatd_fed_push_apply_report_free(&report);
  rc = failures == 0 ? 0 : 1;

cleanup_refs:
  amduat_reference_free(&ref0);
  amduat_reference_free(&ref1);
cleanup_root:
  free(root);
  return rc;
}