From c7a9e2f6aa30dc67731412fc27966a168e2367db Mon Sep 17 00:00:00 2001 From: Carl Niklas Rydberg Date: Sun, 8 Feb 2026 00:06:42 +0100 Subject: [PATCH] Fix index-fs write locking and add index backend regressions --- CMakeLists.txt | 30 ++ include/amduat/asl/asl_store_index_fs.h | 5 + src/adapters/asl_pointer_fs/asl_pointer_fs.c | 17 +- .../asl_store_index_fs/asl_store_index_fs.c | 155 +++++- src/core/asl_log_store.c | 68 +++ .../asl/test_asl_index_put_get_consistency.c | 176 +++++++ tests/asl/test_asl_log_store_index_fs.c | 478 ++++++++++++++++++ 7 files changed, 923 insertions(+), 6 deletions(-) create mode 100644 tests/asl/test_asl_index_put_get_consistency.c create mode 100644 tests/asl/test_asl_log_store_index_fs.c diff --git a/CMakeLists.txt b/CMakeLists.txt index e2061e6..2d82535 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -509,6 +509,36 @@ set_tests_properties(asl_store_index_fs PROPERTIES TIMEOUT 120 ) +add_executable(amduat_test_asl_log_store_index_fs + tests/asl/test_asl_log_store_index_fs.c) +target_include_directories(amduat_test_asl_log_store_index_fs + PRIVATE ${AMDUAT_INTERNAL_DIR} + PRIVATE ${AMDUAT_INCLUDE_DIR} +) +target_compile_definitions(amduat_test_asl_log_store_index_fs + PRIVATE _POSIX_C_SOURCE=200809L +) +target_link_libraries(amduat_test_asl_log_store_index_fs + PRIVATE amduat_asl_log_store amduat_asl_store_index_fs amduat_format pthread +) +add_test(NAME asl_log_store_index_fs COMMAND amduat_test_asl_log_store_index_fs) +set_tests_properties(asl_log_store_index_fs PROPERTIES TIMEOUT 30) + +add_executable(amduat_test_asl_index_put_get_consistency + tests/asl/test_asl_index_put_get_consistency.c) +target_include_directories(amduat_test_asl_index_put_get_consistency + PRIVATE ${AMDUAT_INTERNAL_DIR} + PRIVATE ${AMDUAT_INCLUDE_DIR} +) +target_compile_definitions(amduat_test_asl_index_put_get_consistency + PRIVATE _POSIX_C_SOURCE=200809L +) +target_link_libraries(amduat_test_asl_index_put_get_consistency + PRIVATE amduat_asl_store_index_fs amduat_format pthread +) +add_test(NAME asl_index_put_get_consistency + COMMAND amduat_test_asl_index_put_get_consistency) + add_executable(amduat_test_asl_index_accel tests/asl/test_asl_index_accel.c) target_include_directories(amduat_test_asl_index_accel PRIVATE ${AMDUAT_INTERNAL_DIR} diff --git a/include/amduat/asl/asl_store_index_fs.h b/include/amduat/asl/asl_store_index_fs.h index ba464d8..1b19028 100644 --- a/include/amduat/asl/asl_store_index_fs.h +++ b/include/amduat/asl/asl_store_index_fs.h @@ -3,8 +3,10 @@ #include "amduat/asl/store.h" +#include #include #include +#include #ifdef __cplusplus extern "C" { @@ -42,6 +44,9 @@ typedef struct { amduat_asl_snapshot_id_t next_snapshot_id; bool snapshot_state_initialized; void *open_segments; + pthread_mutex_t write_mutex; + uint32_t write_depth; + int write_lock_fd; } amduat_asl_store_index_fs_t; bool amduat_asl_store_index_fs_init(amduat_asl_store_index_fs_t *fs, diff --git a/src/adapters/asl_pointer_fs/asl_pointer_fs.c b/src/adapters/asl_pointer_fs/asl_pointer_fs.c index 49666b8..1e1fcb4 100644 --- a/src/adapters/asl_pointer_fs/asl_pointer_fs.c +++ b/src/adapters/asl_pointer_fs/asl_pointer_fs.c @@ -472,6 +472,7 @@ static amduat_asl_pointer_error_t amduat_asl_pointer_write_head( bool has_prev) { char *tmp_path; size_t tmp_len; + int tmp_fd; FILE *fp; uint8_t header[AMDUAT_ASL_POINTER_MAGIC_LEN + 4u + 1u]; amduat_octets_t ref_bytes = amduat_octets(NULL, 0u); @@ -503,17 +504,25 @@ static amduat_asl_pointer_error_t amduat_asl_pointer_write_head( prev_len = 0u; } - tmp_len = strlen(path) + 5u; + tmp_len = strlen(path) + sizeof(".tmp.XXXXXX"); tmp_path = (char *)malloc(tmp_len); if (tmp_path == NULL) { free((void *)ref_bytes.data); free((void *)prev_bytes.data); return AMDUAT_ASL_POINTER_ERR_IO; } - snprintf(tmp_path, tmp_len, "%s.tmp", path); - - fp = fopen(tmp_path, "wb"); + snprintf(tmp_path, tmp_len, "%s.tmp.XXXXXX", path); + tmp_fd = mkstemp(tmp_path); + if (tmp_fd < 0) { + free(tmp_path); + free((void *)ref_bytes.data); + free((void *)prev_bytes.data); + return AMDUAT_ASL_POINTER_ERR_IO; + } + fp = fdopen(tmp_fd, "wb"); if (fp == NULL) { + close(tmp_fd); + (void)remove(tmp_path); free(tmp_path); free((void *)ref_bytes.data); free((void *)prev_bytes.data); diff --git a/src/adapters/asl_store_index_fs/asl_store_index_fs.c b/src/adapters/asl_store_index_fs/asl_store_index_fs.c index 73644d8..da88c41 100644 --- a/src/adapters/asl_store_index_fs/asl_store_index_fs.c +++ b/src/adapters/asl_store_index_fs/asl_store_index_fs.c @@ -16,11 +16,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include @@ -130,6 +132,92 @@ static bool amduat_asl_store_index_fs_current_state_impl( void *ctx, amduat_asl_index_state_t *out_state); +static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( + void *ctx, + amduat_artifact_t artifact, + amduat_reference_t *out_ref, + amduat_asl_index_state_t *out_state); + +static int amduat_asl_store_index_fs_lockfile_acquire( + amduat_asl_store_index_fs_t *fs) { + char lock_path[AMDUAT_ASL_STORE_INDEX_FS_ROOT_MAX + 32]; + int fd; + + if (fs == NULL || fs->root_path[0] == '\0') { + return -1; + } + if (snprintf(lock_path, sizeof(lock_path), "%s/.index-write.lock", + fs->root_path) >= (int)sizeof(lock_path)) { + return -1; + } + fd = open(lock_path, O_CREAT | O_RDWR, 0644); + if (fd < 0) { + return -1; + } + while (flock(fd, LOCK_EX) != 0) { + if (errno == EINTR) { + continue; + } + close(fd); + return -1; + } + return fd; +} + +static void amduat_asl_store_index_fs_lockfile_release(int fd) { + if (fd < 0) { + return; + } + while (flock(fd, LOCK_UN) != 0) { + if (errno == EINTR) { + continue; + } + break; + } + close(fd); +} + +static amduat_asl_store_error_t amduat_asl_store_index_fs_begin_write( + amduat_asl_store_index_fs_t *fs) { + int lock_fd; + + if (fs == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + if (pthread_mutex_lock(&fs->write_mutex) != 0) { + return AMDUAT_ASL_STORE_ERR_IO; + } + if (fs->write_depth == 0u) { + lock_fd = amduat_asl_store_index_fs_lockfile_acquire(fs); + if (lock_fd < 0) { + pthread_mutex_unlock(&fs->write_mutex); + return AMDUAT_ASL_STORE_ERR_IO; + } + fs->write_lock_fd = lock_fd; + } + fs->write_depth += 1u; + return AMDUAT_ASL_STORE_OK; +} + +static void amduat_asl_store_index_fs_end_write(amduat_asl_store_index_fs_t *fs) { + int lock_fd; + + if (fs == NULL) { + return; + } + if (fs->write_depth == 0u) { + pthread_mutex_unlock(&fs->write_mutex); + return; + } + fs->write_depth -= 1u; + if (fs->write_depth == 0u) { + lock_fd = fs->write_lock_fd; + fs->write_lock_fd = -1; + amduat_asl_store_index_fs_lockfile_release(lock_fd); + } + pthread_mutex_unlock(&fs->write_mutex); +} + static const uint8_t k_amduat_asl_store_index_fs_log_magic[ AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN] = {'A', 'S', 'L', 'L', 'O', 'G', '0', '1'}; @@ -1162,7 +1250,7 @@ static bool amduat_asl_store_index_fs_encode_artifact_ref( return true; } -static amduat_asl_store_error_t amduat_asl_store_index_fs_append_log_record( +static amduat_asl_store_error_t amduat_asl_store_index_fs_append_log_record_unlocked( amduat_asl_store_index_fs_t *fs, uint32_t record_type, const uint8_t *payload, @@ -1274,6 +1362,30 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_append_log_record( return AMDUAT_ASL_STORE_OK; } +static amduat_asl_store_error_t amduat_asl_store_index_fs_append_log_record( + amduat_asl_store_index_fs_t *fs, + uint32_t record_type, + const uint8_t *payload, + size_t payload_len, + amduat_asl_index_state_t *out_state) { + amduat_asl_store_error_t err; + + if (fs == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + err = amduat_asl_store_index_fs_begin_write(fs); + if (err != AMDUAT_ASL_STORE_OK) { + return err; + } + err = amduat_asl_store_index_fs_append_log_record_unlocked(fs, + record_type, + payload, + payload_len, + out_state); + amduat_asl_store_index_fs_end_write(fs); + return err; +} + static bool amduat_asl_store_index_fs_open_segment_contains( const amduat_asl_store_index_fs_open_segment_t *segment, amduat_reference_t ref) { @@ -3987,7 +4099,7 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_log_scan_impl( return err; } -static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( +static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_unlocked_impl( void *ctx, amduat_artifact_t artifact, amduat_reference_t *out_ref, @@ -4495,6 +4607,30 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_put_impl( &state); } +static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( + void *ctx, + amduat_artifact_t artifact, + amduat_reference_t *out_ref, + amduat_asl_index_state_t *out_state) { + amduat_asl_store_index_fs_t *fs; + amduat_asl_store_error_t err; + + if (ctx == NULL || out_ref == NULL || out_state == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + fs = (amduat_asl_store_index_fs_t *)ctx; + err = amduat_asl_store_index_fs_begin_write(fs); + if (err != AMDUAT_ASL_STORE_OK) { + return err; + } + err = amduat_asl_store_index_fs_put_indexed_unlocked_impl(ctx, + artifact, + out_ref, + out_state); + amduat_asl_store_index_fs_end_write(fs); + return err; +} + static amduat_asl_store_error_t amduat_asl_store_index_fs_get_impl( void *ctx, amduat_reference_t ref, @@ -5045,6 +5181,7 @@ bool amduat_asl_store_index_fs_init(amduat_asl_store_index_fs_t *fs, amduat_asl_store_config_t config, const char *root_path) { size_t len; + pthread_mutexattr_t mutex_attr; if (fs == NULL || root_path == NULL) { return false; @@ -5074,6 +5211,20 @@ bool amduat_asl_store_index_fs_init(amduat_asl_store_index_fs_t *fs, fs->next_snapshot_id = 0u; fs->snapshot_state_initialized = false; fs->open_segments = NULL; + fs->write_depth = 0u; + fs->write_lock_fd = -1; + if (pthread_mutexattr_init(&mutex_attr) != 0) { + return false; + } + if (pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) != 0) { + pthread_mutexattr_destroy(&mutex_attr); + return false; + } + if (pthread_mutex_init(&fs->write_mutex, &mutex_attr) != 0) { + pthread_mutexattr_destroy(&mutex_attr); + return false; + } + pthread_mutexattr_destroy(&mutex_attr); return true; } diff --git a/src/core/asl_log_store.c b/src/core/asl_log_store.c index 1b2d9d1..955d2b6 100644 --- a/src/core/asl_log_store.c +++ b/src/core/asl_log_store.c @@ -1,4 +1,5 @@ #include "amduat/asl/log_store.h" +#include "amduat/asl/ref_text.h" #include "amduat/enc/asl1_core_codec.h" #include "amduat/util/log.h" @@ -525,6 +526,9 @@ amduat_asl_store_error_t amduat_asl_log_append( } if (!amduat_asl_log_build_pointer_name(log_name, &pointer_name)) { + amduat_log(AMDUAT_LOG_ERROR, + "asl_log_append: build pointer name failed (log=%s)", + log_name != NULL ? log_name : "(null)"); return AMDUAT_ASL_STORE_ERR_IO; } entry_count = (uint32_t)entries_len; @@ -541,6 +545,12 @@ amduat_asl_store_error_t amduat_asl_log_append( ptr_err = amduat_asl_pointer_get(&log_store->pointer_store, pointer_name, &head_exists, &head_ref); if (ptr_err != AMDUAT_ASL_POINTER_OK) { + amduat_log(AMDUAT_LOG_ERROR, + "asl_log_append: pointer_get failed (log=%s pointer=%s attempt=%u err=%d)", + log_name != NULL ? log_name : "(null)", + pointer_name != NULL ? pointer_name : "(null)", + (unsigned)attempt, + (int)ptr_err); free(pointer_name); return AMDUAT_ASL_STORE_ERR_IO; } @@ -549,6 +559,16 @@ amduat_asl_store_error_t amduat_asl_log_append( store_err = amduat_asl_store_get(log_store->store, head_ref, &head_artifact); if (store_err != AMDUAT_ASL_STORE_OK) { + char *head_ref_hex = NULL; + (void)amduat_asl_ref_encode_hex(head_ref, &head_ref_hex); + amduat_log(AMDUAT_LOG_ERROR, + "asl_log_append: store_get head failed (log=%s pointer=%s attempt=%u err=%d head_ref=%s)", + log_name != NULL ? log_name : "(null)", + pointer_name != NULL ? pointer_name : "(null)", + (unsigned)attempt, + (int)store_err, + head_ref_hex != NULL ? head_ref_hex : "(hex-encode-failed)"); + free(head_ref_hex); amduat_reference_free(&head_ref); free(pointer_name); return store_err; @@ -564,6 +584,12 @@ amduat_asl_store_error_t amduat_asl_log_append( &head_chunk); amduat_artifact_free(&head_artifact); if (store_err != AMDUAT_ASL_STORE_OK) { + amduat_log(AMDUAT_LOG_ERROR, + "asl_log_append: decode head chunk failed (log=%s pointer=%s attempt=%u err=%d)", + log_name != NULL ? log_name : "(null)", + pointer_name != NULL ? pointer_name : "(null)", + (unsigned)attempt, + (int)store_err); amduat_reference_free(&head_ref); free(pointer_name); return store_err; @@ -598,6 +624,12 @@ amduat_asl_store_error_t amduat_asl_log_append( store_err = amduat_asl_log_encode_chunk(&new_chunk, &encoded); if (store_err != AMDUAT_ASL_STORE_OK) { + amduat_log(AMDUAT_LOG_ERROR, + "asl_log_append: encode chunk failed (log=%s pointer=%s attempt=%u err=%d)", + log_name != NULL ? log_name : "(null)", + pointer_name != NULL ? pointer_name : "(null)", + (unsigned)attempt, + (int)store_err); if (head_exists) { amduat_reference_free(&head_ref); } @@ -609,12 +641,38 @@ amduat_asl_store_error_t amduat_asl_log_append( store_err = amduat_asl_store_put(log_store->store, artifact, &new_ref); free((void *)encoded.data); if (store_err != AMDUAT_ASL_STORE_OK) { + amduat_log(AMDUAT_LOG_ERROR, + "asl_log_append: store_put new chunk failed (log=%s pointer=%s attempt=%u err=%d)", + log_name != NULL ? log_name : "(null)", + pointer_name != NULL ? pointer_name : "(null)", + (unsigned)attempt, + (int)store_err); if (head_exists) { amduat_reference_free(&head_ref); } free(pointer_name); return store_err; } + { + amduat_artifact_t verify_artifact; + amduat_asl_store_error_t verify_err; + char *new_ref_hex = NULL; + (void)amduat_asl_ref_encode_hex(new_ref, &new_ref_hex); + verify_err = amduat_asl_store_get(log_store->store, new_ref, + &verify_artifact); + if (verify_err == AMDUAT_ASL_STORE_OK) { + amduat_artifact_free(&verify_artifact); + } else { + amduat_log(AMDUAT_LOG_ERROR, + "asl_log_append: store_put verify get failed (log=%s pointer=%s attempt=%u err=%d new_ref=%s)", + log_name != NULL ? log_name : "(null)", + pointer_name != NULL ? pointer_name : "(null)", + (unsigned)attempt, + (int)verify_err, + new_ref_hex != NULL ? new_ref_hex : "(hex-encode-failed)"); + } + free(new_ref_hex); + } { bool swapped = false; @@ -629,6 +687,12 @@ amduat_asl_store_error_t amduat_asl_log_append( amduat_reference_free(&head_ref); } if (cas_err != AMDUAT_ASL_POINTER_OK) { + amduat_log(AMDUAT_LOG_ERROR, + "asl_log_append: pointer_cas failed (log=%s pointer=%s attempt=%u err=%d)", + log_name != NULL ? log_name : "(null)", + pointer_name != NULL ? pointer_name : "(null)", + (unsigned)attempt, + (int)cas_err); free(pointer_name); return AMDUAT_ASL_STORE_ERR_IO; } @@ -645,6 +709,10 @@ amduat_asl_store_error_t amduat_asl_log_append( } } + amduat_log(AMDUAT_LOG_ERROR, + "asl_log_append: max retries exhausted (log=%s pointer=%s)", + log_name != NULL ? log_name : "(null)", + pointer_name != NULL ? pointer_name : "(null)"); free(pointer_name); return AMDUAT_ASL_STORE_ERR_IO; } diff --git a/tests/asl/test_asl_index_put_get_consistency.c b/tests/asl/test_asl_index_put_get_consistency.c new file mode 100644 index 0000000..1d0aa00 --- /dev/null +++ b/tests/asl/test_asl_index_put_get_consistency.c @@ -0,0 +1,176 @@ +#include "amduat/asl/asl_store_index_fs.h" +#include "amduat/asl/store.h" +#include "amduat/enc/asl1_core.h" +#include "amduat/hash/asl1.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static bool join_path(const char *base, const char *segment, char **out_path) { + size_t base_len; + size_t seg_len; + bool needs_sep; + size_t total_len; + char *buffer; + size_t offset; + + if (base == NULL || segment == NULL || out_path == NULL) { + return false; + } + base_len = strlen(base); + seg_len = strlen(segment); + if (base_len == 0u || seg_len == 0u) { + return false; + } + needs_sep = base[base_len - 1u] != '/'; + total_len = base_len + (needs_sep ? 1u : 0u) + seg_len + 1u; + buffer = (char *)malloc(total_len); + if (buffer == NULL) { + return false; + } + offset = 0u; + memcpy(buffer + offset, base, base_len); + offset += base_len; + if (needs_sep) { + buffer[offset++] = '/'; + } + memcpy(buffer + offset, segment, seg_len); + offset += seg_len; + buffer[offset] = '\0'; + *out_path = buffer; + return true; +} + +static bool remove_tree(const char *path) { + struct stat st; + DIR *dir; + struct dirent *entry; + + if (path == NULL) { + return false; + } + if (lstat(path, &st) != 0) { + return errno == ENOENT; + } + if (!S_ISDIR(st.st_mode)) { + return unlink(path) == 0; + } + dir = opendir(path); + if (dir == NULL) { + return false; + } + while ((entry = readdir(dir)) != NULL) { + char *child = NULL; + if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) { + continue; + } + if (!join_path(path, entry->d_name, &child)) { + closedir(dir); + return false; + } + if (!remove_tree(child)) { + free(child); + closedir(dir); + return false; + } + free(child); + } + if (closedir(dir) != 0) { + return false; + } + return rmdir(path) == 0; +} + +static char *make_temp_root(void) { + char *tmpl = NULL; + char *root = NULL; + + tmpl = strdup("/tmp/amduat-index-putget-XXXXXX"); + if (tmpl == NULL) { + return NULL; + } + if (mkdtemp(tmpl) == NULL) { + free(tmpl); + return NULL; + } + root = tmpl; + return root; +} + +int main(void) { + amduat_asl_store_config_t config; + amduat_asl_store_index_fs_t fs; + amduat_asl_store_t store; + char *root = NULL; + int rc = 1; + size_t i; + + root = make_temp_root(); + if (root == NULL) { + fprintf(stderr, "temp root creation failed\n"); + return 1; + } + + memset(&config, 0, sizeof(config)); + config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; + config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; + if (!amduat_asl_store_index_fs_init(&fs, config, root)) { + fprintf(stderr, "index fs init failed\n"); + goto cleanup; + } + amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs); + + for (i = 0u; i < 128u; ++i) { + char payload_buf[64]; + size_t payload_len; + amduat_artifact_t artifact; + amduat_reference_t ref; + amduat_artifact_t loaded; + amduat_asl_store_error_t err; + + payload_len = (size_t)snprintf(payload_buf, sizeof(payload_buf), + "payload-%zu", i); + if (payload_len == 0u || payload_len >= sizeof(payload_buf)) { + fprintf(stderr, "payload format failed at %zu\n", i); + goto cleanup; + } + + artifact = amduat_artifact( + amduat_octets((const uint8_t *)payload_buf, payload_len)); + ref = amduat_reference(0u, amduat_octets(NULL, 0u)); + err = amduat_asl_store_put(&store, artifact, &ref); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "put failed at %zu: %d\n", i, (int)err); + goto cleanup; + } + + loaded = amduat_artifact(amduat_octets(NULL, 0u)); + err = amduat_asl_store_get(&store, ref, &loaded); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "get after put failed at %zu: %d\n", i, (int)err); + amduat_reference_free(&ref); + goto cleanup; + } + amduat_artifact_free(&loaded); + amduat_reference_free(&ref); + } + + rc = 0; + +cleanup: + if (root != NULL) { + if (!remove_tree(root)) { + fprintf(stderr, "warning: cleanup failed for %s\n", root); + } + free(root); + } + return rc; +} diff --git a/tests/asl/test_asl_log_store_index_fs.c b/tests/asl/test_asl_log_store_index_fs.c new file mode 100644 index 0000000..41c64ea --- /dev/null +++ b/tests/asl/test_asl_log_store_index_fs.c @@ -0,0 +1,478 @@ +#include "amduat/asl/log_store.h" +#include "amduat/asl/asl_store_index_fs.h" +#include "amduat/asl/store.h" +#include "amduat/enc/asl1_core.h" +#include "amduat/hash/asl1.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static bool join_path(const char *base, const char *segment, char **out_path) { + size_t base_len; + size_t seg_len; + bool needs_sep; + size_t total_len; + char *buffer; + size_t offset; + + if (base == NULL || segment == NULL || out_path == NULL) { + return false; + } + base_len = strlen(base); + seg_len = strlen(segment); + if (base_len == 0u || seg_len == 0u) { + return false; + } + needs_sep = base[base_len - 1u] != '/'; + total_len = base_len + (needs_sep ? 1u : 0u) + seg_len + 1u; + buffer = (char *)malloc(total_len); + if (buffer == NULL) { + return false; + } + offset = 0u; + memcpy(buffer + offset, base, base_len); + offset += base_len; + if (needs_sep) { + buffer[offset++] = '/'; + } + memcpy(buffer + offset, segment, seg_len); + offset += seg_len; + buffer[offset] = '\0'; + *out_path = buffer; + return true; +} + +static bool remove_tree(const char *path) { + struct stat st; + DIR *dir; + struct dirent *entry; + + if (path == NULL) { + return false; + } + if (lstat(path, &st) != 0) { + return errno == ENOENT; + } + if (!S_ISDIR(st.st_mode)) { + return unlink(path) == 0; + } + dir = opendir(path); + if (dir == NULL) { + return false; + } + while ((entry = readdir(dir)) != NULL) { + char *child = NULL; + if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) { + continue; + } + if (!join_path(path, entry->d_name, &child)) { + closedir(dir); + return false; + } + if (!remove_tree(child)) { + free(child); + closedir(dir); + return false; + } + free(child); + } + if (closedir(dir) != 0) { + return false; + } + return rmdir(path) == 0; +} + +static bool refs_equal(amduat_reference_t a, amduat_reference_t b) { + if (a.hash_id != b.hash_id) { + return false; + } + if (a.digest.len != b.digest.len) { + return false; + } + if (a.digest.len == 0u) { + return true; + } + if (a.digest.data == NULL || b.digest.data == NULL) { + return false; + } + return memcmp(a.digest.data, b.digest.data, a.digest.len) == 0; +} + +typedef struct { + amduat_asl_store_t *store; + amduat_asl_log_store_t *log_store; + const char *log_name; + uint16_t kind; + size_t iterations; + amduat_asl_store_error_t first_err; +} append_worker_t; + +typedef struct { + amduat_asl_store_t *store; + uint16_t kind; + size_t iterations; + amduat_asl_store_error_t first_err; +} put_worker_t; + +static void *append_worker_main(void *opaque) { + append_worker_t *worker = (append_worker_t *)opaque; + size_t i; + + if (worker == NULL || worker->store == NULL || worker->log_store == NULL || + worker->log_name == NULL) { + return NULL; + } + worker->first_err = AMDUAT_ASL_STORE_OK; + + for (i = 0u; i < worker->iterations; ++i) { + char payload_buf[64]; + size_t payload_len; + amduat_artifact_t payload_artifact; + amduat_reference_t payload_ref; + amduat_asl_log_entry_t entry; + amduat_asl_store_error_t err; + uint64_t first_offset = 0u; + + payload_len = (size_t)snprintf(payload_buf, sizeof(payload_buf), + "worker-%u-%zu", (unsigned)worker->kind, i); + if (payload_len == 0u || payload_len >= sizeof(payload_buf)) { + worker->first_err = AMDUAT_ASL_STORE_ERR_IO; + return NULL; + } + + payload_artifact = + amduat_artifact(amduat_octets((const uint8_t *)payload_buf, payload_len)); + payload_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); + err = amduat_asl_store_put(worker->store, payload_artifact, &payload_ref); + if (err != AMDUAT_ASL_STORE_OK) { + worker->first_err = err; + return NULL; + } + + memset(&entry, 0, sizeof(entry)); + entry.kind = worker->kind; + entry.payload_ref = payload_ref; + err = amduat_asl_log_append(worker->log_store, + worker->log_name, + &entry, + 1u, + &first_offset); + amduat_reference_free(&payload_ref); + if (err != AMDUAT_ASL_STORE_OK) { + worker->first_err = err; + return NULL; + } + } + + return NULL; +} + +static void *put_worker_main(void *opaque) { + put_worker_t *worker = (put_worker_t *)opaque; + size_t i; + + if (worker == NULL || worker->store == NULL) { + return NULL; + } + worker->first_err = AMDUAT_ASL_STORE_OK; + + for (i = 0u; i < worker->iterations; ++i) { + char payload_buf[64]; + size_t payload_len; + amduat_artifact_t payload_artifact; + amduat_reference_t payload_ref; + amduat_asl_store_error_t err; + + payload_len = (size_t)snprintf(payload_buf, sizeof(payload_buf), + "put-only-%u-%zu", (unsigned)worker->kind, i); + if (payload_len == 0u || payload_len >= sizeof(payload_buf)) { + worker->first_err = AMDUAT_ASL_STORE_ERR_IO; + return NULL; + } + + payload_artifact = + amduat_artifact(amduat_octets((const uint8_t *)payload_buf, payload_len)); + payload_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); + err = amduat_asl_store_put(worker->store, payload_artifact, &payload_ref); + amduat_reference_free(&payload_ref); + if (err != AMDUAT_ASL_STORE_OK) { + worker->first_err = err; + return NULL; + } + } + + return NULL; +} + +int main(void) { + amduat_asl_store_config_t config; + amduat_asl_store_index_fs_t fs_a; + amduat_asl_store_index_fs_t fs_b; + amduat_asl_store_t store_a; + amduat_asl_store_t store_b; + amduat_asl_log_store_t log_a; + amduat_asl_log_store_t log_b; + amduat_artifact_t payload_artifact_a; + amduat_artifact_t payload_artifact_b; + amduat_reference_t payload_ref_a; + amduat_reference_t payload_ref_b; + amduat_asl_log_entry_t entry_a; + amduat_asl_log_entry_t entry_b; + amduat_asl_log_entry_t *out_entries = NULL; + size_t out_len = 0u; + uint64_t out_next_offset = 0u; + bool out_end = false; + uint64_t first_offset = 0u; + const char *log_name = "collection/space/app1/daemon/edges/log"; + enum { WORKER_ITERS = 32 }; + append_worker_t worker_a; + append_worker_t worker_b; + append_worker_t mixed_append_worker; + put_worker_t mixed_put_worker; + pthread_t thread_a; + pthread_t thread_b; + pthread_t mixed_append_thread; + pthread_t mixed_put_thread; + bool started_a = false; + bool started_b = false; + bool started_mixed_append = false; + bool started_mixed_put = false; + char *root = NULL; + int rc = 1; + + root = strdup("/tmp/amduat-log-index-XXXXXX"); + if (root == NULL) { + fprintf(stderr, "alloc root template failed\n"); + return 1; + } + if (mkdtemp(root) == NULL) { + fprintf(stderr, "mkdtemp failed\n"); + goto cleanup; + } + + memset(&config, 0, sizeof(config)); + config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; + config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; + + if (!amduat_asl_store_index_fs_init(&fs_a, config, root) || + !amduat_asl_store_index_fs_init(&fs_b, config, root)) { + fprintf(stderr, "index fs init failed\n"); + goto cleanup; + } + amduat_asl_store_init(&store_a, config, amduat_asl_store_index_fs_ops(), &fs_a); + amduat_asl_store_init(&store_b, config, amduat_asl_store_index_fs_ops(), &fs_b); + + if (!amduat_asl_log_store_init(&log_a, root, &store_a, NULL) || + !amduat_asl_log_store_init(&log_b, root, &store_b, NULL)) { + fprintf(stderr, "log store init failed\n"); + goto cleanup; + } + + payload_artifact_a = amduat_artifact(amduat_octets((const uint8_t *)"payload-a", 9u)); + payload_ref_a = amduat_reference(0u, amduat_octets(NULL, 0u)); + if (amduat_asl_store_put(&store_a, payload_artifact_a, &payload_ref_a) != + AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "put payload a failed\n"); + goto cleanup; + } + + memset(&entry_a, 0, sizeof(entry_a)); + entry_a.kind = 1u; + entry_a.payload_ref = payload_ref_a; + if (amduat_asl_log_append(&log_a, log_name, &entry_a, 1u, &first_offset) != + AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "first append failed\n"); + goto cleanup; + } + if (first_offset != 0u) { + fprintf(stderr, "first offset mismatch: %" PRIu64 "\n", first_offset); + goto cleanup; + } + + payload_artifact_b = amduat_artifact(amduat_octets((const uint8_t *)"payload-b", 9u)); + payload_ref_b = amduat_reference(0u, amduat_octets(NULL, 0u)); + if (amduat_asl_store_put(&store_b, payload_artifact_b, &payload_ref_b) != + AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "put payload b failed\n"); + goto cleanup; + } + + memset(&entry_b, 0, sizeof(entry_b)); + entry_b.kind = 2u; + entry_b.payload_ref = payload_ref_b; + if (amduat_asl_log_append(&log_b, log_name, &entry_b, 1u, &first_offset) != + AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "second append failed\n"); + goto cleanup; + } + if (first_offset != 1u) { + fprintf(stderr, "second first_offset mismatch: %" PRIu64 "\n", first_offset); + goto cleanup; + } + + if (amduat_asl_log_read(&log_a, + log_name, + 0u, + 8u, + &out_entries, + &out_len, + &out_next_offset, + &out_end) != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "log read failed\n"); + goto cleanup; + } + + if (out_len != 2u || out_next_offset != 2u || !out_end) { + fprintf(stderr, "readback shape mismatch len=%zu next=%" PRIu64 " end=%d\n", + out_len, out_next_offset, out_end ? 1 : 0); + goto cleanup; + } + if (out_entries[0].kind != 1u || !refs_equal(out_entries[0].payload_ref, payload_ref_a)) { + fprintf(stderr, "entry0 mismatch\n"); + goto cleanup; + } + if (out_entries[1].kind != 2u || !refs_equal(out_entries[1].payload_ref, payload_ref_b)) { + fprintf(stderr, "entry1 mismatch\n"); + goto cleanup; + } + amduat_asl_log_entries_free(out_entries, out_len); + out_entries = NULL; + out_len = 0u; + + memset(&worker_a, 0, sizeof(worker_a)); + worker_a.store = &store_a; + worker_a.log_store = &log_a; + worker_a.log_name = log_name; + worker_a.kind = 10u; + worker_a.iterations = WORKER_ITERS; + + memset(&worker_b, 0, sizeof(worker_b)); + worker_b.store = &store_b; + worker_b.log_store = &log_b; + worker_b.log_name = log_name; + worker_b.kind = 11u; + worker_b.iterations = WORKER_ITERS; + + if (pthread_create(&thread_a, NULL, append_worker_main, &worker_a) != 0) { + fprintf(stderr, "pthread_create failed\n"); + goto cleanup; + } + started_a = true; + if (pthread_create(&thread_b, NULL, append_worker_main, &worker_b) != 0) { + fprintf(stderr, "pthread_create failed\n"); + goto cleanup; + } + started_b = true; + (void)pthread_join(thread_a, NULL); + (void)pthread_join(thread_b, NULL); + started_a = false; + started_b = false; + + if (worker_a.first_err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "worker_a failed err=%d\n", (int)worker_a.first_err); + goto cleanup; + } + if (worker_b.first_err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "worker_b failed err=%d\n", (int)worker_b.first_err); + goto cleanup; + } + + if (amduat_asl_log_read(&log_a, + log_name, + 0u, + 4096u, + &out_entries, + &out_len, + &out_next_offset, + &out_end) != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "log read after stress failed\n"); + goto cleanup; + } + if (out_len != (2u + 2u * WORKER_ITERS) || out_next_offset != out_len || + !out_end) { + fprintf(stderr, + "stress read mismatch len=%zu next=%" PRIu64 " expected=%u end=%d\n", + out_len, out_next_offset, (unsigned)(2u + 2u * WORKER_ITERS), + out_end ? 1 : 0); + goto cleanup; + } + amduat_asl_log_entries_free(out_entries, out_len); + out_entries = NULL; + out_len = 0u; + + memset(&mixed_append_worker, 0, sizeof(mixed_append_worker)); + mixed_append_worker.store = &store_a; + mixed_append_worker.log_store = &log_a; + mixed_append_worker.log_name = log_name; + mixed_append_worker.kind = 12u; + mixed_append_worker.iterations = WORKER_ITERS; + + memset(&mixed_put_worker, 0, sizeof(mixed_put_worker)); + mixed_put_worker.store = &store_b; + mixed_put_worker.kind = 13u; + mixed_put_worker.iterations = WORKER_ITERS * 4u; + + if (pthread_create(&mixed_append_thread, NULL, append_worker_main, + &mixed_append_worker) != 0) { + fprintf(stderr, "pthread_create mixed append failed\n"); + goto cleanup; + } + started_mixed_append = true; + if (pthread_create(&mixed_put_thread, NULL, put_worker_main, + &mixed_put_worker) != 0) { + fprintf(stderr, "pthread_create mixed put failed\n"); + goto cleanup; + } + started_mixed_put = true; + (void)pthread_join(mixed_append_thread, NULL); + (void)pthread_join(mixed_put_thread, NULL); + started_mixed_append = false; + started_mixed_put = false; + + if (mixed_append_worker.first_err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "mixed append worker failed err=%d\n", + (int)mixed_append_worker.first_err); + goto cleanup; + } + if (mixed_put_worker.first_err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "mixed put worker failed err=%d\n", + (int)mixed_put_worker.first_err); + goto cleanup; + } + + rc = 0; + +cleanup: + if (started_a) { + (void)pthread_join(thread_a, NULL); + } + if (started_b) { + (void)pthread_join(thread_b, NULL); + } + if (started_mixed_append) { + (void)pthread_join(mixed_append_thread, NULL); + } + if (started_mixed_put) { + (void)pthread_join(mixed_put_thread, NULL); + } + amduat_asl_log_entries_free(out_entries, out_len); + amduat_reference_free(&payload_ref_a); + amduat_reference_free(&payload_ref_b); + if (root != NULL) { + if (!remove_tree(root)) { + fprintf(stderr, "warning: cleanup failed for %s\n", root); + } + free(root); + } + return rc; +}