526 lines
15 KiB
C
526 lines
15 KiB
C
#include "amduat/asl/log_store.h"
|
|
#include "amduat/asl/asl_store_index_fs.h"
|
|
#include "amduat/asl/store.h"
|
|
#include "amduat/enc/asl1_core.h"
|
|
#include "amduat/hash/asl1.h"
|
|
|
|
#include <dirent.h>
|
|
#include <errno.h>
|
|
#include <inttypes.h>
|
|
#include <pthread.h>
|
|
#include <stdbool.h>
|
|
#include <stdint.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/types.h>
|
|
#include <unistd.h>
|
|
|
|
/*
 * Build "<base>/<segment>" in a freshly malloc'd, NUL-terminated buffer.
 *
 * A '/' is inserted between the two parts only when `base` does not already
 * end in one. On success the buffer is stored in *out_path and the caller
 * owns it (release with free()). On failure *out_path is left untouched.
 *
 * Returns false when any argument is NULL, when either string is empty, or
 * when the allocation fails; true otherwise.
 */
static bool join_path(const char *base, const char *segment, char **out_path) {
  if (base == NULL || segment == NULL || out_path == NULL) {
    return false;
  }

  const size_t head_len = strlen(base);
  const size_t tail_len = strlen(segment);
  if (head_len == 0u || tail_len == 0u) {
    return false;
  }

  /* One extra byte for the separator unless base already supplies it. */
  const size_t sep_len = (base[head_len - 1u] == '/') ? 0u : 1u;

  /* +1 for the terminating NUL. */
  char *joined = malloc(head_len + sep_len + tail_len + 1u);
  if (joined == NULL) {
    return false;
  }

  char *cursor = joined;
  memcpy(cursor, base, head_len);
  cursor += head_len;
  if (sep_len != 0u) {
    *cursor++ = '/';
  }
  memcpy(cursor, segment, tail_len);
  cursor[tail_len] = '\0';

  *out_path = joined;
  return true;
}
|
|
|
|
/*
 * Recursively delete `path`.
 *
 * - A path that does not exist (lstat fails with ENOENT) counts as success.
 * - Anything that is not a directory (per lstat, so symlinks are not
 *   followed) is removed with unlink().
 * - A directory has every entry except "." and ".." removed recursively,
 *   then the directory itself is removed with rmdir().
 *
 * Returns true on success, false on a NULL path or on the first failure.
 * Entries removed before a failure stay removed.
 */
static bool remove_tree(const char *path) {
  struct stat info;
  DIR *handle;

  if (path == NULL) {
    return false;
  }
  if (lstat(path, &info) != 0) {
    return errno == ENOENT;
  }
  if (!S_ISDIR(info.st_mode)) {
    return unlink(path) == 0;
  }

  handle = opendir(path);
  if (handle == NULL) {
    return false;
  }

  for (;;) {
    struct dirent *item = readdir(handle);
    char *child_path = NULL;
    bool child_removed;

    if (item == NULL) {
      break;
    }
    if (strcmp(item->d_name, ".") == 0 || strcmp(item->d_name, "..") == 0) {
      continue;
    }

    /* child_path stays NULL if join_path fails, so the free below is safe. */
    child_removed = join_path(path, item->d_name, &child_path) &&
                    remove_tree(child_path);
    free(child_path);
    if (!child_removed) {
      closedir(handle);
      return false;
    }
  }

  if (closedir(handle) != 0) {
    return false;
  }
  return rmdir(path) == 0;
}
|
|
|
|
/*
 * Compare two references for equality.
 *
 * References are equal when their hash ids match, their digest lengths
 * match, and the digest bytes are identical. Two zero-length digests with
 * the same hash id compare equal regardless of their data pointers; a
 * non-empty digest with a NULL data pointer never compares equal.
 */
