amduat/tests/asl/test_asl_store_index_fs.c

1621 lines
44 KiB
C
Raw Permalink Normal View History

2026-01-17 14:08:41 +01:00
#include "amduat/asl/asl_store_index_fs.h"
2026-01-18 06:20:48 +01:00
#include "amduat/asl/index_accel.h"
#include "amduat/asl/index_bloom.h"
#include "amduat/asl/ref_text.h"
#include "amduat/asl/ref_derive.h"
2026-01-17 14:08:41 +01:00
#include "amduat/asl/store.h"
#include "amduat/enc/asl1_core.h"
#include "amduat/enc/asl_core_index.h"
2026-01-18 05:54:31 +01:00
#include "amduat/enc/asl_log.h"
#include "amduat/format/ref.h"
2026-01-17 14:08:41 +01:00
#include "amduat/hash/asl1.h"
#include <dirent.h>
#include <errno.h>
#include <inttypes.h>
#include <pthread.h>
2026-01-17 14:08:41 +01:00
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/resource.h>
2026-01-17 14:08:41 +01:00
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
2026-01-17 14:08:41 +01:00
#include <unistd.h>
/*
 * Builds "<base>/<segment>" in a freshly malloc'd buffer, adding the '/'
 * only when `base` does not already end with one.
 *
 * Returns false without touching *out_path when an argument is NULL, when
 * either string is empty, or when allocation fails. On success the caller
 * owns *out_path and releases it with free().
 */
static bool join_path(const char *base, const char *segment, char **out_path) {
  if (base == NULL || segment == NULL || out_path == NULL) {
    return false;
  }
  if (*base == '\0' || *segment == '\0') {
    return false;
  }

  const size_t head_len = strlen(base);
  const size_t tail_len = strlen(segment);
  /* base is non-empty here, so head_len - 1 is a valid index. */
  const size_t sep_len = (base[head_len - 1u] == '/') ? 0u : 1u;

  char *joined = (char *)malloc(head_len + sep_len + tail_len + 1u);
  if (joined == NULL) {
    return false;
  }

  char *cursor = joined;
  memcpy(cursor, base, head_len);
  cursor += head_len;
  if (sep_len != 0u) {
    *cursor++ = '/';
  }
  memcpy(cursor, segment, tail_len);
  cursor[tail_len] = '\0';

  *out_path = joined;
  return true;
}
/*
 * Deletes `path` and, if it is a directory, everything beneath it.
 *
 * A path that does not exist counts as success. Non-directories (lstat is
 * used, so symlinks are not followed) are unlinked. Directories are emptied
 * depth-first and then removed with rmdir(). Returns false on the first
 * failure; entries already removed stay removed.
 */
static bool remove_tree(const char *path) {
  struct stat info;

  if (path == NULL) {
    return false;
  }
  if (lstat(path, &info) != 0) {
    return errno == ENOENT;
  }
  if (!S_ISDIR(info.st_mode)) {
    return unlink(path) == 0;
  }

  DIR *handle = opendir(path);
  if (handle == NULL) {
    return false;
  }

  for (;;) {
    struct dirent *item = readdir(handle);
    if (item == NULL) {
      break;
    }

    const char *name = item->d_name;
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) {
      continue;
    }

    /* child_path stays NULL if join_path fails, so free() is always safe. */
    char *child_path = NULL;
    const bool removed =
        join_path(path, name, &child_path) && remove_tree(child_path);
    free(child_path);
    if (!removed) {
      closedir(handle);
      return false;
    }
  }

  if (closedir(handle) != 0) {
    return false;
  }
  return rmdir(path) == 0;
}
/*
 * Reads the whole file at `path` into a malloc'd buffer.
 *
 * On success returns true, stores the buffer in *out_bytes and its length in
 * *out_len; the caller frees the buffer. An empty file succeeds with
 * *out_len == 0 and a non-NULL buffer. On any failure returns false and
 * leaves *out_bytes == NULL and *out_len == 0.
 */
static bool read_file(const char *path, uint8_t **out_bytes, size_t *out_len) {
  FILE *fp;
  long size;
  uint8_t *buffer;
  size_t read_len;

  if (path == NULL || out_bytes == NULL || out_len == NULL) {
    return false;
  }
  *out_bytes = NULL;
  *out_len = 0u;

  fp = fopen(path, "rb");
  if (fp == NULL) {
    return false;
  }
  /* Determine the file size by seeking to the end and back. */
  if (fseek(fp, 0, SEEK_END) != 0) {
    fclose(fp);
    return false;
  }
  size = ftell(fp);
  if (size < 0) {
    fclose(fp);
    return false;
  }
  if (fseek(fp, 0, SEEK_SET) != 0) {
    fclose(fp);
    return false;
  }

  /* malloc(0) is allowed to return NULL; always request at least one byte so
   * an empty file is not mistaken for an allocation failure. */
  buffer = (uint8_t *)malloc(size > 0 ? (size_t)size : 1u);
  if (buffer == NULL) {
    fclose(fp);
    return false;
  }

  read_len = fread(buffer, 1u, (size_t)size, fp);
  fclose(fp);
  if (read_len != (size_t)size) {
    /* Short read: the file changed underneath us or an I/O error occurred. */
    free(buffer);
    return false;
  }

  *out_bytes = buffer;
  *out_len = (size_t)size;
  return true;
}
/*
 * Produces "<root>/index/log.asl" in a malloc'd string stored in *out_path.
 * Returns false on NULL arguments or if either path join fails; the caller
 * frees *out_path on success.
 */
static bool build_log_path(const char *root, char **out_path) {
  if (root == NULL || out_path == NULL) {
    return false;
  }

  char *index_dir = NULL;
  if (!join_path(root, "index", &index_dir)) {
    return false;
  }

  const bool joined = join_path(index_dir, "log.asl", out_path);
  free(index_dir);
  return joined;
}
/*
 * Formats "<root>/index/segments/segment-<id>.asl", where <id> is
 * `segment_id` as 16 zero-padded lowercase hex digits, into a malloc'd
 * string stored in *out_path. Returns false on NULL arguments, a formatting
 * error, or allocation failure. The caller frees *out_path.
 */
static bool build_segment_path(const char *root,
                               uint64_t segment_id,
                               char **out_path) {
  if (root == NULL || out_path == NULL) {
    return false;
  }

  /* Measuring pass: snprintf with a NULL buffer reports the needed length. */
  const int path_len = snprintf(NULL,
                                0,
                                "%s/index/segments/segment-%016" PRIx64 ".asl",
                                root,
                                segment_id);
  if (path_len <= 0) {
    return false;
  }

  const size_t capacity = (size_t)path_len + 1u;
  char *path = (char *)malloc(capacity);
  if (path == NULL) {
    return false;
  }

  snprintf(path,
           capacity,
           "%s/index/segments/segment-%016" PRIx64 ".asl",
           root,
           segment_id);
  *out_path = path;
  return true;
}
/*
 * Formats
 *   "<root>/index/shards/shard-<shard>/segments/segment-<id>.asl"
 * with both numbers rendered as 16 zero-padded lowercase hex digits, into a
 * malloc'd string stored in *out_path. Returns false on NULL arguments, a
 * formatting error, or allocation failure. The caller frees *out_path.
 */
static bool build_shard_segment_path(const char *root,
                                     uint16_t shard_id,
                                     uint64_t segment_id,
                                     char **out_path) {
  if (root == NULL || out_path == NULL) {
    return false;
  }

  /* The shard id is widened so it can share the PRIx64 conversion. */
  const uint64_t shard_wide = (uint64_t)shard_id;

  const int path_len = snprintf(NULL,
                                0,
                                "%s/index/shards/shard-%016" PRIx64
                                "/segments/segment-%016" PRIx64 ".asl",
                                root,
                                shard_wide,
                                segment_id);
  if (path_len <= 0) {
    return false;
  }

  const size_t capacity = (size_t)path_len + 1u;
  char *path = (char *)malloc(capacity);
  if (path == NULL) {
    return false;
  }

  snprintf(path,
           capacity,
           "%s/index/shards/shard-%016" PRIx64
           "/segments/segment-%016" PRIx64 ".asl",
           root,
           shard_wide,
           segment_id);
  *out_path = path;
  return true;
}
2026-01-18 05:54:31 +01:00
/*
 * Decodes the first eight bytes at `data` as a little-endian unsigned 64-bit
 * integer (data[0] is the least significant byte). `data` must point to at
 * least eight readable bytes.
 */
