#include "amduat/asl/log_store.h" #include "amduat/asl/asl_store_index_fs.h" #include "amduat/asl/store.h" #include "amduat/enc/asl1_core.h" #include "amduat/hash/asl1.h" #include #include #include #include #include #include #include #include #include #include #include #include static bool join_path(const char *base, const char *segment, char **out_path) { size_t base_len; size_t seg_len; bool needs_sep; size_t total_len; char *buffer; size_t offset; if (base == NULL || segment == NULL || out_path == NULL) { return false; } base_len = strlen(base); seg_len = strlen(segment); if (base_len == 0u || seg_len == 0u) { return false; } needs_sep = base[base_len - 1u] != '/'; total_len = base_len + (needs_sep ? 1u : 0u) + seg_len + 1u; buffer = (char *)malloc(total_len); if (buffer == NULL) { return false; } offset = 0u; memcpy(buffer + offset, base, base_len); offset += base_len; if (needs_sep) { buffer[offset++] = '/'; } memcpy(buffer + offset, segment, seg_len); offset += seg_len; buffer[offset] = '\0'; *out_path = buffer; return true; } static bool remove_tree(const char *path) { struct stat st; DIR *dir; struct dirent *entry; if (path == NULL) { return false; } if (lstat(path, &st) != 0) { return errno == ENOENT; } if (!S_ISDIR(st.st_mode)) { return unlink(path) == 0; } dir = opendir(path); if (dir == NULL) { return false; } while ((entry = readdir(dir)) != NULL) { char *child = NULL; if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) { continue; } if (!join_path(path, entry->d_name, &child)) { closedir(dir); return false; } if (!remove_tree(child)) { free(child); closedir(dir); return false; } free(child); } if (closedir(dir) != 0) { return false; } return rmdir(path) == 0; } static bool refs_equal(amduat_reference_t a, amduat_reference_t b) { if (a.hash_id != b.hash_id) { return false; } if (a.digest.len != b.digest.len) { return false; } if (a.digest.len == 0u) { return true; } if (a.digest.data == NULL || b.digest.data == NULL) { return false; } return memcmp(a.digest.data, b.digest.data, a.digest.len) == 0; } typedef struct { amduat_asl_store_t *store; amduat_asl_log_store_t *log_store; const char *log_name; uint16_t kind; size_t iterations; amduat_asl_store_error_t first_err; uint64_t *observed_offsets; } append_worker_t; typedef struct { amduat_asl_store_t *store; uint16_t kind; size_t iterations; amduat_asl_store_error_t first_err; } put_worker_t; static void *append_worker_main(void *opaque) { append_worker_t *worker = (append_worker_t *)opaque; size_t i; if (worker == NULL || worker->store == NULL || worker->log_store == NULL || worker->log_name == NULL) { return NULL; } worker->first_err = AMDUAT_ASL_STORE_OK; for (i = 0u; i < worker->iterations; ++i) { char payload_buf[64]; size_t payload_len; amduat_artifact_t payload_artifact; amduat_reference_t payload_ref; amduat_asl_log_entry_t entry; amduat_asl_store_error_t err; uint64_t first_offset = 0u; payload_len = (size_t)snprintf(payload_buf, sizeof(payload_buf), "worker-%u-%zu", (unsigned)worker->kind, i); if (payload_len == 0u || payload_len >= sizeof(payload_buf)) { worker->first_err = AMDUAT_ASL_STORE_ERR_IO; return NULL; } payload_artifact = amduat_artifact(amduat_octets((const uint8_t *)payload_buf, payload_len)); payload_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); err = amduat_asl_store_put(worker->store, payload_artifact, &payload_ref); if (err != AMDUAT_ASL_STORE_OK) { worker->first_err = err; return NULL; } memset(&entry, 0, sizeof(entry)); entry.kind = worker->kind; entry.payload_ref = payload_ref; err = amduat_asl_log_append(worker->log_store, worker->log_name, &entry, 1u, &first_offset); amduat_reference_free(&payload_ref); if (err != AMDUAT_ASL_STORE_OK) { worker->first_err = err; return NULL; } if (worker->observed_offsets != NULL && i < worker->iterations) { worker->observed_offsets[i] = first_offset; } } return NULL; } static void *put_worker_main(void *opaque) { put_worker_t *worker = (put_worker_t *)opaque; size_t i; if (worker == NULL || worker->store == NULL) { return NULL; } worker->first_err = AMDUAT_ASL_STORE_OK; for (i = 0u; i < worker->iterations; ++i) { char payload_buf[64]; size_t payload_len; amduat_artifact_t payload_artifact; amduat_reference_t payload_ref; amduat_asl_store_error_t err; payload_len = (size_t)snprintf(payload_buf, sizeof(payload_buf), "put-only-%u-%zu", (unsigned)worker->kind, i); if (payload_len == 0u || payload_len >= sizeof(payload_buf)) { worker->first_err = AMDUAT_ASL_STORE_ERR_IO; return NULL; } payload_artifact = amduat_artifact(amduat_octets((const uint8_t *)payload_buf, payload_len)); payload_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); err = amduat_asl_store_put(worker->store, payload_artifact, &payload_ref); amduat_reference_free(&payload_ref); if (err != AMDUAT_ASL_STORE_OK) { worker->first_err = err; return NULL; } } return NULL; } int main(void) { amduat_asl_store_config_t config; amduat_asl_store_index_fs_t fs_a; amduat_asl_store_index_fs_t fs_b; amduat_asl_store_t store_a; amduat_asl_store_t store_b; amduat_asl_log_store_t log_a; amduat_asl_log_store_t log_b; amduat_artifact_t payload_artifact_a; amduat_artifact_t payload_artifact_b; amduat_reference_t payload_ref_a; amduat_reference_t payload_ref_b; amduat_asl_log_entry_t entry_a; amduat_asl_log_entry_t entry_b; amduat_asl_log_entry_t *out_entries = NULL; size_t out_len = 0u; uint64_t out_next_offset = 0u; bool out_end = false; uint64_t first_offset = 0u; const char *log_name = "collection/space/app1/daemon/edges/log"; enum { WORKER_ITERS = 32 }; append_worker_t worker_a; append_worker_t worker_b; append_worker_t mixed_append_worker; put_worker_t mixed_put_worker; pthread_t thread_a; pthread_t thread_b; pthread_t mixed_append_thread; pthread_t mixed_put_thread; bool started_a = false; bool started_b = false; bool started_mixed_append = false; bool started_mixed_put = false; uint64_t worker_a_offsets[WORKER_ITERS]; uint64_t worker_b_offsets[WORKER_ITERS]; char *root = NULL; int rc = 1; root = strdup("/tmp/amduat-log-index-XXXXXX"); if (root == NULL) { fprintf(stderr, "alloc root template failed\n"); return 1; } if (mkdtemp(root) == NULL) { fprintf(stderr, "mkdtemp failed\n"); goto cleanup; } memset(&config, 0, sizeof(config)); config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; if (!amduat_asl_store_index_fs_init(&fs_a, config, root) || !amduat_asl_store_index_fs_init(&fs_b, config, root)) { fprintf(stderr, "index fs init failed\n"); goto cleanup; } amduat_asl_store_init(&store_a, config, amduat_asl_store_index_fs_ops(), &fs_a); amduat_asl_store_init(&store_b, config, amduat_asl_store_index_fs_ops(), &fs_b); if (!amduat_asl_log_store_init(&log_a, root, &store_a, NULL) || !amduat_asl_log_store_init(&log_b, root, &store_b, NULL)) { fprintf(stderr, "log store init failed\n"); goto cleanup; } payload_artifact_a = amduat_artifact(amduat_octets((const uint8_t *)"payload-a", 9u)); payload_ref_a = amduat_reference(0u, amduat_octets(NULL, 0u)); if (amduat_asl_store_put(&store_a, payload_artifact_a, &payload_ref_a) != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "put payload a failed\n"); goto cleanup; } memset(&entry_a, 0, sizeof(entry_a)); entry_a.kind = 1u; entry_a.payload_ref = payload_ref_a; if (amduat_asl_log_append(&log_a, log_name, &entry_a, 1u, &first_offset) != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "first append failed\n"); goto cleanup; } if (first_offset != 0u) { fprintf(stderr, "first offset mismatch: %" PRIu64 "\n", first_offset); goto cleanup; } payload_artifact_b = amduat_artifact(amduat_octets((const uint8_t *)"payload-b", 9u)); payload_ref_b = amduat_reference(0u, amduat_octets(NULL, 0u)); if (amduat_asl_store_put(&store_b, payload_artifact_b, &payload_ref_b) != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "put payload b failed\n"); goto cleanup; } memset(&entry_b, 0, sizeof(entry_b)); entry_b.kind = 2u; entry_b.payload_ref = payload_ref_b; if (amduat_asl_log_append(&log_b, log_name, &entry_b, 1u, &first_offset) != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "second append failed\n"); goto cleanup; } if (first_offset != 1u) { fprintf(stderr, "second first_offset mismatch: %" PRIu64 "\n", first_offset); goto cleanup; } if (amduat_asl_log_read(&log_a, log_name, 0u, 8u, &out_entries, &out_len, &out_next_offset, &out_end) != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "log read failed\n"); goto cleanup; } if (out_len != 2u || out_next_offset != 2u || !out_end) { fprintf(stderr, "readback shape mismatch len=%zu next=%" PRIu64 " end=%d\n", out_len, out_next_offset, out_end ? 1 : 0); goto cleanup; } if (out_entries[0].kind != 1u || !refs_equal(out_entries[0].payload_ref, payload_ref_a)) { fprintf(stderr, "entry0 mismatch\n"); goto cleanup; } if (out_entries[1].kind != 2u || !refs_equal(out_entries[1].payload_ref, payload_ref_b)) { fprintf(stderr, "entry1 mismatch\n"); goto cleanup; } amduat_asl_log_entries_free(out_entries, out_len); out_entries = NULL; out_len = 0u; memset(&worker_a, 0, sizeof(worker_a)); worker_a.store = &store_a; worker_a.log_store = &log_a; worker_a.log_name = log_name; worker_a.kind = 10u; worker_a.iterations = WORKER_ITERS; worker_a.observed_offsets = worker_a_offsets; memset(&worker_b, 0, sizeof(worker_b)); worker_b.store = &store_b; worker_b.log_store = &log_b; worker_b.log_name = log_name; worker_b.kind = 11u; worker_b.iterations = WORKER_ITERS; worker_b.observed_offsets = worker_b_offsets; if (pthread_create(&thread_a, NULL, append_worker_main, &worker_a) != 0) { fprintf(stderr, "pthread_create failed\n"); goto cleanup; } started_a = true; if (pthread_create(&thread_b, NULL, append_worker_main, &worker_b) != 0) { fprintf(stderr, "pthread_create failed\n"); goto cleanup; } started_b = true; (void)pthread_join(thread_a, NULL); (void)pthread_join(thread_b, NULL); started_a = false; started_b = false; if (worker_a.first_err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "worker_a failed err=%d\n", (int)worker_a.first_err); goto cleanup; } if (worker_b.first_err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "worker_b failed err=%d\n", (int)worker_b.first_err); goto cleanup; } { bool seen[2u + 2u * WORKER_ITERS]; size_t i; memset(seen, 0, sizeof(seen)); for (i = 0u; i < WORKER_ITERS; ++i) { if (worker_a_offsets[i] < 2u || worker_a_offsets[i] >= (uint64_t)(2u + 2u * WORKER_ITERS)) { fprintf(stderr, "worker_a offset out of range: %" PRIu64 "\n", worker_a_offsets[i]); goto cleanup; } if (seen[worker_a_offsets[i]]) { fprintf(stderr, "duplicate offset from worker_a: %" PRIu64 "\n", worker_a_offsets[i]); goto cleanup; } seen[worker_a_offsets[i]] = true; } for (i = 0u; i < WORKER_ITERS; ++i) { if (worker_b_offsets[i] < 2u || worker_b_offsets[i] >= (uint64_t)(2u + 2u * WORKER_ITERS)) { fprintf(stderr, "worker_b offset out of range: %" PRIu64 "\n", worker_b_offsets[i]); goto cleanup; } if (seen[worker_b_offsets[i]]) { fprintf(stderr, "duplicate offset from worker_b: %" PRIu64 "\n", worker_b_offsets[i]); goto cleanup; } seen[worker_b_offsets[i]] = true; } for (i = 2u; i < (2u + 2u * WORKER_ITERS); ++i) { if (!seen[i]) { fprintf(stderr, "missing worker offset: %zu\n", i); goto cleanup; } } } if (amduat_asl_log_read(&log_a, log_name, 0u, 4096u, &out_entries, &out_len, &out_next_offset, &out_end) != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "log read after stress failed\n"); goto cleanup; } if (out_len != (2u + 2u * WORKER_ITERS) || out_next_offset != out_len || !out_end) { fprintf(stderr, "stress read mismatch len=%zu next=%" PRIu64 " expected=%u end=%d\n", out_len, out_next_offset, (unsigned)(2u + 2u * WORKER_ITERS), out_end ? 1 : 0); goto cleanup; } amduat_asl_log_entries_free(out_entries, out_len); out_entries = NULL; out_len = 0u; memset(&mixed_append_worker, 0, sizeof(mixed_append_worker)); mixed_append_worker.store = &store_a; mixed_append_worker.log_store = &log_a; mixed_append_worker.log_name = log_name; mixed_append_worker.kind = 12u; mixed_append_worker.iterations = WORKER_ITERS; memset(&mixed_put_worker, 0, sizeof(mixed_put_worker)); mixed_put_worker.store = &store_b; mixed_put_worker.kind = 13u; mixed_put_worker.iterations = WORKER_ITERS * 4u; if (pthread_create(&mixed_append_thread, NULL, append_worker_main, &mixed_append_worker) != 0) { fprintf(stderr, "pthread_create mixed append failed\n"); goto cleanup; } started_mixed_append = true; if (pthread_create(&mixed_put_thread, NULL, put_worker_main, &mixed_put_worker) != 0) { fprintf(stderr, "pthread_create mixed put failed\n"); goto cleanup; } started_mixed_put = true; (void)pthread_join(mixed_append_thread, NULL); (void)pthread_join(mixed_put_thread, NULL); started_mixed_append = false; started_mixed_put = false; if (mixed_append_worker.first_err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "mixed append worker failed err=%d\n", (int)mixed_append_worker.first_err); goto cleanup; } if (mixed_put_worker.first_err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "mixed put worker failed err=%d\n", (int)mixed_put_worker.first_err); goto cleanup; } rc = 0; cleanup: if (started_a) { (void)pthread_join(thread_a, NULL); } if (started_b) { (void)pthread_join(thread_b, NULL); } if (started_mixed_append) { (void)pthread_join(mixed_append_thread, NULL); } if (started_mixed_put) { (void)pthread_join(mixed_put_thread, NULL); } amduat_asl_log_entries_free(out_entries, out_len); amduat_reference_free(&payload_ref_a); amduat_reference_free(&payload_ref_b); if (root != NULL) { if (!remove_tree(root)) { fprintf(stderr, "warning: cleanup failed for %s\n", root); } free(root); } return rc; }