static bool refs_equal(amduat_reference_t a, amduat_reference_t b) {
  bool same_shape = (a.hash_id == b.hash_id) && (a.digest.len == b.digest.len);
  bool have_bytes;

  if (!same_shape) {
    return false;
  }
  if (a.digest.len == 0u) {
    return true;
  }

  have_bytes = (a.digest.data != NULL) && (b.digest.data != NULL);
  return have_bytes && memcmp(a.digest.data, b.digest.data, a.digest.len) == 0;
}
|
|
|
|
typedef struct {
|
|
amduat_asl_store_t *store;
|
|
amduat_asl_log_store_t *log_store;
|
|
const char *log_name;
|
|
uint16_t kind;
|
|
size_t iterations;
|
|
amduat_asl_store_error_t first_err;
|
|
uint64_t *observed_offsets;
|
|
} append_worker_t;
|
|
|
|
typedef struct {
|
|
amduat_asl_store_t *store;
|
|
uint16_t kind;
|
|
size_t iterations;
|
|
amduat_asl_store_error_t first_err;
|
|
} put_worker_t;
|
|
|
|
static void *append_worker_main(void *opaque) {
|
|
append_worker_t *worker = (append_worker_t *)opaque;
|
|
size_t i;
|
|
|
|
if (worker == NULL || worker->store == NULL || worker->log_store == NULL ||
|
|
worker->log_name == NULL) {
|
|
return NULL;
|
|
}
|
|
worker->first_err = AMDUAT_ASL_STORE_OK;
|
|
|
|
for (i = 0u; i < worker->iterations; ++i) {
|
|
char payload_buf[64];
|
|
size_t payload_len;
|
|
amduat_artifact_t payload_artifact;
|
|
amduat_reference_t payload_ref;
|
|
amduat_asl_log_entry_t entry;
|
|
amduat_asl_store_error_t err;
|
|
uint64_t first_offset = 0u;
|
|
|
|
payload_len = (size_t)snprintf(payload_buf, sizeof(payload_buf),
|
|
"worker-%u-%zu", (unsigned)worker->kind, i);
|
|
if (payload_len == 0u || payload_len >= sizeof(payload_buf)) {
|
|
worker->first_err = AMDUAT_ASL_STORE_ERR_IO;
|
|
return NULL;
|
|
}
|
|
|
|
payload_artifact =
|
|
amduat_artifact(amduat_octets((const uint8_t *)payload_buf, payload_len));
|
|
payload_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
|
|
err = amduat_asl_store_put(worker->store, payload_artifact, &payload_ref);
|
|
if (err != AMDUAT_ASL_STORE_OK) {
|
|
worker->first_err = err;
|
|
return NULL;
|
|
}
|
|
|
|
memset(&entry, 0, sizeof(entry));
|
|
entry.kind = worker->kind;
|
|
entry.payload_ref = payload_ref;
|
|
err = amduat_asl_log_append(worker->log_store,
|
|
worker->log_name,
|
|
&entry,
|
|
1u,
|
|
&first_offset);
|
|
amduat_reference_free(&payload_ref);
|
|
if (err != AMDUAT_ASL_STORE_OK) {
|
|
worker->first_err = err;
|
|
return NULL;
|
|
}
|
|
if (worker->observed_offsets != NULL && i < worker->iterations) {
|
|
worker->observed_offsets[i] = first_offset;
|
|
}
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
static void *put_worker_main(void *opaque) {
|
|
put_worker_t *worker = (put_worker_t *)opaque;
|
|
size_t i;
|
|
|
|
if (worker == NULL || worker->store == NULL) {
|
|
return NULL;
|
|
}
|
|
worker->first_err = AMDUAT_ASL_STORE_OK;
|
|
|
|
for (i = 0u; i < worker->iterations; ++i) {
|
|
char payload_buf[64];
|
|
size_t payload_len;
|
|
amduat_artifact_t payload_artifact;
|
|
amduat_reference_t payload_ref;
|
|
amduat_asl_store_error_t err;
|
|
|
|
payload_len = (size_t)snprintf(payload_buf, sizeof(payload_buf),
|
|
"put-only-%u-%zu", (unsigned)worker->kind, i);
|
|
if (payload_len == 0u || payload_len >= sizeof(payload_buf)) {
|
|
worker->first_err = AMDUAT_ASL_STORE_ERR_IO;
|
|
return NULL;
|
|
}
|
|
|
|
payload_artifact =
|
|
amduat_artifact(amduat_octets((const uint8_t *)payload_buf, payload_len));
|
|
payload_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
|
|
err = amduat_asl_store_put(worker->store, payload_artifact, &payload_ref);
|
|
amduat_reference_free(&payload_ref);
|
|
if (err != AMDUAT_ASL_STORE_OK) {
|
|
worker->first_err = err;
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
int main(void) {
|
|
amduat_asl_store_config_t config;
|
|
amduat_asl_store_index_fs_t fs_a;
|
|
amduat_asl_store_index_fs_t fs_b;
|
|
amduat_asl_store_t store_a;
|
|
amduat_asl_store_t store_b;
|
|
amduat_asl_log_store_t log_a;
|
|
amduat_asl_log_store_t log_b;
|
|
amduat_artifact_t payload_artifact_a;
|
|
amduat_artifact_t payload_artifact_b;
|
|
amduat_reference_t payload_ref_a;
|
|
amduat_reference_t payload_ref_b;
|
|
amduat_asl_log_entry_t entry_a;
|
|
amduat_asl_log_entry_t entry_b;
|
|
amduat_asl_log_entry_t *out_entries = NULL;
|
|
size_t out_len = 0u;
|
|
uint64_t out_next_offset = 0u;
|
|
bool out_end = false;
|
|
uint64_t first_offset = 0u;
|
|
const char *log_name = "collection/space/app1/daemon/edges/log";
|
|
enum { WORKER_ITERS = 32 };
|
|
append_worker_t worker_a;
|
|
append_worker_t worker_b;
|
|
append_worker_t mixed_append_worker;
|
|
put_worker_t mixed_put_worker;
|
|
pthread_t thread_a;
|
|
pthread_t thread_b;
|
|
pthread_t mixed_append_thread;
|
|
pthread_t mixed_put_thread;
|
|
bool started_a = false;
|
|
bool started_b = false;
|
|
bool started_mixed_append = false;
|
|
bool started_mixed_put = false;
|
|
uint64_t worker_a_offsets[WORKER_ITERS];
|
|
uint64_t worker_b_offsets[WORKER_ITERS];
|
|
char *root = NULL;
|
|
int rc = 1;
|
|
|
|
root = strdup("/tmp/amduat-log-index-XXXXXX");
|
|
if (root == NULL) {
|
|
fprintf(stderr, "alloc root template failed\n");
|
|
return 1;
|
|
}
|
|
if (mkdtemp(root) == NULL) {
|
|
fprintf(stderr, "mkdtemp failed\n");
|
|
goto cleanup;
|
|
}
|
|
|
|
memset(&config, 0, sizeof(config));
|
|
config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1;
|
|
config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256;
|
|
|
|
if (!amduat_asl_store_index_fs_init(&fs_a, config, root) ||
|
|
!amduat_asl_store_index_fs_init(&fs_b, config, root)) {
|
|
fprintf(stderr, "index fs init failed\n");
|
|
goto cleanup;
|
|
}
|
|
amduat_asl_store_init(&store_a, config, amduat_asl_store_index_fs_ops(), &fs_a);
|
|
amduat_asl_store_init(&store_b, config, amduat_asl_store_index_fs_ops(), &fs_b);
|
|
|
|
if (!amduat_asl_log_store_init(&log_a, root, &store_a, NULL) ||
|
|
!amduat_asl_log_store_init(&log_b, root, &store_b, NULL)) {
|
|
fprintf(stderr, "log store init failed\n");
|
|
goto cleanup;
|
|
}
|
|
|
|
payload_artifact_a = amduat_artifact(amduat_octets((const uint8_t *)"payload-a", 9u));
|
|
payload_ref_a = amduat_reference(0u, amduat_octets(NULL, 0u));
|
|
if (amduat_asl_store_put(&store_a, payload_artifact_a, &payload_ref_a) !=
|
|
AMDUAT_ASL_STORE_OK) {
|
|
fprintf(stderr, "put payload a failed\n");
|
|
goto cleanup;
|
|
}
|
|
|
|
memset(&entry_a, 0, sizeof(entry_a));
|
|
entry_a.kind = 1u;
|
|
entry_a.payload_ref = payload_ref_a;
|
|
if (amduat_asl_log_append(&log_a, log_name, &entry_a, 1u, &first_offset) !=
|
|
AMDUAT_ASL_STORE_OK) {
|
|
fprintf(stderr, "first append failed\n");
|
|
goto cleanup;
|
|
}
|
|
if (first_offset != 0u) {
|
|
fprintf(stderr, "first offset mismatch: %" PRIu64 "\n", first_offset);
|
|
goto cleanup;
|
|
}
|
|
|
|
payload_artifact_b = amduat_artifact(amduat_octets((const uint8_t *)"payload-b", 9u));
|
|
payload_ref_b = amduat_reference(0u, amduat_octets(NULL, 0u));
|
|
if (amduat_asl_store_put(&store_b, payload_artifact_b, &payload_ref_b) !=
|
|
AMDUAT_ASL_STORE_OK) {
|
|
fprintf(stderr, "put payload b failed\n");
|
|
goto cleanup;
|
|
}
|
|
|
|
memset(&entry_b, 0, sizeof(entry_b));
|
|
entry_b.kind = 2u;
|
|
entry_b.payload_ref = payload_ref_b;
|
|
if (amduat_asl_log_append(&log_b, log_name, &entry_b, 1u, &first_offset) !=
|
|
AMDUAT_ASL_STORE_OK) {
|
|
fprintf(stderr, "second append failed\n");
|
|
goto cleanup;
|
|
}
|
|
if (first_offset != 1u) {
|
|
fprintf(stderr, "second first_offset mismatch: %" PRIu64 "\n", first_offset);
|
|
goto cleanup;
|
|
}
|
|
|
|
if (amduat_asl_log_read(&log_a,
|
|
log_name,
|
|
0u,
|
|
8u,
|
|
&out_entries,
|
|
&out_len,
|
|
&out_next_offset,
|
|
&out_end) != AMDUAT_ASL_STORE_OK) {
|
|
fprintf(stderr, "log read failed\n");
|
|
goto cleanup;
|
|
}
|
|
|
|
if (out_len != 2u || out_next_offset != 2u || !out_end) {
|
|
fprintf(stderr, "readback shape mismatch len=%zu next=%" PRIu64 " end=%d\n",
|
|
out_len, out_next_offset, out_end ? 1 : 0);
|
|
goto cleanup;
|
|
}
|
|
if (out_entries[0].kind != 1u || !refs_equal(out_entries[0].payload_ref, payload_ref_a)) {
|
|
fprintf(stderr, "entry0 mismatch\n");
|
|
goto cleanup;
|
|
}
|
|
if (out_entries[1].kind != 2u || !refs_equal(out_entries[1].payload_ref, payload_ref_b)) {
|
|
fprintf(stderr, "entry1 mismatch\n");
|
|
goto cleanup;
|
|
}
|
|
amduat_asl_log_entries_free(out_entries, out_len);
|
|
out_entries = NULL;
|
|
out_len = 0u;
|
|
|
|
memset(&worker_a, 0, sizeof(worker_a));
|
|
worker_a.store = &store_a;
|
|
worker_a.log_store = &log_a;
|
|
worker_a.log_name = log_name;
|
|
worker_a.kind = 10u;
|
|
worker_a.iterations = WORKER_ITERS;
|
|
worker_a.observed_offsets = worker_a_offsets;
|
|
|
|
memset(&worker_b, 0, sizeof(worker_b));
|
|
worker_b.store = &store_b;
|
|
worker_b.log_store = &log_b;
|
|
worker_b.log_name = log_name;
|
|
worker_b.kind = 11u;
|
|
worker_b.iterations = WORKER_ITERS;
|
|
worker_b.observed_offsets = worker_b_offsets;
|
|
|
|
if (pthread_create(&thread_a, NULL, append_worker_main, &worker_a) != 0) {
|
|
fprintf(stderr, "pthread_create failed\n");
|
|
goto cleanup;
|
|
}
|
|
started_a = true;
|
|
if (pthread_create(&thread_b, NULL, append_worker_main, &worker_b) != 0) {
|
|
fprintf(stderr, "pthread_create failed\n");
|
|
goto cleanup;
|
|
}
|
|
started_b = true;
|
|
(void)pthread_join(thread_a, NULL);
|
|
(void)pthread_join(thread_b, NULL);
|
|
started_a = false;
|
|
started_b = false;
|
|
|
|
if (worker_a.first_err != AMDUAT_ASL_STORE_OK) {
|
|
fprintf(stderr, "worker_a failed err=%d\n", (int)worker_a.first_err);
|
|
goto cleanup;
|
|
}
|
|
if (worker_b.first_err != AMDUAT_ASL_STORE_OK) {
|
|
fprintf(stderr, "worker_b failed err=%d\n", (int)worker_b.first_err);
|
|
goto cleanup;
|
|
}
|
|
{
|
|
bool seen[2u + 2u * WORKER_ITERS];
|
|
size_t i;
|
|
memset(seen, 0, sizeof(seen));
|
|
for (i = 0u; i < WORKER_ITERS; ++i) {
|
|
if (worker_a_offsets[i] < 2u ||
|
|
worker_a_offsets[i] >= (uint64_t)(2u + 2u * WORKER_ITERS)) {
|
|
fprintf(stderr, "worker_a offset out of range: %" PRIu64 "\n",
|
|
worker_a_offsets[i]);
|
|
goto cleanup;
|
|
}
|
|
if (seen[worker_a_offsets[i]]) {
|
|
fprintf(stderr, "duplicate offset from worker_a: %" PRIu64 "\n",
|
|
worker_a_offsets[i]);
|
|
goto cleanup;
|
|
}
|
|
seen[worker_a_offsets[i]] = true;
|
|
}
|
|
for (i = 0u; i < WORKER_ITERS; ++i) {
|
|
if (worker_b_offsets[i] < 2u ||
|
|
worker_b_offsets[i] >= (uint64_t)(2u + 2u * WORKER_ITERS)) {
|
|
fprintf(stderr, "worker_b offset out of range: %" PRIu64 "\n",
|
|
worker_b_offsets[i]);
|
|
goto cleanup;
|
|
}
|
|
if (seen[worker_b_offsets[i]]) {
|
|
fprintf(stderr, "duplicate offset from worker_b: %" PRIu64 "\n",
|
|
worker_b_offsets[i]);
|
|
goto cleanup;
|
|
}
|
|
seen[worker_b_offsets[i]] = true;
|
|
}
|
|
for (i = 2u; i < (2u + 2u * WORKER_ITERS); ++i) {
|
|
if (!seen[i]) {
|
|
fprintf(stderr, "missing worker offset: %zu\n", i);
|
|
goto cleanup;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (amduat_asl_log_read(&log_a,
|
|
log_name,
|
|
0u,
|
|
4096u,
|
|
&out_entries,
|
|
&out_len,
|
|
&out_next_offset,
|
|
&out_end) != AMDUAT_ASL_STORE_OK) {
|
|
fprintf(stderr, "log read after stress failed\n");
|
|
goto cleanup;
|
|
}
|
|
if (out_len != (2u + 2u * WORKER_ITERS) || out_next_offset != out_len ||
|
|
!out_end) {
|
|
fprintf(stderr,
|
|
"stress read mismatch len=%zu next=%" PRIu64 " expected=%u end=%d\n",
|
|
out_len, out_next_offset, (unsigned)(2u + 2u * WORKER_ITERS),
|
|
out_end ? 1 : 0);
|
|
goto cleanup;
|
|
}
|
|
amduat_asl_log_entries_free(out_entries, out_len);
|
|
out_entries = NULL;
|
|
out_len = 0u;
|
|
|
|
memset(&mixed_append_worker, 0, sizeof(mixed_append_worker));
|
|
mixed_append_worker.store = &store_a;
|
|
mixed_append_worker.log_store = &log_a;
|
|
mixed_append_worker.log_name = log_name;
|
|
mixed_append_worker.kind = 12u;
|
|
mixed_append_worker.iterations = WORKER_ITERS;
|
|
|
|
memset(&mixed_put_worker, 0, sizeof(mixed_put_worker));
|
|
mixed_put_worker.store = &store_b;
|
|
mixed_put_worker.kind = 13u;
|
|
mixed_put_worker.iterations = WORKER_ITERS * 4u;
|
|
|
|
if (pthread_create(&mixed_append_thread, NULL, append_worker_main,
|
|
&mixed_append_worker) != 0) {
|
|
fprintf(stderr, "pthread_create mixed append failed\n");
|
|
goto cleanup;
|
|
}
|
|
started_mixed_append = true;
|
|
if (pthread_create(&mixed_put_thread, NULL, put_worker_main,
|
|
&mixed_put_worker) != 0) {
|
|
fprintf(stderr, "pthread_create mixed put failed\n");
|
|
goto cleanup;
|
|
}
|
|
started_mixed_put = true;
|
|
(void)pthread_join(mixed_append_thread, NULL);
|
|
(void)pthread_join(mixed_put_thread, NULL);
|
|
started_mixed_append = false;
|
|
started_mixed_put = false;
|
|
|
|
if (mixed_append_worker.first_err != AMDUAT_ASL_STORE_OK) {
|
|
fprintf(stderr, "mixed append worker failed err=%d\n",
|
|
(int)mixed_append_worker.first_err);
|
|
goto cleanup;
|
|
}
|
|
if (mixed_put_worker.first_err != AMDUAT_ASL_STORE_OK) {
|
|
fprintf(stderr, "mixed put worker failed err=%d\n",
|
|
(int)mixed_put_worker.first_err);
|
|
goto cleanup;
|
|
}
|
|
|
|
rc = 0;
|
|
|
|
cleanup:
|
|
if (started_a) {
|
|
(void)pthread_join(thread_a, NULL);
|
|
}
|
|
if (started_b) {
|
|
(void)pthread_join(thread_b, NULL);
|
|
}
|
|
if (started_mixed_append) {
|
|
(void)pthread_join(mixed_append_thread, NULL);
|
|
}
|
|
if (started_mixed_put) {
|
|
(void)pthread_join(mixed_put_thread, NULL);
|
|
}
|
|
amduat_asl_log_entries_free(out_entries, out_len);
|
|
amduat_reference_free(&payload_ref_a);
|
|
amduat_reference_free(&payload_ref_b);
|
|
if (root != NULL) {
|
|
if (!remove_tree(root)) {
|
|
fprintf(stderr, "warning: cleanup failed for %s\n", root);
|
|
}
|
|
free(root);
|
|
}
|
|
return rc;
|
|
}
|