static uint64_t load_u64_le(const uint8_t *data) {
  uint64_t value = 0u;
  unsigned int remaining = 8u;

  /* Fold from the most significant byte down so each step is shift-then-or. */
  while (remaining > 0u) {
    --remaining;
    value = (value << 8) | (uint64_t)data[remaining];
  }
  return value;
}
/*
 * Returns the shard index for `ref` in a store split into `shard_count`
 * shards, by delegating to amduat_asl_index_accel_shard_for_ref().
 *
 * NOTE(review): the `false` / amduat_type_tag(0u) arguments presumably mean
 * "no type tag supplied" -- confirm against index_accel.h.
 */
static uint16_t ref_shard(amduat_reference_t ref, uint16_t shard_count) {
  return amduat_asl_index_accel_shard_for_ref(
      ref, false, amduat_type_tag(0u), shard_count);
}
/*
 * Parses `value` as a base-10 unsigned integer that fits in size_t.
 *
 * Returns true and stores the result in *out only when the whole string is
 * consumed, strtoull reports no error, and the value is not negative.
 * Returns false (leaving *out untouched) for NULL arguments, empty or
 * non-numeric text, trailing characters, overflow, or a minus sign.
 */
static bool parse_size_env(const char *value, size_t *out) {
  char *endptr;
  unsigned long long parsed;

  if (value == NULL || out == NULL) {
    return false;
  }
  /* strtoull accepts "-N" and silently negates it in unsigned arithmetic
   * without setting errno, so "-1" would come back as the maximum value.
   * Any '-' in an otherwise fully-consumed number can only be that sign. */
  if (strchr(value, '-') != NULL) {
    return false;
  }

  errno = 0;
  parsed = strtoull(value, &endptr, 10);
  if (errno != 0 || endptr == value || *endptr != '\0') {
    return false;
  }
  /* Only meaningful where size_t is narrower than unsigned long long. */
  if (parsed > SIZE_MAX) {
    return false;
  }
  *out = (size_t)parsed;
  return true;
}
/*
 * Reads CLOCK_MONOTONIC and returns it as nanoseconds. Returns 0 if the
 * clock cannot be read, so callers treat 0 as "no timestamp".
 */
static uint64_t now_ns(void) {
  struct timespec mono;

  if (clock_gettime(CLOCK_MONOTONIC, &mono) == 0) {
    const uint64_t whole_seconds_ns =
        (uint64_t)mono.tv_sec * 1000u * 1000u * 1000u;
    return whole_seconds_ns + (uint64_t)mono.tv_nsec;
  }
  return 0u;
}
/*
 * Blocks the calling thread for `ms` milliseconds. If nanosleep() is
 * interrupted by a signal (EINTR) it is re-issued with the remaining time;
 * any other failure ends the wait early.
 */
static void sleep_ms(unsigned int ms) {
  struct timespec remaining;

  remaining.tv_sec = (time_t)(ms / 1000u);
  remaining.tv_nsec = (long)((ms % 1000u) * 1000u * 1000u);

  for (;;) {
    /* nanosleep writes the unslept time back into `remaining`. */
    if (nanosleep(&remaining, &remaining) == 0) {
      break;
    }
    if (errno != EINTR) {
      break;
    }
  }
}
/*
 * Returns ru_maxrss for the current process as reported by getrusage(), or
 * -1 if the call fails.
 *
 * NOTE(review): the value is passed through unscaled; ru_maxrss units are
 * platform-defined (kilobytes on Linux) -- confirm for other targets.
 */
static long current_rss_kb(void) {
  struct rusage self_usage;
  const int rc = getrusage(RUSAGE_SELF, &self_usage);

  return (rc == 0) ? self_usage.ru_maxrss : -1;
}
/*
 * Parses `value` as a base-10 unsigned 64-bit integer.
 *
 * Returns true and stores the result in *out only when the whole string is
 * consumed, strtoull reports no error, and the value is not negative.
 * Returns false (leaving *out untouched) for NULL arguments, empty or
 * non-numeric text, trailing characters, overflow, or a minus sign.
 */
static bool parse_u64_env(const char *value, uint64_t *out) {
  char *endptr;
  unsigned long long parsed;

  if (value == NULL || out == NULL) {
    return false;
  }
  /* strtoull accepts "-N" and silently negates it in unsigned arithmetic
   * without setting errno, so "-1" would come back as UINT64_MAX.
   * Any '-' in an otherwise fully-consumed number can only be that sign. */
  if (strchr(value, '-') != NULL) {
    return false;
  }

  errno = 0;
  parsed = strtoull(value, &endptr, 10);
  if (errno != 0 || endptr == value || *endptr != '\0') {
    return false;
  }
  /* Only meaningful where unsigned long long is wider than 64 bits. */
  if (parsed > UINT64_MAX) {
    return false;
  }
  *out = (uint64_t)parsed;
  return true;
}
2026-01-17 14:08:41 +01:00
/*
 * Creates a unique scratch directory under /tmp via mkdtemp() and returns
 * its path in a malloc'd string, or NULL if allocation or directory creation
 * fails. The caller removes the directory and frees the string.
 */
static char *make_temp_root(void) {
  static const char k_template[] =
      "/tmp/amduat_test_asl_store_index_fs_XXXXXX";

  /* mkdtemp rewrites the XXXXXX suffix in place, so it needs a writable copy
   * (sizeof includes the terminating NUL). */
  char *dir = (char *)malloc(sizeof(k_template));
  if (dir == NULL) {
    return NULL;
  }
  memcpy(dir, k_template, sizeof(k_template));

  if (mkdtemp(dir) != NULL) {
    return dir;
  }
  free(dir);
  return NULL;
}
/*
 * Stores one 6-byte artifact through the index-fs store and verifies that:
 *   - the current index state reports the same log position as the put,
 *   - the artifact reads back equal at that state,
 *   - a lookup at log_position - 1 reports NOT_FOUND,
 *   - the segment file with id 1 decodes, carries a bloom filter of
 *     AMDUAT_ASL_INDEX_BLOOM_BYTES bytes that is not all zero, and the bloom
 *     filter reports the stored digest as possibly present.
 *
 * Returns 0 on success and 1 on any failure (including failure to remove
 * the temporary store directory).
 */
