#include "amduat/asl/asl_store_index_fs.h" #include "amduat/asl/index_accel.h" #include "amduat/asl/index_bloom.h" #include "amduat/asl/ref_text.h" #include "amduat/asl/ref_derive.h" #include "amduat/asl/store.h" #include "amduat/enc/asl1_core.h" #include "amduat/enc/asl_core_index.h" #include "amduat/enc/asl_log.h" #include "amduat/format/ref.h" #include "amduat/hash/asl1.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include static bool join_path(const char *base, const char *segment, char **out_path) { size_t base_len; size_t seg_len; bool needs_sep; size_t total_len; char *buffer; size_t offset; if (base == NULL || segment == NULL || out_path == NULL) { return false; } if (base[0] == '\0' || segment[0] == '\0') { return false; } base_len = strlen(base); seg_len = strlen(segment); needs_sep = base[base_len - 1u] != '/'; total_len = base_len + (needs_sep ? 1u : 0u) + seg_len + 1u; buffer = (char *)malloc(total_len); if (buffer == NULL) { return false; } offset = 0u; memcpy(buffer + offset, base, base_len); offset += base_len; if (needs_sep) { buffer[offset++] = '/'; } memcpy(buffer + offset, segment, seg_len); offset += seg_len; buffer[offset] = '\0'; *out_path = buffer; return true; } static bool remove_tree(const char *path) { struct stat st; DIR *dir; struct dirent *entry; if (path == NULL) { return false; } if (lstat(path, &st) != 0) { return errno == ENOENT; } if (!S_ISDIR(st.st_mode)) { return unlink(path) == 0; } dir = opendir(path); if (dir == NULL) { return false; } while ((entry = readdir(dir)) != NULL) { char *child = NULL; if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) { continue; } if (!join_path(path, entry->d_name, &child)) { closedir(dir); return false; } if (!remove_tree(child)) { free(child); closedir(dir); return false; } free(child); } if (closedir(dir) != 0) { return false; } return rmdir(path) == 0; } static bool read_file(const char *path, uint8_t **out_bytes, size_t *out_len) { FILE *fp; long size; uint8_t *buffer; size_t read_len; if (path == NULL || out_bytes == NULL || out_len == NULL) { return false; } *out_bytes = NULL; *out_len = 0u; fp = fopen(path, "rb"); if (fp == NULL) { return false; } if (fseek(fp, 0, SEEK_END) != 0) { fclose(fp); return false; } size = ftell(fp); if (size < 0) { fclose(fp); return false; } if (fseek(fp, 0, SEEK_SET) != 0) { fclose(fp); return false; } buffer = (uint8_t *)malloc((size_t)size); if (buffer == NULL) { fclose(fp); return false; } read_len = fread(buffer, 1u, (size_t)size, fp); fclose(fp); if (read_len != (size_t)size) { free(buffer); return false; } *out_bytes = buffer; *out_len = (size_t)size; return true; } static bool build_log_path(const char *root, char **out_path) { char *index_path = NULL; bool ok; if (root == NULL || out_path == NULL) { return false; } if (!join_path(root, "index", &index_path)) { return false; } ok = join_path(index_path, "log.asl", out_path); free(index_path); return ok; } static bool build_segment_path(const char *root, uint64_t segment_id, char **out_path) { int needed; char *buffer; if (root == NULL || out_path == NULL) { return false; } needed = snprintf(NULL, 0, "%s/index/segments/segment-%016" PRIx64 ".asl", root, segment_id); if (needed <= 0) { return false; } buffer = (char *)malloc((size_t)needed + 1u); if (buffer == NULL) { return false; } snprintf(buffer, (size_t)needed + 1u, "%s/index/segments/segment-%016" PRIx64 ".asl", root, segment_id); *out_path = buffer; return true; } static bool build_shard_segment_path(const char *root, uint16_t shard_id, uint64_t segment_id, char **out_path) { int needed; char *buffer; if (root == NULL || out_path == NULL) { return false; } needed = snprintf(NULL, 0, "%s/index/shards/shard-%016" PRIx64 "/segments/segment-%016" PRIx64 ".asl", root, (uint64_t)shard_id, segment_id); if (needed <= 0) { return false; } buffer = (char *)malloc((size_t)needed + 1u); if (buffer == NULL) { return false; } snprintf(buffer, (size_t)needed + 1u, "%s/index/shards/shard-%016" PRIx64 "/segments/segment-%016" PRIx64 ".asl", root, (uint64_t)shard_id, segment_id); *out_path = buffer; return true; } static uint64_t load_u64_le(const uint8_t *data) { return (uint64_t)data[0] | ((uint64_t)data[1] << 8) | ((uint64_t)data[2] << 16) | ((uint64_t)data[3] << 24) | ((uint64_t)data[4] << 32) | ((uint64_t)data[5] << 40) | ((uint64_t)data[6] << 48) | ((uint64_t)data[7] << 56); } static uint16_t ref_shard(amduat_reference_t ref, uint16_t shard_count) { return amduat_asl_index_accel_shard_for_ref( ref, false, amduat_type_tag(0u), shard_count); } static bool parse_size_env(const char *value, size_t *out) { char *endptr; unsigned long long parsed; if (value == NULL || out == NULL) { return false; } errno = 0; parsed = strtoull(value, &endptr, 10); if (errno != 0 || endptr == value || *endptr != '\0') { return false; } if (parsed > SIZE_MAX) { return false; } *out = (size_t)parsed; return true; } static uint64_t now_ns(void) { struct timespec ts; if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0) { return 0u; } return (uint64_t)ts.tv_sec * 1000u * 1000u * 1000u + (uint64_t)ts.tv_nsec; } static void sleep_ms(unsigned int ms) { struct timespec ts; ts.tv_sec = (time_t)(ms / 1000u); ts.tv_nsec = (long)((ms % 1000u) * 1000u * 1000u); while (nanosleep(&ts, &ts) != 0 && errno == EINTR) { } } static long current_rss_kb(void) { struct rusage usage; if (getrusage(RUSAGE_SELF, &usage) != 0) { return -1; } return usage.ru_maxrss; } static bool parse_u64_env(const char *value, uint64_t *out) { char *endptr; unsigned long long parsed; if (value == NULL || out == NULL) { return false; } errno = 0; parsed = strtoull(value, &endptr, 10); if (errno != 0 || endptr == value || *endptr != '\0') { return false; } if (parsed > UINT64_MAX) { return false; } *out = (uint64_t)parsed; return true; } static char *make_temp_root(void) { char *templ; const char template_prefix[] = "/tmp/amduat_test_asl_store_index_fs_XXXXXX"; templ = (char *)malloc(sizeof(template_prefix)); if (templ == NULL) { return NULL; } memcpy(templ, template_prefix, sizeof(template_prefix)); if (mkdtemp(templ) == NULL) { free(templ); return NULL; } return templ; } static int test_round_trip(void) { amduat_asl_store_config_t config; amduat_asl_store_index_fs_t fs; amduat_asl_store_t store; amduat_asl_store_error_t err; amduat_asl_index_state_t state; amduat_asl_index_state_t current_state; amduat_artifact_t artifact; amduat_artifact_t loaded; amduat_reference_t ref; uint8_t payload[6]; char *root; int exit_code = 1; root = make_temp_root(); if (root == NULL) { fprintf(stderr, "temp root failed\n"); return 1; } config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; if (!amduat_asl_store_index_fs_init(&fs, config, root)) { fprintf(stderr, "index fs init failed\n"); goto cleanup; } amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs); memset(payload, 0x5a, sizeof(payload)); artifact = amduat_artifact(amduat_octets(payload, sizeof(payload))); ref = amduat_reference(0u, amduat_octets(NULL, 0u)); err = amduat_asl_store_put_indexed(&store, artifact, &ref, &state); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "put_indexed failed: %d\n", err); goto cleanup; } if (!amduat_asl_index_current_state(&store, ¤t_state)) { fprintf(stderr, "current_state failed\n"); goto cleanup; } if (current_state.log_position != state.log_position) { fprintf(stderr, "log position mismatch\n"); goto cleanup; } loaded = amduat_artifact(amduat_octets(NULL, 0u)); err = amduat_asl_store_get_indexed(&store, ref, state, &loaded); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "get_indexed failed: %d\n", err); goto cleanup; } if (!amduat_artifact_eq(artifact, loaded)) { fprintf(stderr, "artifact mismatch\n"); amduat_artifact_free(&loaded); goto cleanup; } amduat_artifact_free(&loaded); if (state.log_position > 0u) { amduat_asl_index_state_t cutoff = state; cutoff.log_position = state.log_position - 1u; loaded = amduat_artifact(amduat_octets(NULL, 0u)); err = amduat_asl_store_get_indexed(&store, ref, cutoff, &loaded); if (err != AMDUAT_ASL_STORE_ERR_NOT_FOUND) { fprintf(stderr, "cutoff lookup expected not found: %d\n", err); amduat_artifact_free(&loaded); goto cleanup; } } { char *segment_path = NULL; uint8_t *segment_bytes = NULL; size_t segment_len = 0u; amduat_asl_core_index_segment_t segment; bool bloom_nonzero = false; size_t i; memset(&segment, 0, sizeof(segment)); if (!build_segment_path(root, 1u, &segment_path)) { fprintf(stderr, "segment path build failed\n"); goto cleanup; } if (!read_file(segment_path, &segment_bytes, &segment_len)) { fprintf(stderr, "segment read failed\n"); free(segment_path); goto cleanup; } free(segment_path); if (!amduat_enc_asl_core_index_decode_v1( amduat_octets(segment_bytes, segment_len), &segment)) { fprintf(stderr, "segment decode failed\n"); free(segment_bytes); goto cleanup; } free(segment_bytes); if (segment.bloom.len != AMDUAT_ASL_INDEX_BLOOM_BYTES) { fprintf(stderr, "segment bloom size mismatch\n"); amduat_enc_asl_core_index_free(&segment); goto cleanup; } if (!amduat_asl_index_bloom_maybe_contains(segment.bloom, ref.hash_id, ref.digest)) { fprintf(stderr, "segment bloom missing digest\n"); amduat_enc_asl_core_index_free(&segment); goto cleanup; } for (i = 0u; i < segment.bloom.len; ++i) { if (segment.bloom.data[i] != 0u) { bloom_nonzero = true; break; } } if (!bloom_nonzero) { fprintf(stderr, "segment bloom was empty\n"); amduat_enc_asl_core_index_free(&segment); goto cleanup; } amduat_enc_asl_core_index_free(&segment); } exit_code = 0; cleanup: amduat_reference_free(&ref); if (!remove_tree(root)) { fprintf(stderr, "cleanup failed\n"); exit_code = 1; } free(root); return exit_code; } static int test_shard_routing(void) { amduat_asl_store_config_t config; amduat_asl_store_index_fs_t fs; amduat_asl_store_t store; char *root; uint8_t payload_a[2] = {0x10, 0x11}; uint8_t payload_b[2] = {0x20, 0x21}; amduat_reference_t ref_a; amduat_reference_t ref_b; amduat_octets_t bytes_a; amduat_octets_t bytes_b; amduat_hash_id_t hash_id; uint16_t shard_a; uint16_t shard_b; uint64_t local_counts[2] = {0u, 0u}; int exit_code = 1; size_t i; bool found = false; root = make_temp_root(); if (root == NULL) { fprintf(stderr, "temp root failed\n"); return 1; } memset(&config, 0, sizeof(config)); config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; if (!amduat_asl_store_index_fs_init(&fs, config, root)) { fprintf(stderr, "index fs init failed\n"); goto cleanup; } amduat_asl_store_index_fs_set_shard_count(&fs, 2); amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs); hash_id = config.hash_id; ref_a = amduat_reference(0u, amduat_octets(NULL, 0u)); ref_b = amduat_reference(0u, amduat_octets(NULL, 0u)); bytes_a = amduat_octets(NULL, 0u); bytes_b = amduat_octets(NULL, 0u); for (i = 0; i < 256; ++i) { payload_a[0] = (uint8_t)i; payload_b[0] = (uint8_t)(255u - i); amduat_reference_free(&ref_a); amduat_reference_free(&ref_b); amduat_octets_free(&bytes_a); amduat_octets_free(&bytes_b); if (!amduat_asl_ref_derive(amduat_artifact(amduat_octets(payload_a, sizeof(payload_a))), AMDUAT_ENC_ASL1_CORE_V1, hash_id, &ref_a, &bytes_a) || !amduat_asl_ref_derive(amduat_artifact(amduat_octets(payload_b, sizeof(payload_b))), AMDUAT_ENC_ASL1_CORE_V1, hash_id, &ref_b, &bytes_b)) { fprintf(stderr, "ref derive failed\n"); goto cleanup; } shard_a = ref_shard(ref_a, 2); shard_b = ref_shard(ref_b, 2); if (shard_a != shard_b) { found = true; break; } } if (!found) { fprintf(stderr, "shard selection failed\n"); goto cleanup; } { amduat_reference_t stored_ref; amduat_asl_index_state_t state; uint64_t segment_id; char *segment_path = NULL; amduat_asl_store_error_t err; stored_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); err = amduat_asl_store_put_indexed(&store, amduat_artifact(amduat_octets(payload_a, sizeof(payload_a))), &stored_ref, &state); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "put shard a failed\n"); amduat_reference_free(&stored_ref); goto cleanup; } amduat_reference_free(&stored_ref); local_counts[shard_a] += 1u; segment_id = ((uint64_t)shard_a << 48) | local_counts[shard_a]; if (!build_shard_segment_path(root, shard_a, segment_id, &segment_path)) { fprintf(stderr, "segment path failed\n"); goto cleanup; } if (access(segment_path, F_OK) != 0) { fprintf(stderr, "segment missing in shard a\n"); free(segment_path); goto cleanup; } free(segment_path); } { amduat_reference_t stored_ref; amduat_asl_index_state_t state; uint64_t segment_id; char *segment_path = NULL; amduat_asl_store_error_t err; stored_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); err = amduat_asl_store_put_indexed(&store, amduat_artifact(amduat_octets(payload_b, sizeof(payload_b))), &stored_ref, &state); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "put shard b failed\n"); amduat_reference_free(&stored_ref); goto cleanup; } amduat_reference_free(&stored_ref); local_counts[shard_b] += 1u; segment_id = ((uint64_t)shard_b << 48) | local_counts[shard_b]; if (!build_shard_segment_path(root, shard_b, segment_id, &segment_path)) { fprintf(stderr, "segment path failed\n"); goto cleanup; } if (access(segment_path, F_OK) != 0) { fprintf(stderr, "segment missing in shard b\n"); free(segment_path); goto cleanup; } free(segment_path); } exit_code = 0; cleanup: amduat_reference_free(&ref_a); amduat_reference_free(&ref_b); amduat_octets_free(&bytes_a); amduat_octets_free(&bytes_b); if (!remove_tree(root)) { fprintf(stderr, "cleanup failed\n"); exit_code = 1; } free(root); return exit_code; } static int test_snapshot_truncation(void) { amduat_asl_store_config_t config; amduat_asl_store_index_fs_t fs; amduat_asl_store_t store; amduat_asl_store_error_t err; amduat_asl_index_state_t state; amduat_artifact_t artifact; amduat_artifact_t loaded; amduat_reference_t ref; uint8_t payload[8]; char *root; char *log_path = NULL; uint8_t *log_before = NULL; uint8_t *log_after = NULL; size_t log_before_len = 0u; size_t log_after_len = 0u; int exit_code = 1; root = make_temp_root(); if (root == NULL) { fprintf(stderr, "temp root failed\n"); return 1; } memset(&config, 0, sizeof(config)); config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; if (!amduat_asl_store_index_fs_init(&fs, config, root)) { fprintf(stderr, "index fs init failed\n"); goto cleanup; } amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs); memset(payload, 0x33, sizeof(payload)); artifact = amduat_artifact(amduat_octets(payload, sizeof(payload))); ref = amduat_reference(0u, amduat_octets(NULL, 0u)); err = amduat_asl_store_put_indexed(&store, artifact, &ref, &state); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "put_indexed failed: %d\n", err); goto cleanup; } if (!build_log_path(root, &log_path) || !read_file(log_path, &log_before, &log_before_len)) { fprintf(stderr, "log read before snapshot failed\n"); goto cleanup; } err = amduat_asl_store_index_fs_snapshot_create(&fs, 1u, NULL, NULL); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "snapshot create failed: %d\n", err); goto cleanup; } if (!read_file(log_path, &log_after, &log_after_len)) { fprintf(stderr, "log read after snapshot failed\n"); goto cleanup; } if (log_after_len >= log_before_len) { fprintf(stderr, "log did not shrink after snapshot\n"); goto cleanup; } if (!amduat_asl_index_current_state(&store, &state)) { fprintf(stderr, "current_state failed\n"); goto cleanup; } loaded = amduat_artifact(amduat_octets(NULL, 0u)); err = amduat_asl_store_get_indexed(&store, ref, state, &loaded); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "get_indexed after snapshot failed: %d\n", err); goto cleanup; } if (!amduat_artifact_eq(artifact, loaded)) { fprintf(stderr, "artifact mismatch after snapshot\n"); amduat_artifact_free(&loaded); goto cleanup; } amduat_artifact_free(&loaded); exit_code = 0; cleanup: free(log_path); free(log_before); free(log_after); amduat_reference_free(&ref); if (!remove_tree(root)) { fprintf(stderr, "cleanup failed\n"); exit_code = 1; } free(root); return exit_code; } static int test_segment_batch_packing(void) { amduat_asl_store_config_t config; amduat_asl_store_index_fs_t fs; amduat_asl_store_t store; amduat_asl_store_index_fs_segment_policy_t segment_policy; amduat_asl_store_index_fs_visibility_policy_t visibility_policy; amduat_asl_store_error_t err; amduat_asl_index_state_t state_a; amduat_asl_index_state_t state_b; amduat_artifact_t artifact_a; amduat_artifact_t artifact_b; amduat_artifact_t loaded; amduat_reference_t ref_a; amduat_reference_t ref_b; uint8_t payload_a[3] = {0x01, 0x02, 0x03}; uint8_t payload_b[4] = {0x10, 0x11, 0x12, 0x13}; char *root; amduat_asl_log_record_t *records = NULL; size_t record_count = 0u; uint64_t segment_id = 0u; char *segment_path = NULL; uint8_t *segment_bytes = NULL; size_t segment_len = 0u; amduat_asl_core_index_segment_t segment; int exit_code = 1; root = make_temp_root(); if (root == NULL) { fprintf(stderr, "temp root failed\n"); return 1; } memset(&config, 0, sizeof(config)); config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; if (!amduat_asl_store_index_fs_init(&fs, config, root)) { fprintf(stderr, "index fs init failed\n"); goto cleanup; } segment_policy.max_segment_records = 2u; segment_policy.max_segment_bytes = 0u; segment_policy.small_artifact_threshold = 1024u; segment_policy.allow_deferred_visibility = true; amduat_asl_store_index_fs_set_segment_policy(&fs, segment_policy); visibility_policy.segment_domain_id = 7u; visibility_policy.record_visibility = 1u; amduat_asl_store_index_fs_set_visibility_policy(&fs, visibility_policy); amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs); artifact_a = amduat_artifact(amduat_octets(payload_a, sizeof(payload_a))); artifact_b = amduat_artifact(amduat_octets(payload_b, sizeof(payload_b))); ref_a = amduat_reference(0u, amduat_octets(NULL, 0u)); ref_b = amduat_reference(0u, amduat_octets(NULL, 0u)); err = amduat_asl_store_put_indexed(&store, artifact_a, &ref_a, &state_a); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "put_indexed a failed: %d\n", err); goto cleanup; } loaded = amduat_artifact(amduat_octets(NULL, 0u)); err = amduat_asl_store_get_indexed(&store, ref_a, state_a, &loaded); if (err != AMDUAT_ASL_STORE_ERR_NOT_FOUND) { fprintf(stderr, "expected a to be pending before seal: %d\n", err); amduat_artifact_free(&loaded); goto cleanup; } amduat_artifact_free(&loaded); err = amduat_asl_store_put_indexed(&store, artifact_b, &ref_b, &state_b); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "put_indexed b failed: %d\n", err); goto cleanup; } loaded = amduat_artifact(amduat_octets(NULL, 0u)); err = amduat_asl_store_get_indexed(&store, ref_a, state_b, &loaded); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "get_indexed a after seal failed: %d\n", err); goto cleanup; } amduat_artifact_free(&loaded); loaded = amduat_artifact(amduat_octets(NULL, 0u)); err = amduat_asl_store_get_indexed(&store, ref_b, state_b, &loaded); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "get_indexed b after seal failed: %d\n", err); goto cleanup; } amduat_artifact_free(&loaded); err = amduat_asl_log_scan(&store, &records, &record_count); if (err != AMDUAT_ASL_STORE_OK || record_count != 1u || records[0].record_type != AMDUAT_ASL_LOG_RECORD_SEGMENT_SEAL) { fprintf(stderr, "log scan expected one segment seal\n"); goto cleanup; } if (records[0].payload.len < 8u) { fprintf(stderr, "segment seal payload too small\n"); goto cleanup; } segment_id = load_u64_le(records[0].payload.data); memset(&segment, 0, sizeof(segment)); if (!build_segment_path(root, segment_id, &segment_path)) { fprintf(stderr, "segment path build failed\n"); goto cleanup; } if (!read_file(segment_path, &segment_bytes, &segment_len)) { fprintf(stderr, "segment read failed\n"); goto cleanup; } if (!amduat_enc_asl_core_index_decode_v1( amduat_octets(segment_bytes, segment_len), &segment)) { fprintf(stderr, "segment decode failed\n"); goto cleanup; } if (segment.record_count != 2u || segment.extent_count != 2u) { fprintf(stderr, "segment record/extent count mismatch\n"); goto cleanup; } if (segment.header.segment_visibility != 1u || segment.header.segment_domain_id != 7u) { fprintf(stderr, "segment visibility metadata mismatch\n"); goto cleanup; } if (segment.records[0].visibility != 1u || segment.records[1].visibility != 1u || segment.records[0].domain_id != 7u || segment.records[1].domain_id != 7u) { fprintf(stderr, "record visibility metadata mismatch\n"); goto cleanup; } exit_code = 0; cleanup: amduat_enc_asl_core_index_free(&segment); free(segment_path); free(segment_bytes); amduat_enc_asl_log_free(records, record_count); amduat_reference_free(&ref_a); amduat_reference_free(&ref_b); if (!remove_tree(root)) { fprintf(stderr, "cleanup failed\n"); exit_code = 1; } free(root); return exit_code; } static int test_tombstone_round_trip(void) { amduat_asl_store_config_t config; amduat_asl_store_index_fs_t fs; amduat_asl_store_t store; amduat_asl_store_error_t err; amduat_asl_index_state_t state; amduat_asl_index_state_t tombstone_state; amduat_asl_index_state_t lift_state; amduat_artifact_t artifact; amduat_artifact_t loaded; amduat_reference_t ref; uint8_t payload[4]; char *root; amduat_asl_log_record_t *records = NULL; size_t record_count = 0u; int exit_code = 1; root = make_temp_root(); if (root == NULL) { fprintf(stderr, "temp root failed\n"); return 1; } memset(&config, 0, sizeof(config)); config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; if (!amduat_asl_store_index_fs_init(&fs, config, root)) { fprintf(stderr, "index fs init failed\n"); goto cleanup; } amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs); memset(payload, 0x2a, sizeof(payload)); artifact = amduat_artifact(amduat_octets(payload, sizeof(payload))); ref = amduat_reference(0u, amduat_octets(NULL, 0u)); err = amduat_asl_store_put_indexed(&store, artifact, &ref, &state); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "put_indexed failed: %d\n", err); goto cleanup; } loaded = amduat_artifact(amduat_octets(NULL, 0u)); err = amduat_asl_store_get_indexed(&store, ref, state, &loaded); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "get_indexed failed: %d\n", err); goto cleanup; } amduat_artifact_free(&loaded); err = amduat_asl_store_tombstone(&store, ref, 7u, 9u, &tombstone_state); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "tombstone failed: %d\n", err); goto cleanup; } if (tombstone_state.log_position <= state.log_position) { fprintf(stderr, "tombstone did not advance log position\n"); goto cleanup; } if (!amduat_asl_index_current_state(&store, &state)) { fprintf(stderr, "current_state failed\n"); goto cleanup; } loaded = amduat_artifact(amduat_octets(NULL, 0u)); err = amduat_asl_store_get_indexed(&store, ref, state, &loaded); if (err != AMDUAT_ASL_STORE_ERR_NOT_FOUND) { fprintf(stderr, "get_indexed after tombstone expected not found: %d\n", err); amduat_artifact_free(&loaded); goto cleanup; } amduat_artifact_free(&loaded); err = amduat_asl_store_tombstone_lift(&store, ref, tombstone_state.log_position, &lift_state); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "tombstone_lift failed: %d\n", err); goto cleanup; } if (lift_state.log_position <= tombstone_state.log_position) { fprintf(stderr, "tombstone_lift did not advance log position\n"); goto cleanup; } if (!amduat_asl_index_current_state(&store, &state)) { fprintf(stderr, "current_state failed\n"); goto cleanup; } loaded = amduat_artifact(amduat_octets(NULL, 0u)); err = amduat_asl_store_get_indexed(&store, ref, state, &loaded); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "get_indexed after tombstone lift failed: %d\n", err); goto cleanup; } amduat_artifact_free(&loaded); err = amduat_asl_log_scan(&store, &records, &record_count); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "log scan failed: %d\n", err); goto cleanup; } if (record_count < 3u || records[record_count - 1u].record_type != AMDUAT_ASL_LOG_RECORD_TOMBSTONE_LIFT) { fprintf(stderr, "log scan missing tombstone lift record\n"); goto cleanup; } exit_code = 0; cleanup: amduat_enc_asl_log_free(records, record_count); amduat_reference_free(&ref); if (!remove_tree(root)) { fprintf(stderr, "cleanup failed\n"); exit_code = 1; } free(root); return exit_code; } static int test_gc_keeps_visible(void) { amduat_asl_store_config_t config; amduat_asl_store_index_fs_t fs; amduat_asl_store_t store; amduat_asl_store_error_t err; amduat_asl_index_state_t state; amduat_artifact_t artifact; amduat_artifact_t loaded; amduat_reference_t ref; uint8_t payload[5]; char *root; int exit_code = 1; root = make_temp_root(); if (root == NULL) { fprintf(stderr, "temp root failed\n"); return 1; } memset(&config, 0, sizeof(config)); config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; if (!amduat_asl_store_index_fs_init(&fs, config, root)) { fprintf(stderr, "index fs init failed\n"); goto cleanup; } amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs); memset(payload, 0x77, sizeof(payload)); artifact = amduat_artifact(amduat_octets(payload, sizeof(payload))); ref = amduat_reference(0u, amduat_octets(NULL, 0u)); err = amduat_asl_store_put_indexed(&store, artifact, &ref, &state); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "put_indexed failed: %d\n", err); goto cleanup; } err = amduat_asl_store_index_fs_gc(&fs, &state); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "gc failed: %d\n", err); goto cleanup; } loaded = amduat_artifact(amduat_octets(NULL, 0u)); err = amduat_asl_store_get_indexed(&store, ref, state, &loaded); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "get_indexed after gc failed: %d\n", err); goto cleanup; } amduat_artifact_free(&loaded); exit_code = 0; cleanup: amduat_reference_free(&ref); if (!remove_tree(root)) { fprintf(stderr, "cleanup failed\n"); exit_code = 1; } free(root); return exit_code; } static int test_large_round_trip_perf(void) { enum { k_artifact_count_default = 10000, k_payload_max = 64 }; amduat_asl_store_config_t config; amduat_asl_store_index_fs_t fs; amduat_asl_store_t store; amduat_asl_store_error_t err; amduat_asl_index_state_t state; amduat_reference_t *refs = NULL; uint8_t **payloads = NULL; size_t *payload_lens = NULL; char *root; int exit_code = 1; size_t i; size_t artifact_count; uint64_t start_ns; uint64_t end_ns; long rss_kb; const char *count_env; root = make_temp_root(); if (root == NULL) { fprintf(stderr, "temp root failed\n"); return 1; } artifact_count = k_artifact_count_default; count_env = getenv("AMDUAT_ASL_PERF_COUNT"); if (count_env != NULL) { size_t parsed; if (parse_size_env(count_env, &parsed) && parsed > 0u) { artifact_count = parsed; } } refs = (amduat_reference_t *)calloc(artifact_count, sizeof(*refs)); payloads = (uint8_t **)calloc(artifact_count, sizeof(*payloads)); payload_lens = (size_t *)calloc(artifact_count, sizeof(*payload_lens)); if (refs == NULL || payloads == NULL || payload_lens == NULL) { fprintf(stderr, "alloc failed\n"); goto cleanup; } memset(&config, 0, sizeof(config)); config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; if (!amduat_asl_store_index_fs_init(&fs, config, root)) { fprintf(stderr, "index fs init failed\n"); goto cleanup; } amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs); srand(1337); start_ns = now_ns(); for (i = 0; i < artifact_count; ++i) { size_t len = (size_t)((rand() % k_payload_max) + 1); uint8_t *payload = (uint8_t *)malloc(len); if (payload == NULL) { fprintf(stderr, "payload alloc failed\n"); goto cleanup; } for (size_t j = 0; j < len; ++j) { payload[j] = (uint8_t)(rand() & 0xffu); } payloads[i] = payload; payload_lens[i] = len; refs[i] = amduat_reference(0u, amduat_octets(NULL, 0u)); err = amduat_asl_store_put(&store, amduat_artifact(amduat_octets(payload, len)), &refs[i]); if (err != AMDUAT_ASL_STORE_OK) { fprintf(stderr, "put failed: %d\n", err); goto cleanup; } } end_ns = now_ns(); rss_kb = current_rss_kb(); if (start_ns != 0u && end_ns != 0u) { fprintf(stderr, "perf: put %zu artifacts in %.3f ms\n", artifact_count, (double)(end_ns - start_ns) / 1000000.0); } if (rss_kb >= 0) { fprintf(stderr, "perf: maxrss %ld KB\n", rss_kb); } if (!amduat_asl_index_current_state(&store, &state)) { fprintf(stderr, "current_state failed\n"); goto cleanup; } start_ns = now_ns(); for (i = 0; i < artifact_count; ++i) { amduat_artifact_t loaded = amduat_artifact(amduat_octets(NULL, 0u)); err = amduat_asl_store_get_indexed(&store, refs[i], state, &loaded); if (err != AMDUAT_ASL_STORE_OK) { char *ref_text = NULL; (void)amduat_format_ref_to_text(refs[i], AMDUAT_FORMAT_REF_HEX, &ref_text); fprintf(stderr, "get failed: %d index=%zu snapshot=%" PRIu64 " log=%" PRIu64 " ref=%s\n", err, i, state.snapshot_id, state.log_position, ref_text == NULL ? "(null)" : ref_text); free(ref_text); goto cleanup; } if (loaded.bytes.len != payload_lens[i] || (loaded.bytes.len != 0u && memcmp(loaded.bytes.data, payloads[i], loaded.bytes.len) != 0)) { fprintf(stderr, "payload mismatch\n"); amduat_artifact_free(&loaded); goto cleanup; } amduat_artifact_free(&loaded); } end_ns = now_ns(); if (start_ns != 0u && end_ns != 0u) { fprintf(stderr, "perf: get %zu artifacts in %.3f ms\n", artifact_count, (double)(end_ns - start_ns) / 1000000.0); } exit_code = 0; cleanup: if (refs != NULL) { for (i = 0; i < artifact_count; ++i) { amduat_reference_free(&refs[i]); } } if (payloads != NULL) { for (i = 0; i < artifact_count; ++i) { free(payloads[i]); } } free(payloads); free(payload_lens); free(refs); if (!remove_tree(root)) { fprintf(stderr, "cleanup failed\n"); exit_code = 1; } free(root); return exit_code; } typedef struct { const char *root; const char *log_path; amduat_asl_store_config_t config; uint64_t duration_ns; volatile bool stop; volatile bool failed; volatile uint64_t put_count; volatile uint64_t get_count; } stress_state_t; static void *stress_writer_thread(void *arg) { stress_state_t *state = (stress_state_t *)arg; const amduat_hash_asl1_desc_t *hash_desc; amduat_asl_store_index_fs_t fs; amduat_asl_store_t store; amduat_asl_store_index_fs_snapshot_policy_t policy; uint8_t *payload = NULL; FILE *log_fp = NULL; uint64_t start_ns; uint64_t now; uint64_t flush_count = 0u; if (state == NULL) { return NULL; } hash_desc = amduat_hash_asl1_desc_lookup(state->config.hash_id); if (hash_desc == NULL || hash_desc->digest_len == 0) { state->failed = true; state->stop = true; return NULL; } if (!amduat_asl_store_index_fs_init(&fs, state->config, state->root)) { state->failed = true; state->stop = true; return NULL; } memset(&policy, 0, sizeof(policy)); policy.enabled = false; amduat_asl_store_index_fs_set_snapshot_policy(&fs, policy); amduat_asl_store_init(&store, state->config, amduat_asl_store_index_fs_ops(), &fs); payload = (uint8_t *)calloc(hash_desc->digest_len, 1u); if (payload == NULL) { state->failed = true; state->stop = true; return NULL; } log_fp = fopen(state->log_path, "ab"); if (log_fp == NULL) { free(payload); state->failed = true; state->stop = true; return NULL; } start_ns = now_ns(); while (!state->stop) { amduat_reference_t ref = amduat_reference(0u, amduat_octets(NULL, 0u)); amduat_asl_store_error_t err; char *hex = NULL; err = amduat_asl_store_put(&store, amduat_artifact(amduat_octets( payload, hash_desc->digest_len)), &ref); if (err != AMDUAT_ASL_STORE_OK) { state->failed = true; state->stop = true; break; } if (ref.digest.len == hash_desc->digest_len && ref.digest.data != NULL) { memcpy(payload, ref.digest.data, ref.digest.len); } else { amduat_reference_free(&ref); state->failed = true; state->stop = true; break; } if (!amduat_asl_ref_encode_hex(ref, &hex) || hex == NULL) { amduat_reference_free(&ref); state->failed = true; state->stop = true; break; } fprintf(log_fp, "%s\n", hex); free(hex); amduat_reference_free(&ref); __sync_fetch_and_add(&state->put_count, 1u); flush_count++; if ((flush_count % 256u) == 0u) { fflush(log_fp); } now = now_ns(); if (now != 0u && now - start_ns >= state->duration_ns) { state->stop = true; break; } } fflush(log_fp); fclose(log_fp); free(payload); return NULL; } static void *stress_reader_thread(void *arg) { stress_state_t *state = (stress_state_t *)arg; amduat_asl_store_index_fs_t fs; amduat_asl_store_t store; amduat_asl_store_index_fs_snapshot_policy_t policy; FILE *log_fp = NULL; unsigned int seed = 1337u; struct stat st; if (state == NULL) { return NULL; } if (!amduat_asl_store_index_fs_init(&fs, state->config, state->root)) { state->failed = true; state->stop = true; return NULL; } memset(&policy, 0, sizeof(policy)); policy.enabled = false; amduat_asl_store_index_fs_set_snapshot_policy(&fs, policy); amduat_asl_store_init(&store, state->config, amduat_asl_store_index_fs_ops(), &fs); log_fp = fopen(state->log_path, "rb"); while (log_fp == NULL && !state->stop) { if (errno != ENOENT) { state->failed = true; state->stop = true; return NULL; } sleep_ms(10u); log_fp = fopen(state->log_path, "rb"); } if (log_fp == NULL) { state->failed = true; state->stop = true; return NULL; } while (!state->stop) { char line[256]; long offset; size_t len; amduat_reference_t ref; if (fstat(fileno(log_fp), &st) != 0) { state->failed = true; state->stop = true; break; } if (st.st_size <= 0) { continue; } offset = (long)(rand_r(&seed) % (unsigned int)st.st_size); if (fseek(log_fp, offset, SEEK_SET) != 0) { state->failed = true; state->stop = true; break; } if (offset != 0) { if (fgets(line, sizeof(line), log_fp) == NULL) { continue; } } if (fgets(line, sizeof(line), log_fp) == NULL) { continue; } len = strlen(line); if (len == 0u || line[len - 1u] != '\n') { continue; } line[len - 1u] = '\0'; if (!amduat_asl_ref_decode_hex(line, &ref)) { fprintf(stderr, "stress: decode ref failed\n"); state->failed = true; state->stop = true; break; } { amduat_artifact_t loaded = amduat_artifact(amduat_octets(NULL, 0u)); amduat_asl_store_error_t err = amduat_asl_store_get(&store, ref, &loaded); if (err != AMDUAT_ASL_STORE_OK) { char *ref_text = NULL; (void)amduat_format_ref_to_text(ref, AMDUAT_FORMAT_REF_HEX, &ref_text); fprintf(stderr, "stress: get failed: %d ref=%s\n", err, ref_text == NULL ? "(null)" : ref_text); free(ref_text); state->failed = true; state->stop = true; amduat_artifact_free(&loaded); amduat_reference_free(&ref); break; } amduat_artifact_free(&loaded); __sync_fetch_and_add(&state->get_count, 1u); } amduat_reference_free(&ref); } fclose(log_fp); return NULL; } static int test_writer_reader_stress(void) { amduat_asl_store_config_t config; pthread_t writer; pthread_t reader; stress_state_t state; char *root; char *log_path = NULL; uint64_t duration_ns = 60u * 1000u * 1000u * 1000u; const char *duration_env; uint64_t start_ns; uint64_t last_ns; uint64_t now; uint64_t last_put = 0u; uint64_t last_get = 0u; double put_rate_first = 0.0; double get_rate_first = 0.0; double put_rate_last = 0.0; double get_rate_last = 0.0; double put_rate_min = 0.0; double put_rate_max = 0.0; double get_rate_min = 0.0; double get_rate_max = 0.0; double put_rate_sum = 0.0; double get_rate_sum = 0.0; uint64_t rate_samples = 0u; root = make_temp_root(); if (root == NULL) { fprintf(stderr, "temp root failed\n"); return 1; } if (!join_path(root, "refs.log", &log_path)) { fprintf(stderr, "log path failed\n"); free(root); return 1; } memset(&config, 0, sizeof(config)); config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; duration_env = getenv("AMDUAT_ASL_STRESS_SECS"); if (duration_env != NULL) { uint64_t seconds; if (parse_u64_env(duration_env, &seconds)) { duration_ns = seconds * 1000u * 1000u * 1000u; } } memset(&state, 0, sizeof(state)); state.root = root; state.log_path = log_path; state.config = config; state.duration_ns = duration_ns; state.stop = false; state.failed = false; start_ns = now_ns(); last_ns = start_ns; if (pthread_create(&writer, NULL, stress_writer_thread, &state) != 0 || pthread_create(&reader, NULL, stress_reader_thread, &state) != 0) { fprintf(stderr, "thread start failed\n"); state.stop = true; } while (!state.stop) { now = now_ns(); if (now != 0u && now - last_ns >= 5u * 1000u * 1000u * 1000u) { uint64_t puts = state.put_count; uint64_t gets = state.get_count; double elapsed_s = (double)(now - last_ns) / 1000000000.0; double put_rate = (puts - last_put) / (elapsed_s > 0 ? elapsed_s : 1.0); double get_rate = (gets - last_get) / (elapsed_s > 0 ? elapsed_s : 1.0); if (rate_samples == 0u) { put_rate_first = put_rate; get_rate_first = get_rate; put_rate_min = put_rate; put_rate_max = put_rate; get_rate_min = get_rate; get_rate_max = get_rate; } else { if (put_rate < put_rate_min) { put_rate_min = put_rate; } if (put_rate > put_rate_max) { put_rate_max = put_rate; } if (get_rate < get_rate_min) { get_rate_min = get_rate; } if (get_rate > get_rate_max) { get_rate_max = get_rate; } } put_rate_last = put_rate; get_rate_last = get_rate; put_rate_sum += put_rate; get_rate_sum += get_rate; rate_samples++; fprintf(stderr, "stress: t=%.1fs puts=%" PRIu64 " gets=%" PRIu64 " put/s=%.1f get/s=%.1f\n", (double)(now - start_ns) / 1000000000.0, puts, gets, put_rate, get_rate); last_put = puts; last_get = gets; last_ns = now; } if (now != 0u && now - start_ns >= duration_ns) { state.stop = true; } if (state.failed) { state.stop = true; } sleep_ms(10u); } pthread_join(writer, NULL); pthread_join(reader, NULL); if (state.failed) { fprintf(stderr, "stress: failed\n"); } if (rate_samples > 0u) { fprintf(stderr, "stress: put/s first=%.1f last=%.1f min=%.1f max=%.1f avg=%.1f\n", put_rate_first, put_rate_last, put_rate_min, put_rate_max, put_rate_sum / (double)rate_samples); fprintf(stderr, "stress: get/s first=%.1f last=%.1f min=%.1f max=%.1f avg=%.1f\n", get_rate_first, get_rate_last, get_rate_min, get_rate_max, get_rate_sum / (double)rate_samples); } free(log_path); if (!remove_tree(root)) { fprintf(stderr, "cleanup failed\n"); free(root); return 1; } free(root); return state.failed ? 1 : 0; } int main(void) { if (test_round_trip() != 0) { return 1; } if (test_snapshot_truncation() != 0) { return 1; } if (test_segment_batch_packing() != 0) { return 1; } if (test_tombstone_round_trip() != 0) { return 1; } if (test_gc_keeps_visible() != 0) { return 1; } if (test_large_round_trip_perf() != 0) { return 1; } if (test_writer_reader_stress() != 0) { return 1; } return test_shard_routing(); }