static int test_round_trip(void) {
  amduat_asl_store_config_t config;
  amduat_asl_store_index_fs_t fs;
  amduat_asl_store_t store;
  amduat_asl_store_error_t err;
  amduat_asl_index_state_t state;
  amduat_asl_index_state_t current_state;
  amduat_artifact_t artifact;
  amduat_artifact_t loaded;
  amduat_reference_t ref;
  uint8_t payload[6];
  char *root;
  int exit_code = 1;

  root = make_temp_root();
  if (root == NULL) {
    fprintf(stderr, "temp root failed\n");
    return 1;
  }

  /* The cleanup label frees `ref`, so it must hold a valid empty reference
   * before the first `goto cleanup`. */
  ref = amduat_reference(0u, amduat_octets(NULL, 0u));

  /* Zero the config first so any field not set below is deterministic. */
  memset(&config, 0, sizeof(config));
  config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1;
  config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256;
  if (!amduat_asl_store_index_fs_init(&fs, config, root)) {
    fprintf(stderr, "index fs init failed\n");
    goto cleanup;
  }
  amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs);

  memset(payload, 0x5a, sizeof(payload));
  artifact = amduat_artifact(amduat_octets(payload, sizeof(payload)));
  err = amduat_asl_store_put_indexed(&store, artifact, &ref, &state);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "put_indexed failed: %d\n", err);
    goto cleanup;
  }

  if (!amduat_asl_index_current_state(&store, &current_state)) {
    fprintf(stderr, "current_state failed\n");
    goto cleanup;
  }
  if (current_state.log_position != state.log_position) {
    fprintf(stderr, "log position mismatch\n");
    goto cleanup;
  }

  loaded = amduat_artifact(amduat_octets(NULL, 0u));
  err = amduat_asl_store_get_indexed(&store, ref, state, &loaded);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "get_indexed failed: %d\n", err);
    goto cleanup;
  }
  if (!amduat_artifact_eq(artifact, loaded)) {
    fprintf(stderr, "artifact mismatch\n");
    amduat_artifact_free(&loaded);
    goto cleanup;
  }
  amduat_artifact_free(&loaded);

  /* A state one log position earlier must not see the artifact yet. */
  if (state.log_position > 0u) {
    amduat_asl_index_state_t cutoff = state;
    cutoff.log_position = state.log_position - 1u;
    loaded = amduat_artifact(amduat_octets(NULL, 0u));
    err = amduat_asl_store_get_indexed(&store, ref, cutoff, &loaded);
    if (err != AMDUAT_ASL_STORE_ERR_NOT_FOUND) {
      fprintf(stderr, "cutoff lookup expected not found: %d\n", err);
      amduat_artifact_free(&loaded);
      goto cleanup;
    }
  }

  /* Inspect the on-disk segment and its bloom filter directly. */
  {
    char *segment_path = NULL;
    uint8_t *segment_bytes = NULL;
    size_t segment_len = 0u;
    amduat_asl_core_index_segment_t segment;
    bool bloom_nonzero = false;
    size_t i;

    memset(&segment, 0, sizeof(segment));
    if (!build_segment_path(root, 1u, &segment_path)) {
      fprintf(stderr, "segment path build failed\n");
      goto cleanup;
    }
    if (!read_file(segment_path, &segment_bytes, &segment_len)) {
      fprintf(stderr, "segment read failed\n");
      free(segment_path);
      goto cleanup;
    }
    free(segment_path);
    if (!amduat_enc_asl_core_index_decode_v1(
            amduat_octets(segment_bytes, segment_len), &segment)) {
      fprintf(stderr, "segment decode failed\n");
      free(segment_bytes);
      goto cleanup;
    }
    free(segment_bytes);
    if (segment.bloom.len != AMDUAT_ASL_INDEX_BLOOM_BYTES) {
      fprintf(stderr, "segment bloom size mismatch\n");
      amduat_enc_asl_core_index_free(&segment);
      goto cleanup;
    }
    if (!amduat_asl_index_bloom_maybe_contains(segment.bloom,
                                               ref.hash_id,
                                               ref.digest)) {
      fprintf(stderr, "segment bloom missing digest\n");
      amduat_enc_asl_core_index_free(&segment);
      goto cleanup;
    }
    for (i = 0u; i < segment.bloom.len; ++i) {
      if (segment.bloom.data[i] != 0u) {
        bloom_nonzero = true;
        break;
      }
    }
    if (!bloom_nonzero) {
      fprintf(stderr, "segment bloom was empty\n");
      amduat_enc_asl_core_index_free(&segment);
      goto cleanup;
    }
    amduat_enc_asl_core_index_free(&segment);
  }

  exit_code = 0;

cleanup:
  amduat_reference_free(&ref);
  if (!remove_tree(root)) {
    fprintf(stderr, "cleanup failed\n");
    exit_code = 1;
  }
  free(root);
  return exit_code;
}
/*
 * Verifies that a 2-shard store writes segments into per-shard directories.
 *
 * Tries up to 256 payload pairs until it finds two whose derived references
 * map to different shards (via ref_shard), stores both, and checks that each
 * shard directory contains a segment file whose id is
 * (shard << 48) | per-shard put counter.
 *
 * Returns 0 on success and 1 on any failure (including failure to remove
 * the temporary store directory).
 */
static int test_shard_routing(void) {
  amduat_asl_store_config_t config;
  amduat_asl_store_index_fs_t fs;
  amduat_asl_store_t store;
  char *root;
  uint8_t payload_a[2] = {0x10, 0x11};
  uint8_t payload_b[2] = {0x20, 0x21};
  amduat_reference_t ref_a;
  amduat_reference_t ref_b;
  amduat_octets_t bytes_a;
  amduat_octets_t bytes_b;
  amduat_hash_id_t hash_id;
  uint16_t shard_a = 0u;
  uint16_t shard_b = 0u;
  uint64_t local_counts[2] = {0u, 0u};
  int exit_code = 1;
  size_t i;
  bool found = false;

  root = make_temp_root();
  if (root == NULL) {
    fprintf(stderr, "temp root failed\n");
    return 1;
  }

  /* The cleanup label frees all four of these, so give them valid empty
   * values before the first `goto cleanup`. */
  ref_a = amduat_reference(0u, amduat_octets(NULL, 0u));
  ref_b = amduat_reference(0u, amduat_octets(NULL, 0u));
  bytes_a = amduat_octets(NULL, 0u);
  bytes_b = amduat_octets(NULL, 0u);

  memset(&config, 0, sizeof(config));
  config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1;
  config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256;
  if (!amduat_asl_store_index_fs_init(&fs, config, root)) {
    fprintf(stderr, "index fs init failed\n");
    goto cleanup;
  }
  amduat_asl_store_index_fs_set_shard_count(&fs, 2);
  amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs);
  hash_id = config.hash_id;

  /* Vary the first payload byte until the two references land in different
   * shards; the results of the previous iteration are released first. */
  for (i = 0; i < 256; ++i) {
    payload_a[0] = (uint8_t)i;
    payload_b[0] = (uint8_t)(255u - i);
    amduat_reference_free(&ref_a);
    amduat_reference_free(&ref_b);
    amduat_octets_free(&bytes_a);
    amduat_octets_free(&bytes_b);
    if (!amduat_asl_ref_derive(amduat_artifact(amduat_octets(payload_a,
                                                             sizeof(payload_a))),
                               AMDUAT_ENC_ASL1_CORE_V1,
                               hash_id,
                               &ref_a,
                               &bytes_a) ||
        !amduat_asl_ref_derive(amduat_artifact(amduat_octets(payload_b,
                                                             sizeof(payload_b))),
                               AMDUAT_ENC_ASL1_CORE_V1,
                               hash_id,
                               &ref_b,
                               &bytes_b)) {
      fprintf(stderr, "ref derive failed\n");
      goto cleanup;
    }
    shard_a = ref_shard(ref_a, 2);
    shard_b = ref_shard(ref_b, 2);
    if (shard_a != shard_b) {
      found = true;
      break;
    }
  }
  if (!found) {
    fprintf(stderr, "shard selection failed\n");
    goto cleanup;
  }
  /* Both values index local_counts[2] below; refuse anything out of range. */
  if (shard_a >= 2u || shard_b >= 2u) {
    fprintf(stderr, "shard index out of range\n");
    goto cleanup;
  }

  /* Store payload_a and expect its segment under shard_a. */
  {
    amduat_reference_t stored_ref;
    amduat_asl_index_state_t state;
    uint64_t segment_id;
    char *segment_path = NULL;
    amduat_asl_store_error_t err;

    stored_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
    err = amduat_asl_store_put_indexed(&store,
                                       amduat_artifact(amduat_octets(payload_a,
                                                                     sizeof(payload_a))),
                                       &stored_ref,
                                       &state);
    if (err != AMDUAT_ASL_STORE_OK) {
      fprintf(stderr, "put shard a failed\n");
      amduat_reference_free(&stored_ref);
      goto cleanup;
    }
    amduat_reference_free(&stored_ref);
    local_counts[shard_a] += 1u;
    segment_id = ((uint64_t)shard_a << 48) | local_counts[shard_a];
    if (!build_shard_segment_path(root, shard_a, segment_id, &segment_path)) {
      fprintf(stderr, "segment path failed\n");
      goto cleanup;
    }
    if (access(segment_path, F_OK) != 0) {
      fprintf(stderr, "segment missing in shard a\n");
      free(segment_path);
      goto cleanup;
    }
    free(segment_path);
  }

  /* Store payload_b and expect its segment under shard_b. */
  {
    amduat_reference_t stored_ref;
    amduat_asl_index_state_t state;
    uint64_t segment_id;
    char *segment_path = NULL;
    amduat_asl_store_error_t err;

    stored_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
    err = amduat_asl_store_put_indexed(&store,
                                       amduat_artifact(amduat_octets(payload_b,
                                                                     sizeof(payload_b))),
                                       &stored_ref,
                                       &state);
    if (err != AMDUAT_ASL_STORE_OK) {
      fprintf(stderr, "put shard b failed\n");
      amduat_reference_free(&stored_ref);
      goto cleanup;
    }
    amduat_reference_free(&stored_ref);
    local_counts[shard_b] += 1u;
    segment_id = ((uint64_t)shard_b << 48) | local_counts[shard_b];
    if (!build_shard_segment_path(root, shard_b, segment_id, &segment_path)) {
      fprintf(stderr, "segment path failed\n");
      goto cleanup;
    }
    if (access(segment_path, F_OK) != 0) {
      fprintf(stderr, "segment missing in shard b\n");
      free(segment_path);
      goto cleanup;
    }
    free(segment_path);
  }

  exit_code = 0;

cleanup:
  amduat_reference_free(&ref_a);
  amduat_reference_free(&ref_b);
  amduat_octets_free(&bytes_a);
  amduat_octets_free(&bytes_b);
  if (!remove_tree(root)) {
    fprintf(stderr, "cleanup failed\n");
    exit_code = 1;
  }
  free(root);
  return exit_code;
}
/*
 * Verifies that creating a snapshot shrinks the index log without losing
 * data: stores one 8-byte artifact, records the size of index/log.asl,
 * creates a snapshot, checks the log file is strictly smaller afterwards,
 * and reads the artifact back at the store's current state.
 *
 * Returns 0 on success and 1 on any failure (including failure to remove
 * the temporary store directory).
 */
static int test_snapshot_truncation(void) {
  amduat_asl_store_config_t config;
  amduat_asl_store_index_fs_t fs;
  amduat_asl_store_t store;
  amduat_asl_store_error_t err;
  amduat_asl_index_state_t state;
  amduat_artifact_t artifact;
  amduat_artifact_t loaded;
  amduat_reference_t ref;
  uint8_t payload[8];
  char *root;
  char *log_path = NULL;
  uint8_t *log_before = NULL;
  uint8_t *log_after = NULL;
  size_t log_before_len = 0u;
  size_t log_after_len = 0u;
  int exit_code = 1;

  root = make_temp_root();
  if (root == NULL) {
    fprintf(stderr, "temp root failed\n");
    return 1;
  }

  /* The cleanup label frees `ref`, so it must hold a valid empty reference
   * before the first `goto cleanup`. */
  ref = amduat_reference(0u, amduat_octets(NULL, 0u));

  memset(&config, 0, sizeof(config));
  config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1;
  config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256;
  if (!amduat_asl_store_index_fs_init(&fs, config, root)) {
    fprintf(stderr, "index fs init failed\n");
    goto cleanup;
  }
  amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs);

  memset(payload, 0x33, sizeof(payload));
  artifact = amduat_artifact(amduat_octets(payload, sizeof(payload)));
  err = amduat_asl_store_put_indexed(&store, artifact, &ref, &state);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "put_indexed failed: %d\n", err);
    goto cleanup;
  }

  if (!build_log_path(root, &log_path) ||
      !read_file(log_path, &log_before, &log_before_len)) {
    fprintf(stderr, "log read before snapshot failed\n");
    goto cleanup;
  }

  /* NOTE(review): the 1u argument is presumably the snapshot id and the two
   * NULLs optional outputs -- confirm against asl_store_index_fs.h. */
  err = amduat_asl_store_index_fs_snapshot_create(&fs, 1u, NULL, NULL);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "snapshot create failed: %d\n", err);
    goto cleanup;
  }

  if (!read_file(log_path, &log_after, &log_after_len)) {
    fprintf(stderr, "log read after snapshot failed\n");
    goto cleanup;
  }
  /* Only the lengths are compared; the contents are not inspected. */
  if (log_after_len >= log_before_len) {
    fprintf(stderr, "log did not shrink after snapshot\n");
    goto cleanup;
  }

  if (!amduat_asl_index_current_state(&store, &state)) {
    fprintf(stderr, "current_state failed\n");
    goto cleanup;
  }
  loaded = amduat_artifact(amduat_octets(NULL, 0u));
  err = amduat_asl_store_get_indexed(&store, ref, state, &loaded);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "get_indexed after snapshot failed: %d\n", err);
    goto cleanup;
  }
  if (!amduat_artifact_eq(artifact, loaded)) {
    fprintf(stderr, "artifact mismatch after snapshot\n");
    amduat_artifact_free(&loaded);
    goto cleanup;
  }
  amduat_artifact_free(&loaded);

  exit_code = 0;

cleanup:
  free(log_path);
  free(log_before);
  free(log_after);
  amduat_reference_free(&ref);
  if (!remove_tree(root)) {
    fprintf(stderr, "cleanup failed\n");
    exit_code = 1;
  }
  free(root);
  return exit_code;
}
2026-01-18 05:54:31 +01:00
/*
 * Verifies deferred-visibility batching with max_segment_records == 2.
 *
 * The first put must not be readable yet (NOT_FOUND); after the second put
 * both artifacts must be readable. The log must then hold exactly one
 * SEGMENT_SEAL record whose payload starts with the little-endian segment
 * id, and the referenced segment file must contain two records and two
 * extents carrying the configured visibility (1) and domain id (7).
 *
 * Returns 0 on success and 1 on any failure (including failure to remove
 * the temporary store directory).
 */
static int test_segment_batch_packing(void) {
  amduat_asl_store_config_t config;
  amduat_asl_store_index_fs_t fs;
  amduat_asl_store_t store;
  amduat_asl_store_index_fs_segment_policy_t segment_policy;
  amduat_asl_store_index_fs_visibility_policy_t visibility_policy;
  amduat_asl_store_error_t err;
  amduat_asl_index_state_t state_a;
  amduat_asl_index_state_t state_b;
  amduat_artifact_t artifact_a;
  amduat_artifact_t artifact_b;
  amduat_artifact_t loaded;
  amduat_reference_t ref_a;
  amduat_reference_t ref_b;
  uint8_t payload_a[3] = {0x01, 0x02, 0x03};
  uint8_t payload_b[4] = {0x10, 0x11, 0x12, 0x13};
  char *root;
  amduat_asl_log_record_t *records = NULL;
  size_t record_count = 0u;
  uint64_t segment_id = 0u;
  char *segment_path = NULL;
  uint8_t *segment_bytes = NULL;
  size_t segment_len = 0u;
  amduat_asl_core_index_segment_t segment;
  int exit_code = 1;

  root = make_temp_root();
  if (root == NULL) {
    fprintf(stderr, "temp root failed\n");
    return 1;
  }

  /* The cleanup label releases `segment`, `ref_a` and `ref_b`; put all three
   * into a valid empty state before the first `goto cleanup`. */
  memset(&segment, 0, sizeof(segment));
  ref_a = amduat_reference(0u, amduat_octets(NULL, 0u));
  ref_b = amduat_reference(0u, amduat_octets(NULL, 0u));

  memset(&config, 0, sizeof(config));
  config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1;
  config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256;
  if (!amduat_asl_store_index_fs_init(&fs, config, root)) {
    fprintf(stderr, "index fs init failed\n");
    goto cleanup;
  }

  /* Seal a segment after two records and allow records to stay invisible
   * until that happens. */
  segment_policy.max_segment_records = 2u;
  segment_policy.max_segment_bytes = 0u;
  segment_policy.small_artifact_threshold = 1024u;
  segment_policy.allow_deferred_visibility = true;
  amduat_asl_store_index_fs_set_segment_policy(&fs, segment_policy);
  visibility_policy.segment_domain_id = 7u;
  visibility_policy.record_visibility = 1u;
  amduat_asl_store_index_fs_set_visibility_policy(&fs, visibility_policy);
  amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs);

  artifact_a = amduat_artifact(amduat_octets(payload_a, sizeof(payload_a)));
  artifact_b = amduat_artifact(amduat_octets(payload_b, sizeof(payload_b)));

  err = amduat_asl_store_put_indexed(&store, artifact_a, &ref_a, &state_a);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "put_indexed a failed: %d\n", err);
    goto cleanup;
  }
  /* With only one of two records written the segment is not sealed yet. */
  loaded = amduat_artifact(amduat_octets(NULL, 0u));
  err = amduat_asl_store_get_indexed(&store, ref_a, state_a, &loaded);
  if (err != AMDUAT_ASL_STORE_ERR_NOT_FOUND) {
    fprintf(stderr, "expected a to be pending before seal: %d\n", err);
    amduat_artifact_free(&loaded);
    goto cleanup;
  }
  amduat_artifact_free(&loaded);

  err = amduat_asl_store_put_indexed(&store, artifact_b, &ref_b, &state_b);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "put_indexed b failed: %d\n", err);
    goto cleanup;
  }
  loaded = amduat_artifact(amduat_octets(NULL, 0u));
  err = amduat_asl_store_get_indexed(&store, ref_a, state_b, &loaded);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "get_indexed a after seal failed: %d\n", err);
    goto cleanup;
  }
  amduat_artifact_free(&loaded);
  loaded = amduat_artifact(amduat_octets(NULL, 0u));
  err = amduat_asl_store_get_indexed(&store, ref_b, state_b, &loaded);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "get_indexed b after seal failed: %d\n", err);
    goto cleanup;
  }
  amduat_artifact_free(&loaded);

  err = amduat_asl_log_scan(&store, &records, &record_count);
  if (err != AMDUAT_ASL_STORE_OK || record_count != 1u ||
      records[0].record_type != AMDUAT_ASL_LOG_RECORD_SEGMENT_SEAL) {
    fprintf(stderr, "log scan expected one segment seal\n");
    goto cleanup;
  }
  /* The first eight payload bytes are read as the sealed segment's id. */
  if (records[0].payload.len < 8u) {
    fprintf(stderr, "segment seal payload too small\n");
    goto cleanup;
  }
  segment_id = load_u64_le(records[0].payload.data);

  if (!build_segment_path(root, segment_id, &segment_path)) {
    fprintf(stderr, "segment path build failed\n");
    goto cleanup;
  }
  if (!read_file(segment_path, &segment_bytes, &segment_len)) {
    fprintf(stderr, "segment read failed\n");
    goto cleanup;
  }
  if (!amduat_enc_asl_core_index_decode_v1(
          amduat_octets(segment_bytes, segment_len), &segment)) {
    fprintf(stderr, "segment decode failed\n");
    goto cleanup;
  }
  if (segment.record_count != 2u || segment.extent_count != 2u) {
    fprintf(stderr, "segment record/extent count mismatch\n");
    goto cleanup;
  }
  if (segment.header.segment_visibility != 1u ||
      segment.header.segment_domain_id != 7u) {
    fprintf(stderr, "segment visibility metadata mismatch\n");
    goto cleanup;
  }
  if (segment.records[0].visibility != 1u ||
      segment.records[1].visibility != 1u ||
      segment.records[0].domain_id != 7u ||
      segment.records[1].domain_id != 7u) {
    fprintf(stderr, "record visibility metadata mismatch\n");
    goto cleanup;
  }

  exit_code = 0;

cleanup:
  amduat_enc_asl_core_index_free(&segment);
  free(segment_path);
  free(segment_bytes);
  amduat_enc_asl_log_free(records, record_count);
  amduat_reference_free(&ref_a);
  amduat_reference_free(&ref_b);
  if (!remove_tree(root)) {
    fprintf(stderr, "cleanup failed\n");
    exit_code = 1;
  }
  free(root);
  return exit_code;
}
/*
 * Verifies tombstone and tombstone-lift semantics for a single artifact:
 *   1. after put, the artifact is readable;
 *   2. after tombstone, the log position advances and the artifact reads as
 *      NOT_FOUND at the current state;
 *   3. after tombstone_lift (targeting the tombstone's log position), the log
 *      position advances again and the artifact is readable once more;
 *   4. the log holds at least three records and the last one is a
 *      TOMBSTONE_LIFT record.
 *
 * Returns 0 on success and 1 on any failure (including failure to remove
 * the temporary store directory).
 */
static int test_tombstone_round_trip(void) {
  amduat_asl_store_config_t config;
  amduat_asl_store_index_fs_t fs;
  amduat_asl_store_t store;
  amduat_asl_store_error_t err;
  amduat_asl_index_state_t state;
  amduat_asl_index_state_t tombstone_state;
  amduat_asl_index_state_t lift_state;
  amduat_artifact_t artifact;
  amduat_artifact_t loaded;
  amduat_reference_t ref;
  uint8_t payload[4];
  char *root;
  amduat_asl_log_record_t *records = NULL;
  size_t record_count = 0u;
  int exit_code = 1;

  root = make_temp_root();
  if (root == NULL) {
    fprintf(stderr, "temp root failed\n");
    return 1;
  }

  /* The cleanup label frees `ref`, so it must hold a valid empty reference
   * before the first `goto cleanup`. */
  ref = amduat_reference(0u, amduat_octets(NULL, 0u));

  memset(&config, 0, sizeof(config));
  config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1;
  config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256;
  if (!amduat_asl_store_index_fs_init(&fs, config, root)) {
    fprintf(stderr, "index fs init failed\n");
    goto cleanup;
  }
  amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs);

  memset(payload, 0x2a, sizeof(payload));
  artifact = amduat_artifact(amduat_octets(payload, sizeof(payload)));
  err = amduat_asl_store_put_indexed(&store, artifact, &ref, &state);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "put_indexed failed: %d\n", err);
    goto cleanup;
  }
  loaded = amduat_artifact(amduat_octets(NULL, 0u));
  err = amduat_asl_store_get_indexed(&store, ref, state, &loaded);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "get_indexed failed: %d\n", err);
    goto cleanup;
  }
  amduat_artifact_free(&loaded);

  /* NOTE(review): the meaning of the 7u / 9u arguments is not visible here
   * -- confirm against the amduat_asl_store_tombstone declaration. */
  err = amduat_asl_store_tombstone(&store, ref, 7u, 9u, &tombstone_state);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "tombstone failed: %d\n", err);
    goto cleanup;
  }
  if (tombstone_state.log_position <= state.log_position) {
    fprintf(stderr, "tombstone did not advance log position\n");
    goto cleanup;
  }
  if (!amduat_asl_index_current_state(&store, &state)) {
    fprintf(stderr, "current_state failed\n");
    goto cleanup;
  }
  loaded = amduat_artifact(amduat_octets(NULL, 0u));
  err = amduat_asl_store_get_indexed(&store, ref, state, &loaded);
  if (err != AMDUAT_ASL_STORE_ERR_NOT_FOUND) {
    fprintf(stderr, "get_indexed after tombstone expected not found: %d\n", err);
    amduat_artifact_free(&loaded);
    goto cleanup;
  }
  amduat_artifact_free(&loaded);

  /* Lift the tombstone identified by the log position it was written at. */
  err = amduat_asl_store_tombstone_lift(&store,
                                        ref,
                                        tombstone_state.log_position,
                                        &lift_state);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "tombstone_lift failed: %d\n", err);
    goto cleanup;
  }
  if (lift_state.log_position <= tombstone_state.log_position) {
    fprintf(stderr, "tombstone_lift did not advance log position\n");
    goto cleanup;
  }
  if (!amduat_asl_index_current_state(&store, &state)) {
    fprintf(stderr, "current_state failed\n");
    goto cleanup;
  }
  loaded = amduat_artifact(amduat_octets(NULL, 0u));
  err = amduat_asl_store_get_indexed(&store, ref, state, &loaded);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "get_indexed after tombstone lift failed: %d\n", err);
    goto cleanup;
  }
  amduat_artifact_free(&loaded);

  err = amduat_asl_log_scan(&store, &records, &record_count);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "log scan failed: %d\n", err);
    goto cleanup;
  }
  if (record_count < 3u ||
      records[record_count - 1u].record_type !=
          AMDUAT_ASL_LOG_RECORD_TOMBSTONE_LIFT) {
    fprintf(stderr, "log scan missing tombstone lift record\n");
    goto cleanup;
  }

  exit_code = 0;

cleanup:
  amduat_enc_asl_log_free(records, record_count);
  amduat_reference_free(&ref);
  if (!remove_tree(root)) {
    fprintf(stderr, "cleanup failed\n");
    exit_code = 1;
  }
  free(root);
  return exit_code;
}
/*
 * Verifies that garbage collection does not remove a visible artifact:
 * stores one 5-byte artifact, runs amduat_asl_store_index_fs_gc() with the
 * state returned by the put, and checks the artifact is still readable at
 * that same state.
 *
 * Returns 0 on success and 1 on any failure (including failure to remove
 * the temporary store directory).
 */
static int test_gc_keeps_visible(void) {
  amduat_asl_store_config_t config;
  amduat_asl_store_index_fs_t fs;
  amduat_asl_store_t store;
  amduat_asl_store_error_t err;
  amduat_asl_index_state_t state;
  amduat_artifact_t artifact;
  amduat_artifact_t loaded;
  amduat_reference_t ref;
  uint8_t payload[5];
  char *root;
  int exit_code = 1;

  root = make_temp_root();
  if (root == NULL) {
    fprintf(stderr, "temp root failed\n");
    return 1;
  }

  /* The cleanup label frees `ref`, so it must hold a valid empty reference
   * before the first `goto cleanup`. */
  ref = amduat_reference(0u, amduat_octets(NULL, 0u));

  memset(&config, 0, sizeof(config));
  config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1;
  config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256;
  if (!amduat_asl_store_index_fs_init(&fs, config, root)) {
    fprintf(stderr, "index fs init failed\n");
    goto cleanup;
  }
  amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs);

  memset(payload, 0x77, sizeof(payload));
  artifact = amduat_artifact(amduat_octets(payload, sizeof(payload)));
  err = amduat_asl_store_put_indexed(&store, artifact, &ref, &state);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "put_indexed failed: %d\n", err);
    goto cleanup;
  }

  err = amduat_asl_store_index_fs_gc(&fs, &state);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "gc failed: %d\n", err);
    goto cleanup;
  }

  /* Only success of the read is checked; contents are not compared here. */
  loaded = amduat_artifact(amduat_octets(NULL, 0u));
  err = amduat_asl_store_get_indexed(&store, ref, state, &loaded);
  if (err != AMDUAT_ASL_STORE_OK) {
    fprintf(stderr, "get_indexed after gc failed: %d\n", err);
    goto cleanup;
  }
  amduat_artifact_free(&loaded);

  exit_code = 0;

cleanup:
  amduat_reference_free(&ref);
  if (!remove_tree(root)) {
    fprintf(stderr, "cleanup failed\n");
    exit_code = 1;
  }
  free(root);
  return exit_code;
}
/*
 * Bulk put/get smoke benchmark.
 *
 * Stores N pseudo-random artifacts (1..64 bytes each, generated from a fixed
 * srand(1337) seed), then reads every one back at the store's current state
 * and compares the bytes. N defaults to 10000 and can be overridden with the
 * AMDUAT_ASL_PERF_COUNT environment variable (ignored unless it parses to a
 * value greater than zero). Elapsed times and ru_maxrss are printed to
 * stderr; no timing threshold is enforced.
 *
 * Returns 0 on success and 1 on any failure (including failure to remove
 * the temporary store directory).
 */
static int test_large_round_trip_perf(void) {
  enum { k_artifact_count_default = 10000, k_payload_max = 64 };
  amduat_asl_store_config_t config;
  amduat_asl_store_index_fs_t fs;
  amduat_asl_store_t store;
  amduat_asl_store_error_t status;
  amduat_asl_index_state_t snapshot;
  amduat_reference_t *refs = NULL;
  uint8_t **payloads = NULL;
  size_t *payload_lens = NULL;
  char *root;
  int exit_code = 1;
  size_t idx;
  size_t artifact_count = k_artifact_count_default;
  size_t requested;
  uint64_t began_ns;
  uint64_t ended_ns;
  long rss_kb;
  const char *count_env;

  root = make_temp_root();
  if (root == NULL) {
    fprintf(stderr, "temp root failed\n");
    return 1;
  }

  /* Optional override of the artifact count. */
  count_env = getenv("AMDUAT_ASL_PERF_COUNT");
  if (count_env != NULL && parse_size_env(count_env, &requested) &&
      requested > 0u) {
    artifact_count = requested;
  }

  /* calloc keeps every slot zeroed so cleanup can walk the full arrays even
   * when the put loop stops early. */
  refs = (amduat_reference_t *)calloc(artifact_count, sizeof(*refs));
  payloads = (uint8_t **)calloc(artifact_count, sizeof(*payloads));
  payload_lens = (size_t *)calloc(artifact_count, sizeof(*payload_lens));
  if (refs == NULL || payloads == NULL || payload_lens == NULL) {
    fprintf(stderr, "alloc failed\n");
    goto cleanup;
  }

  memset(&config, 0, sizeof(config));
  config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1;
  config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256;
  if (!amduat_asl_store_index_fs_init(&fs, config, root)) {
    fprintf(stderr, "index fs init failed\n");
    goto cleanup;
  }
  amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs);

  /* Fixed seed: the same payload sequence is generated on every run. */
  srand(1337);

  /* ---- put phase ---- */
  began_ns = now_ns();
  idx = 0;
  while (idx < artifact_count) {
    const size_t len = (size_t)((rand() % k_payload_max) + 1);
    uint8_t *bytes = (uint8_t *)malloc(len);
    size_t fill;

    if (bytes == NULL) {
      fprintf(stderr, "payload alloc failed\n");
      goto cleanup;
    }
    for (fill = 0; fill < len; ++fill) {
      bytes[fill] = (uint8_t)(rand() & 0xffu);
    }
    /* Record ownership before the put so cleanup frees it on failure too. */
    payloads[idx] = bytes;
    payload_lens[idx] = len;
    refs[idx] = amduat_reference(0u, amduat_octets(NULL, 0u));

    status = amduat_asl_store_put(&store,
                                  amduat_artifact(amduat_octets(bytes, len)),
                                  &refs[idx]);
    if (status != AMDUAT_ASL_STORE_OK) {
      fprintf(stderr, "put failed: %d\n", status);
      goto cleanup;
    }
    ++idx;
  }
  ended_ns = now_ns();
  rss_kb = current_rss_kb();

  /* now_ns() returns 0 when the clock is unavailable; skip the report then. */
  if (began_ns != 0u && ended_ns != 0u) {
    fprintf(stderr,
            "perf: put %zu artifacts in %.3f ms\n",
            artifact_count,
            (double)(ended_ns - began_ns) / 1000000.0);
  }
  if (rss_kb >= 0) {
    fprintf(stderr, "perf: maxrss %ld KB\n", rss_kb);
  }

  if (!amduat_asl_index_current_state(&store, &snapshot)) {
    fprintf(stderr, "current_state failed\n");
    goto cleanup;
  }

  /* ---- get phase ---- */
  began_ns = now_ns();
  idx = 0;
  while (idx < artifact_count) {
    amduat_artifact_t fetched = amduat_artifact(amduat_octets(NULL, 0u));
    bool same;

    status = amduat_asl_store_get_indexed(&store, refs[idx], snapshot, &fetched);
    if (status != AMDUAT_ASL_STORE_OK) {
      /* Best-effort hex rendering of the failing reference for the log. */
      char *ref_text = NULL;
      (void)amduat_format_ref_to_text(refs[idx], AMDUAT_FORMAT_REF_HEX,
                                      &ref_text);
      fprintf(stderr,
              "get failed: %d index=%zu snapshot=%" PRIu64 " log=%" PRIu64
              " ref=%s\n",
              status,
              idx,
              snapshot.snapshot_id,
              snapshot.log_position,
              ref_text == NULL ? "(null)" : ref_text);
      free(ref_text);
      goto cleanup;
    }

    same = fetched.bytes.len == payload_lens[idx] &&
           (fetched.bytes.len == 0u ||
            memcmp(fetched.bytes.data, payloads[idx], fetched.bytes.len) == 0);
    if (!same) {
      fprintf(stderr, "payload mismatch\n");
      amduat_artifact_free(&fetched);
      goto cleanup;
    }
    amduat_artifact_free(&fetched);
    ++idx;
  }
  ended_ns = now_ns();
  if (began_ns != 0u && ended_ns != 0u) {
    fprintf(stderr,
            "perf: get %zu artifacts in %.3f ms\n",
            artifact_count,
            (double)(ended_ns - began_ns) / 1000000.0);
  }

  exit_code = 0;

cleanup:
  if (refs != NULL) {
    idx = 0;
    while (idx < artifact_count) {
      amduat_reference_free(&refs[idx]);
      ++idx;
    }
  }
  if (payloads != NULL) {
    idx = 0;
    while (idx < artifact_count) {
      free(payloads[idx]);
      ++idx;
    }
  }
  free(payloads);
  free(payload_lens);
  free(refs);
  if (!remove_tree(root)) {
    fprintf(stderr, "cleanup failed\n");
    exit_code = 1;
  }
  free(root);
  return exit_code;
}
/*
 * State shared between the stress-test writer and reader threads.
 *
 * NOTE(review): the flags and counters are `volatile`, not atomic; the
 * writer updates put_count with __sync_fetch_and_add but `stop` / `failed`
 * are plain stores -- confirm this is sufficient for the intended targets.
 */
typedef struct {
/* Store root directory; each thread opens its own index-fs handle on it. */
const char *root;
/* Side file the writer appends one hex-encoded reference per line to and
 * the reader opens for reading. */
const char *log_path;
/* Store configuration used by both threads (hash id, encoding profile). */
amduat_asl_store_config_t config;
/* The writer stops once this many nanoseconds have elapsed since it began. */
uint64_t duration_ns;
/* Set to true to ask every thread to leave its loop. */
volatile bool stop;
/* Set to true (together with stop) when a thread hits an error. */
volatile bool failed;
/* Number of successful puts, incremented by the writer. */
volatile uint64_t put_count;
/* Presumably the number of successful gets by the reader -- its update is
 * not visible in this part of the file. */
volatile uint64_t get_count;
} stress_state_t;
static void *stress_writer_thread(void *arg) {
stress_state_t *state = (stress_state_t *)arg;
const amduat_hash_asl1_desc_t *hash_desc;
amduat_asl_store_index_fs_t fs;
amduat_asl_store_t store;
amduat_asl_store_index_fs_snapshot_policy_t policy;
uint8_t *payload = NULL;
FILE *log_fp = NULL;
uint64_t start_ns;
uint64_t now;
uint64_t flush_count = 0u;
if (state == NULL) {
return NULL;
}
hash_desc = amduat_hash_asl1_desc_lookup(state->config.hash_id);
if (hash_desc == NULL || hash_desc->digest_len == 0) {
state->failed = true;
state->stop = true;
return NULL;
}
if (!amduat_asl_store_index_fs_init(&fs, state->config, state->root)) {
state->failed = true;
state->stop = true;
return NULL;
}
memset(&policy, 0, sizeof(policy));
policy.enabled = false;
amduat_asl_store_index_fs_set_snapshot_policy(&fs, policy);
amduat_asl_store_init(&store, state->config, amduat_asl_store_index_fs_ops(),
&fs);
payload = (uint8_t *)calloc(hash_desc->digest_len, 1u);
if (payload == NULL) {
state->failed = true;
state->stop = true;
return NULL;
}
log_fp = fopen(state->log_path, "ab");
if (log_fp == NULL) {
free(payload);
state->failed = true;
state->stop = true;
return NULL;
}
start_ns = now_ns();
while (!state->stop) {
amduat_reference_t ref = amduat_reference(0u, amduat_octets(NULL, 0u));
amduat_asl_store_error_t err;
char *hex = NULL;
err = amduat_asl_store_put(&store,
amduat_artifact(amduat_octets(
payload, hash_desc->digest_len)),
&ref);
if (err != AMDUAT_ASL_STORE_OK) {
state->failed = true;
state->stop = true;
break;
}
if (ref.digest.len == hash_desc->digest_len &&
ref.digest.data != NULL) {
memcpy(payload, ref.digest.data, ref.digest.len);
} else {
amduat_reference_free(&ref);
state->failed = true;
state->stop = true;
break;
}
if (!amduat_asl_ref_encode_hex(ref, &hex) || hex == NULL) {
amduat_reference_free(&ref);
state->failed = true;
state->stop = true;
break;
}
fprintf(log_fp, "%s\n", hex);
free(hex);
amduat_reference_free(&ref);
__sync_fetch_and_add(&state->put_count, 1u);
flush_count++;
if ((flush_count % 256u) == 0u) {
fflush(log_fp);
}
now = now_ns();
if (now != 0u && now - start_ns >= state->duration_ns) {
state->stop = true;
break;
}
}
fflush(log_fp);
fclose(log_fp);
free(payload);
return NULL;
}
static void *stress_reader_thread(void *arg) {
stress_state_t *state = (stress_state_t *)arg;
amduat_asl_store_index_fs_t fs;
amduat_asl_store_t store;
amduat_asl_store_index_fs_snapshot_policy_t policy;
FILE *log_fp = NULL;
unsigned int seed = 1337u;
struct stat st;
if (state == NULL) {
return NULL;
}
if (!amduat_asl_store_index_fs_init(&fs, state->config, state->root)) {
state->failed = true;
state->stop = true;
return NULL;
}
memset(&policy, 0, sizeof(policy));
policy.enabled = false;
amduat_asl_store_index_fs_set_snapshot_policy(&fs, policy);
amduat_asl_store_init(&store, state->config, amduat_asl_store_index_fs_ops(),
&fs);
log_fp = fopen(state->log_path, "rb");
while (log_fp == NULL && !state->stop) {
if (errno != ENOENT) {
state->failed = true;
state->stop = true;
return NULL;
}
sleep_ms(10u);
log_fp = fopen(state->log_path, "rb");
}
if (log_fp == NULL) {
state->failed = true;
state->stop = true;
return NULL;
}
while (!state->stop) {
char line[256];
long offset;
size_t len;
amduat_reference_t ref;
if (fstat(fileno(log_fp), &st) != 0) {
state->failed = true;
state->stop = true;
break;
}
if (st.st_size <= 0) {
continue;
}
offset = (long)(rand_r(&seed) % (unsigned int)st.st_size);
if (fseek(log_fp, offset, SEEK_SET) != 0) {
state->failed = true;
state->stop = true;
break;
}
if (offset != 0) {
if (fgets(line, sizeof(line), log_fp) == NULL) {
continue;
}
}
if (fgets(line, sizeof(line), log_fp) == NULL) {
continue;
}
len = strlen(line);
if (len == 0u || line[len - 1u] != '\n') {
continue;
}
line[len - 1u] = '\0';
if (!amduat_asl_ref_decode_hex(line, &ref)) {
fprintf(stderr, "stress: decode ref failed\n");
state->failed = true;
state->stop = true;
break;
}
{
amduat_artifact_t loaded =
amduat_artifact(amduat_octets(NULL, 0u));
amduat_asl_store_error_t err =
amduat_asl_store_get(&store, ref, &loaded);
if (err != AMDUAT_ASL_STORE_OK) {
char *ref_text = NULL;
(void)amduat_format_ref_to_text(ref, AMDUAT_FORMAT_REF_HEX,
&ref_text);
fprintf(stderr,
"stress: get failed: %d ref=%s\n",
err,
ref_text == NULL ? "(null)" : ref_text);
free(ref_text);
state->failed = true;
state->stop = true;
amduat_artifact_free(&loaded);
amduat_reference_free(&ref);
break;
}
amduat_artifact_free(&loaded);
__sync_fetch_and_add(&state->get_count, 1u);
}
amduat_reference_free(&ref);
}
fclose(log_fp);
return NULL;
}
/*
 * Concurrent writer/reader stress test.
 *
 * Creates a temporary store root, starts stress_writer_thread() and
 * stress_reader_thread() against it, and reports put/get throughput on stderr
 * every five seconds until the run time is over or a thread flags a failure.
 * The run lasts 60 seconds unless the AMDUAT_ASL_STRESS_SECS environment
 * variable holds a value parse_u64_env() accepts, in which case that many
 * seconds are used. A first/last/min/max/avg summary is printed at the end
 * when at least one sample was taken.
 *
 * Returns 0 on success; 1 if setup failed, a thread could not be started, a
 * thread reported a failure, or the temporary tree could not be removed.
 */
static int test_writer_reader_stress(void) {
  /* Time constants are built from 64-bit operands on purpose: a product of
   * plain `unsigned` literals such as 60u * 1000u * 1000u * 1000u is evaluated
   * in 32 bits and wraps before it is widened. */
  const uint64_t ns_per_sec = UINT64_C(1000000000);
  const uint64_t sample_interval_ns = UINT64_C(5) * ns_per_sec;
  amduat_asl_store_config_t config;
  pthread_t writer;
  pthread_t reader;
  bool writer_started;
  bool reader_started = false;
  stress_state_t state;
  char *root;
  char *log_path = NULL;
  uint64_t duration_ns = UINT64_C(60) * ns_per_sec;
  const char *duration_env;
  uint64_t start_ns;
  uint64_t last_ns;
  uint64_t last_put = 0u;
  uint64_t last_get = 0u;
  double put_rate_first = 0.0;
  double get_rate_first = 0.0;
  double put_rate_last = 0.0;
  double get_rate_last = 0.0;
  double put_rate_min = 0.0;
  double put_rate_max = 0.0;
  double get_rate_min = 0.0;
  double get_rate_max = 0.0;
  double put_rate_sum = 0.0;
  double get_rate_sum = 0.0;
  uint64_t rate_samples = 0u;

  root = make_temp_root();
  if (root == NULL) {
    fprintf(stderr, "temp root failed\n");
    return 1;
  }
  if (!join_path(root, "refs.log", &log_path)) {
    fprintf(stderr, "log path failed\n");
    free(root);
    return 1;
  }

  memset(&config, 0, sizeof(config));
  config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1;
  config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256;

  duration_env = getenv("AMDUAT_ASL_STRESS_SECS");
  if (duration_env != NULL) {
    uint64_t seconds;

    if (parse_u64_env(duration_env, &seconds)) {
      /* Saturate instead of wrapping for absurdly large values. */
      duration_ns = seconds > UINT64_MAX / ns_per_sec ? UINT64_MAX
                                                       : seconds * ns_per_sec;
    }
  }

  memset(&state, 0, sizeof(state));
  state.root = root;
  state.log_path = log_path;
  state.config = config;
  state.duration_ns = duration_ns;
  state.stop = false;
  state.failed = false;

  start_ns = now_ns();
  last_ns = start_ns;

  /* Remember which threads really exist: joining a pthread_t that
   * pthread_create() never filled in is undefined behaviour. The reader is
   * only attempted once the writer is running. */
  writer_started =
      pthread_create(&writer, NULL, stress_writer_thread, &state) == 0;
  if (writer_started) {
    reader_started =
        pthread_create(&reader, NULL, stress_reader_thread, &state) == 0;
  }
  if (!writer_started || !reader_started) {
    fprintf(stderr, "thread start failed\n");
    /* A run that never had both threads must not be reported as a pass. */
    state.failed = true;
    state.stop = true;
  }

  while (!state.stop) {
    /* A zero timestamp is skipped rather than treated as elapsed time. */
    uint64_t now = now_ns();

    if (now != 0u && now - last_ns >= sample_interval_ns) {
      uint64_t puts = state.put_count;
      uint64_t gets = state.get_count;
      double elapsed_s = (double)(now - last_ns) / 1000000000.0;
      double divisor = elapsed_s > 0 ? elapsed_s : 1.0;
      double put_rate = (double)(puts - last_put) / divisor;
      double get_rate = (double)(gets - last_get) / divisor;

      if (rate_samples == 0u) {
        put_rate_first = put_rate;
        get_rate_first = get_rate;
        put_rate_min = put_rate;
        put_rate_max = put_rate;
        get_rate_min = get_rate;
        get_rate_max = get_rate;
      } else {
        if (put_rate < put_rate_min) {
          put_rate_min = put_rate;
        }
        if (put_rate > put_rate_max) {
          put_rate_max = put_rate;
        }
        if (get_rate < get_rate_min) {
          get_rate_min = get_rate;
        }
        if (get_rate > get_rate_max) {
          get_rate_max = get_rate;
        }
      }
      put_rate_last = put_rate;
      get_rate_last = get_rate;
      put_rate_sum += put_rate;
      get_rate_sum += get_rate;
      rate_samples++;

      fprintf(stderr,
              "stress: t=%.1fs puts=%" PRIu64 " gets=%" PRIu64
              " put/s=%.1f get/s=%.1f\n",
              (double)(now - start_ns) / 1000000000.0,
              puts,
              gets,
              put_rate,
              get_rate);
      last_put = puts;
      last_get = gets;
      last_ns = now;
    }
    if (now != 0u && now - start_ns >= duration_ns) {
      state.stop = true;
    }
    if (state.failed) {
      state.stop = true;
    }
    sleep_ms(10u);
  }

  if (writer_started) {
    pthread_join(writer, NULL);
  }
  if (reader_started) {
    pthread_join(reader, NULL);
  }

  if (state.failed) {
    fprintf(stderr, "stress: failed\n");
  }
  if (rate_samples > 0u) {
    fprintf(stderr,
            "stress: put/s first=%.1f last=%.1f min=%.1f max=%.1f avg=%.1f\n",
            put_rate_first,
            put_rate_last,
            put_rate_min,
            put_rate_max,
            put_rate_sum / (double)rate_samples);
    fprintf(stderr,
            "stress: get/s first=%.1f last=%.1f min=%.1f max=%.1f avg=%.1f\n",
            get_rate_first,
            get_rate_last,
            get_rate_min,
            get_rate_max,
            get_rate_sum / (double)rate_samples);
  }

  free(log_path);
  if (!remove_tree(root)) {
    fprintf(stderr, "cleanup failed\n");
    free(root);
    return 1;
  }
  free(root);
  return state.failed ? 1 : 0;
}
/* Test driver. */
/*
 * Runs every test in this file in a fixed order and stops at the first one
 * that reports a non-zero result.
 *
 * Returns 1 as soon as one of the earlier tests fails; otherwise returns
 * whatever test_shard_routing(), the last test, returns.
 */
int main(void) {
  if (test_round_trip() != 0) {
    return 1;
  }
  if (test_snapshot_truncation() != 0) {
    return 1;
  }
  if (test_segment_batch_packing() != 0) {
    return 1;
  }
  if (test_tombstone_round_trip() != 0) {
    return 1;
  }
  if (test_gc_keeps_visible() != 0) {
    return 1;
  }
  if (test_large_round_trip_perf() != 0) {
    return 1;
  }
  if (test_writer_reader_stress() != 0) {
    return 1;
  }
  return test_shard_routing();
}