asl_store_index_fs: add perf/stress tests and summaries

This commit is contained in:
Carl Niklas Rydberg 2026-01-17 19:49:12 +01:00
parent 017bc87e04
commit 83cbe28ede
5 changed files with 1090 additions and 9 deletions

View file

@ -391,7 +391,7 @@ target_compile_definitions(amduat_test_asl_store_index_fs
PRIVATE _POSIX_C_SOURCE=200809L
)
target_link_libraries(amduat_test_asl_store_index_fs
PRIVATE amduat_asl_store_index_fs
PRIVATE amduat_asl_store_index_fs amduat_format pthread
)
add_test(NAME asl_store_index_fs COMMAND amduat_test_asl_store_index_fs)

View file

@ -38,7 +38,11 @@ enum {
AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN = 8,
AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN = 32,
AMDUAT_ASL_STORE_INDEX_FS_LOG_HEADER_LEN = 24,
AMDUAT_ASL_STORE_INDEX_FS_LOG_VERSION = 1
AMDUAT_ASL_STORE_INDEX_FS_LOG_VERSION = 1,
AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN = 8,
AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_HEADER_LEN = 24,
AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_ENTRY_LEN = 40,
AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_VERSION = 1
};
typedef enum {
@ -65,6 +69,9 @@ static uint64_t amduat_asl_store_index_fs_load_u64_le(const uint8_t *data);
static const uint8_t k_amduat_asl_store_index_fs_log_magic[
AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN] = {'A', 'S', 'L', 'L',
'O', 'G', '0', '1'};
static const uint8_t k_amduat_asl_store_index_fs_summary_magic[
AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN] = {'A', 'S', 'L', 'S',
'U', 'M', '0', '1'};
static bool amduat_asl_store_index_fs_ensure_directory(const char *path) {
struct stat st;
@ -87,6 +94,46 @@ static bool amduat_asl_store_index_fs_ensure_directory(const char *path) {
return true;
}
static bool amduat_asl_store_index_fs_join_path(const char *base,
const char *suffix,
char **out_path) {
size_t base_len;
size_t suffix_len;
bool needs_sep;
size_t total_len;
char *buffer;
if (base == NULL || suffix == NULL || out_path == NULL) {
return false;
}
if (base[0] == '\0' || suffix[0] == '\0') {
return false;
}
base_len = strlen(base);
suffix_len = strlen(suffix);
needs_sep = base[base_len - 1u] != '/';
total_len = base_len + (needs_sep ? 1u : 0u) + suffix_len + 1u;
buffer = (char *)malloc(total_len);
if (buffer == NULL) {
return false;
}
memcpy(buffer, base, base_len);
if (needs_sep) {
buffer[base_len] = '/';
memcpy(buffer + base_len + 1u, suffix, suffix_len);
buffer[base_len + 1u + suffix_len] = '\0';
} else {
memcpy(buffer + base_len, suffix, suffix_len);
buffer[base_len + suffix_len] = '\0';
}
*out_path = buffer;
return true;
}
static bool amduat_asl_store_index_fs_fsync_directory(const char *path) {
int fd;
int fsync_errno;
@ -250,6 +297,78 @@ static bool amduat_asl_store_index_fs_log_write_exact(int fd,
return true;
}
static bool amduat_asl_store_index_fs_summary_write_header(int fd) {
uint8_t header[AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_HEADER_LEN];
memcpy(header,
k_amduat_asl_store_index_fs_summary_magic,
AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN);
amduat_asl_store_index_fs_log_store_u32_le(
header + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN,
AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_VERSION);
amduat_asl_store_index_fs_log_store_u32_le(
header + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN + 4u,
AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_HEADER_LEN);
amduat_asl_store_index_fs_log_store_u64_le(
header + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN + 8u, 0u);
return amduat_asl_store_index_fs_log_write_exact(fd,
header,
sizeof(header));
}
static bool amduat_asl_store_index_fs_summary_read_header(int fd,
size_t *out_size) {
uint8_t header[AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_HEADER_LEN];
uint32_t version;
uint32_t header_size;
uint64_t flags;
size_t extra;
uint8_t discard[256];
if (out_size == NULL) {
return false;
}
*out_size = 0u;
if (!amduat_asl_store_index_fs_log_read_exact(fd,
header,
sizeof(header))) {
return false;
}
if (memcmp(header,
k_amduat_asl_store_index_fs_summary_magic,
AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN) != 0) {
return false;
}
version = amduat_asl_store_index_fs_log_load_u32_le(
header + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN);
header_size = amduat_asl_store_index_fs_log_load_u32_le(
header + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN + 4u);
flags = amduat_asl_store_index_fs_load_u64_le(
header + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN + 8u);
if (version != AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_VERSION ||
flags != 0u ||
header_size < AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_HEADER_LEN) {
return false;
}
extra = header_size - AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_HEADER_LEN;
while (extra > 0u) {
size_t chunk = extra > sizeof(discard) ? sizeof(discard) : extra;
if (!amduat_asl_store_index_fs_log_read_exact(fd, discard, chunk)) {
return false;
}
extra -= chunk;
}
*out_size = header_size;
return true;
}
static bool amduat_asl_store_index_fs_log_read_header_bytes(
int fd,
uint8_t **out_bytes,
@ -637,6 +756,28 @@ static bool amduat_asl_store_index_fs_build_segment_path(
fs->root_path, shard_id, segment_id, out_path);
}
static bool amduat_asl_store_index_fs_build_segment_summary_path(
amduat_asl_store_index_fs_t *fs,
uint16_t shard_id,
char **out_path) {
char *segments_path;
bool ok;
if (fs == NULL || out_path == NULL) {
return false;
}
if (!amduat_asl_store_index_fs_build_segments_path(fs,
shard_id,
&segments_path)) {
return false;
}
ok = amduat_asl_store_index_fs_join_path(segments_path,
"summary",
out_path);
free(segments_path);
return ok;
}
static bool amduat_asl_store_index_fs_build_block_path(
amduat_asl_store_index_fs_t *fs,
uint16_t shard_id,
@ -783,6 +924,148 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_write_log(
return AMDUAT_ASL_STORE_ERR_IO;
}
static bool amduat_asl_store_index_fs_summary_append(
amduat_asl_store_index_fs_t *fs,
uint16_t shard_id,
uint64_t segment_id,
const uint8_t segment_hash[AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN]) {
char *summary_path;
char *segments_path;
int fd;
struct stat st;
uint8_t entry[AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_ENTRY_LEN];
bool ok = false;
if (fs == NULL || segment_hash == NULL) {
return false;
}
if (!amduat_asl_store_index_fs_build_segment_summary_path(fs,
shard_id,
&summary_path)) {
return false;
}
if (!amduat_asl_store_index_fs_build_segments_path(fs,
shard_id,
&segments_path)) {
free(summary_path);
return false;
}
fd = open(summary_path, O_RDWR | O_CREAT, 0644);
if (fd < 0) {
free(summary_path);
free(segments_path);
return false;
}
if (fstat(fd, &st) != 0) {
close(fd);
free(summary_path);
free(segments_path);
return false;
}
if (st.st_size == 0) {
if (!amduat_asl_store_index_fs_summary_write_header(fd)) {
close(fd);
free(summary_path);
free(segments_path);
return false;
}
} else if (st.st_size < AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_HEADER_LEN) {
close(fd);
free(summary_path);
free(segments_path);
return false;
}
if (lseek(fd, 0, SEEK_END) < 0) {
close(fd);
free(summary_path);
free(segments_path);
return false;
}
amduat_asl_store_index_fs_log_store_u64_le(entry, segment_id);
memcpy(entry + 8u, segment_hash, AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN);
if (!amduat_asl_store_index_fs_log_write_exact(fd,
entry,
sizeof(entry))) {
close(fd);
free(summary_path);
free(segments_path);
return false;
}
ok = fsync(fd) == 0;
if (close(fd) != 0) {
ok = false;
}
if (ok) {
ok = amduat_asl_store_index_fs_fsync_directory(segments_path);
}
free(summary_path);
free(segments_path);
return ok;
}
static bool amduat_asl_store_index_fs_summary_lookup_hash(
amduat_asl_store_index_fs_t *fs,
uint16_t shard_id,
uint64_t segment_id,
uint8_t out_hash[AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN]) {
char *summary_path;
int fd;
size_t header_size;
uint8_t entry[AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_ENTRY_LEN];
if (fs == NULL || out_hash == NULL) {
return false;
}
if (!amduat_asl_store_index_fs_build_segment_summary_path(fs,
shard_id,
&summary_path)) {
return false;
}
fd = open(summary_path, O_RDONLY);
free(summary_path);
if (fd < 0) {
return false;
}
if (!amduat_asl_store_index_fs_summary_read_header(fd, &header_size)) {
close(fd);
return false;
}
(void)header_size;
while (true) {
uint64_t entry_segment_id;
int rc = amduat_asl_store_index_fs_log_read_exact_eof(
fd, entry, sizeof(entry));
if (rc == 0) {
break;
}
if (rc < 0) {
close(fd);
return false;
}
entry_segment_id = amduat_asl_store_index_fs_load_u64_le(entry);
if (entry_segment_id == segment_id) {
memcpy(out_hash,
entry + 8u,
AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN);
close(fd);
return true;
}
}
close(fd);
return false;
}
static amduat_asl_store_error_t amduat_asl_store_index_fs_stream_log_apply(
const char *log_path,
amduat_asl_replay_state_t *replay_state,
@ -1375,6 +1658,129 @@ static bool amduat_asl_store_index_fs_parse_snapshot_filename(
return true;
}
static bool amduat_asl_store_index_fs_snapshot_manifest_exists(
const char *root_path,
amduat_asl_snapshot_id_t snapshot_id) {
char *manifest_path;
struct stat st;
bool ok;
if (root_path == NULL || snapshot_id == 0u) {
return false;
}
if (!amduat_asl_store_index_fs_layout_build_snapshot_manifest_path(
root_path, snapshot_id, &manifest_path)) {
return false;
}
ok = stat(manifest_path, &st) == 0 && S_ISREG(st.st_mode);
free(manifest_path);
return ok;
}
static bool amduat_asl_store_index_fs_read_latest_snapshot_id(
amduat_asl_store_index_fs_t *fs,
amduat_asl_snapshot_id_t *out_id) {
char *latest_path;
uint8_t *latest_bytes;
size_t latest_len;
amduat_asl_store_index_fs_read_status_t status;
char *latest_str;
size_t trimmed_len;
uint64_t parsed;
if (fs == NULL || out_id == NULL) {
return false;
}
if (!amduat_asl_store_index_fs_layout_build_snapshot_latest_path(
fs->root_path, &latest_path)) {
return false;
}
latest_bytes = NULL;
latest_len = 0u;
status = amduat_asl_store_index_fs_read_file(latest_path,
&latest_bytes,
&latest_len);
free(latest_path);
if (status == AMDUAT_ASL_STORE_INDEX_FS_READ_NOT_FOUND) {
return false;
}
if (status != AMDUAT_ASL_STORE_INDEX_FS_READ_OK || latest_len == 0u) {
free(latest_bytes);
return false;
}
latest_str = (char *)malloc(latest_len + 1u);
if (latest_str == NULL) {
free(latest_bytes);
return false;
}
memcpy(latest_str, latest_bytes, latest_len);
latest_str[latest_len] = '\0';
trimmed_len = latest_len;
while (trimmed_len > 0u) {
char c = latest_str[trimmed_len - 1u];
if (c != '\n' && c != '\r' && c != ' ' && c != '\t') {
break;
}
latest_str[trimmed_len - 1u] = '\0';
trimmed_len -= 1u;
}
if (trimmed_len == 0u ||
!amduat_asl_store_index_fs_parse_u64(latest_str, &parsed) ||
parsed == 0u) {
free(latest_bytes);
free(latest_str);
return false;
}
free(latest_bytes);
free(latest_str);
*out_id = (amduat_asl_snapshot_id_t)parsed;
return true;
}
static void amduat_asl_store_index_fs_write_latest_snapshot_id(
amduat_asl_store_index_fs_t *fs,
amduat_asl_snapshot_id_t snapshot_id) {
char *latest_path;
char *snapshots_path;
char buffer[32];
int len;
amduat_asl_store_index_fs_write_status_t status;
if (fs == NULL || snapshot_id == 0u) {
return;
}
if (!amduat_asl_store_index_fs_layout_build_snapshot_latest_path(
fs->root_path, &latest_path)) {
return;
}
if (!amduat_asl_store_index_fs_layout_build_snapshots_path(
fs->root_path, &snapshots_path)) {
free(latest_path);
return;
}
len = snprintf(buffer, sizeof(buffer), "%" PRIu64, snapshot_id);
if (len <= 0 || (size_t)len >= sizeof(buffer)) {
free(latest_path);
free(snapshots_path);
return;
}
status = amduat_asl_store_index_fs_write_replace(snapshots_path,
latest_path,
(const uint8_t *)buffer,
(size_t)len);
if (status == AMDUAT_ASL_STORE_INDEX_FS_WRITE_OK) {
(void)amduat_asl_store_index_fs_fsync_directory(snapshots_path);
}
free(latest_path);
free(snapshots_path);
}
static bool amduat_asl_store_index_fs_find_latest_snapshot_id(
amduat_asl_store_index_fs_t *fs,
amduat_asl_snapshot_id_t *out_id) {
@ -1388,6 +1794,13 @@ static bool amduat_asl_store_index_fs_find_latest_snapshot_id(
return false;
}
if (amduat_asl_store_index_fs_read_latest_snapshot_id(fs, &latest) &&
amduat_asl_store_index_fs_snapshot_manifest_exists(fs->root_path,
latest)) {
*out_id = latest;
return true;
}
if (!amduat_asl_store_index_fs_layout_build_snapshots_path(fs->root_path,
&snapshots_path)) {
return false;
@ -1966,12 +2379,16 @@ static bool amduat_asl_store_index_fs_is_tombstoned(
}
static amduat_asl_store_error_t amduat_asl_store_index_fs_load_segment(
amduat_asl_store_index_fs_t *fs,
uint16_t shard_id,
uint64_t segment_id,
const char *segment_path,
amduat_asl_core_index_segment_t *out_segment,
uint8_t out_hash[AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN]) {
uint8_t *segment_bytes;
size_t segment_len;
amduat_asl_store_index_fs_read_status_t status;
bool has_summary_hash;
if (out_segment == NULL || out_hash == NULL) {
return AMDUAT_ASL_STORE_ERR_IO;
@ -1996,12 +2413,16 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_load_segment(
return AMDUAT_ASL_STORE_ERR_IO;
}
if (!amduat_hash_asl1_digest(AMDUAT_HASH_ASL1_ID_SHA256,
amduat_octets(segment_bytes, segment_len),
out_hash,
AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN)) {
free(segment_bytes);
return AMDUAT_ASL_STORE_ERR_IO;
has_summary_hash = amduat_asl_store_index_fs_summary_lookup_hash(
fs, shard_id, segment_id, out_hash);
if (!has_summary_hash) {
if (!amduat_hash_asl1_digest(AMDUAT_HASH_ASL1_ID_SHA256,
amduat_octets(segment_bytes, segment_len),
out_hash,
AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN)) {
free(segment_bytes);
return AMDUAT_ASL_STORE_ERR_IO;
}
}
if (!amduat_enc_asl_core_index_decode_v1(
@ -2271,7 +2692,10 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_scan_segments(
return AMDUAT_ASL_STORE_ERR_IO;
}
err = amduat_asl_store_index_fs_load_segment(segment_path,
err = amduat_asl_store_index_fs_load_segment(fs,
shard_id,
seal->segment_id,
segment_path,
&segment,
segment_hash);
free(segment_path);
@ -2811,6 +3235,11 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl(
return AMDUAT_ASL_STORE_ERR_IO;
}
(void)amduat_asl_store_index_fs_summary_append(fs,
shard_id,
segment_id,
segment_hash);
if (!amduat_asl_store_index_fs_layout_build_log_path(fs->root_path,
&log_path)) {
amduat_reference_free(&derived_ref);
@ -3135,6 +3564,7 @@ amduat_asl_store_error_t amduat_asl_store_index_fs_snapshot_create(
fs->next_snapshot_id = snapshot_id + 1u;
}
}
amduat_asl_store_index_fs_write_latest_snapshot_id(fs, snapshot_id);
free(manifest_path);
return AMDUAT_ASL_STORE_OK;
}

View file

@ -311,6 +311,23 @@ bool amduat_asl_store_index_fs_layout_build_snapshot_manifest_path(
return ok;
}
bool amduat_asl_store_index_fs_layout_build_snapshot_latest_path(
const char *root_path,
char **out_path) {
char *snapshots_path;
bool ok;
if (!amduat_asl_store_index_fs_layout_build_snapshots_path(root_path,
&snapshots_path)) {
return false;
}
ok = amduat_asl_store_index_fs_layout_join(snapshots_path,
"latest",
out_path);
free(snapshots_path);
return ok;
}
bool amduat_asl_store_index_fs_layout_build_segment_meta_path(
const char *root_path,
char **out_path) {
@ -328,6 +345,23 @@ bool amduat_asl_store_index_fs_layout_build_segment_meta_path(
return ok;
}
bool amduat_asl_store_index_fs_layout_build_segment_summary_path(
const char *root_path,
char **out_path) {
char *segments_path;
bool ok;
if (!amduat_asl_store_index_fs_layout_build_segments_path(root_path,
&segments_path)) {
return false;
}
ok = amduat_asl_store_index_fs_layout_join(segments_path,
"summary",
out_path);
free(segments_path);
return ok;
}
bool amduat_asl_store_index_fs_layout_build_segment_path(
const char *root_path,
uint64_t segment_id,

View file

@ -66,10 +66,18 @@ bool amduat_asl_store_index_fs_layout_build_snapshot_manifest_path(
uint64_t snapshot_id,
char **out_path);
bool amduat_asl_store_index_fs_layout_build_snapshot_latest_path(
const char *root_path,
char **out_path);
bool amduat_asl_store_index_fs_layout_build_segment_meta_path(
const char *root_path,
char **out_path);
bool amduat_asl_store_index_fs_layout_build_segment_summary_path(
const char *root_path,
char **out_path);
bool amduat_asl_store_index_fs_layout_build_segment_path(
const char *root_path,
uint64_t segment_id,

View file

@ -1,21 +1,26 @@
#include "amduat/asl/asl_store_index_fs.h"
#include "amduat/asl/index_bloom.h"
#include "amduat/asl/ref_text.h"
#include "amduat/asl/ref_derive.h"
#include "amduat/asl/store.h"
#include "amduat/enc/asl1_core.h"
#include "amduat/enc/asl_core_index.h"
#include "amduat/format/ref.h"
#include "amduat/hash/asl1.h"
#include <dirent.h>
#include <errno.h>
#include <inttypes.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
static bool join_path(const char *base, const char *segment, char **out_path) {
@ -262,6 +267,72 @@ static uint16_t ref_shard(amduat_reference_t ref, uint16_t shard_count) {
return (uint16_t)(hash % shard_count);
}
static bool parse_size_env(const char *value, size_t *out) {
char *endptr;
unsigned long long parsed;
if (value == NULL || out == NULL) {
return false;
}
errno = 0;
parsed = strtoull(value, &endptr, 10);
if (errno != 0 || endptr == value || *endptr != '\0') {
return false;
}
if (parsed > SIZE_MAX) {
return false;
}
*out = (size_t)parsed;
return true;
}
static uint64_t now_ns(void) {
struct timespec ts;
if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0) {
return 0u;
}
return (uint64_t)ts.tv_sec * 1000u * 1000u * 1000u +
(uint64_t)ts.tv_nsec;
}
static void sleep_ms(unsigned int ms) {
struct timespec ts;
ts.tv_sec = (time_t)(ms / 1000u);
ts.tv_nsec = (long)((ms % 1000u) * 1000u * 1000u);
while (nanosleep(&ts, &ts) != 0 && errno == EINTR) {
}
}
static long current_rss_kb(void) {
struct rusage usage;
if (getrusage(RUSAGE_SELF, &usage) != 0) {
return -1;
}
return usage.ru_maxrss;
}
static bool parse_u64_env(const char *value, uint64_t *out) {
char *endptr;
unsigned long long parsed;
if (value == NULL || out == NULL) {
return false;
}
errno = 0;
parsed = strtoull(value, &endptr, 10);
if (errno != 0 || endptr == value || *endptr != '\0') {
return false;
}
if (parsed > UINT64_MAX) {
return false;
}
*out = (uint64_t)parsed;
return true;
}
static char *make_temp_root(void) {
char *templ;
const char template_prefix[] = "/tmp/amduat_test_asl_store_index_fs_XXXXXX";
@ -668,6 +739,538 @@ cleanup:
return exit_code;
}
static int test_large_round_trip_perf(void) {
enum { k_artifact_count_default = 10000, k_payload_max = 64 };
amduat_asl_store_config_t config;
amduat_asl_store_index_fs_t fs;
amduat_asl_store_t store;
amduat_asl_store_error_t err;
amduat_asl_index_state_t state;
amduat_reference_t *refs = NULL;
uint8_t **payloads = NULL;
size_t *payload_lens = NULL;
char *root;
int exit_code = 1;
size_t i;
size_t artifact_count;
uint64_t start_ns;
uint64_t end_ns;
long rss_kb;
const char *count_env;
root = make_temp_root();
if (root == NULL) {
fprintf(stderr, "temp root failed\n");
return 1;
}
artifact_count = k_artifact_count_default;
count_env = getenv("AMDUAT_ASL_PERF_COUNT");
if (count_env != NULL) {
size_t parsed;
if (parse_size_env(count_env, &parsed) && parsed > 0u) {
artifact_count = parsed;
}
}
refs = (amduat_reference_t *)calloc(artifact_count, sizeof(*refs));
payloads = (uint8_t **)calloc(artifact_count, sizeof(*payloads));
payload_lens = (size_t *)calloc(artifact_count, sizeof(*payload_lens));
if (refs == NULL || payloads == NULL || payload_lens == NULL) {
fprintf(stderr, "alloc failed\n");
goto cleanup;
}
memset(&config, 0, sizeof(config));
config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1;
config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256;
if (!amduat_asl_store_index_fs_init(&fs, config, root)) {
fprintf(stderr, "index fs init failed\n");
goto cleanup;
}
amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs);
srand(1337);
start_ns = now_ns();
for (i = 0; i < artifact_count; ++i) {
size_t len = (size_t)((rand() % k_payload_max) + 1);
uint8_t *payload = (uint8_t *)malloc(len);
if (payload == NULL) {
fprintf(stderr, "payload alloc failed\n");
goto cleanup;
}
for (size_t j = 0; j < len; ++j) {
payload[j] = (uint8_t)(rand() & 0xffu);
}
payloads[i] = payload;
payload_lens[i] = len;
refs[i] = amduat_reference(0u, amduat_octets(NULL, 0u));
err = amduat_asl_store_put(&store,
amduat_artifact(amduat_octets(payload, len)),
&refs[i]);
if (err != AMDUAT_ASL_STORE_OK) {
fprintf(stderr, "put failed: %d\n", err);
goto cleanup;
}
}
end_ns = now_ns();
rss_kb = current_rss_kb();
if (start_ns != 0u && end_ns != 0u) {
fprintf(stderr,
"perf: put %zu artifacts in %.3f ms\n",
artifact_count,
(double)(end_ns - start_ns) / 1000000.0);
}
if (rss_kb >= 0) {
fprintf(stderr, "perf: maxrss %ld KB\n", rss_kb);
}
if (!amduat_asl_index_current_state(&store, &state)) {
fprintf(stderr, "current_state failed\n");
goto cleanup;
}
start_ns = now_ns();
for (i = 0; i < artifact_count; ++i) {
amduat_artifact_t loaded =
amduat_artifact(amduat_octets(NULL, 0u));
err = amduat_asl_store_get_indexed(&store, refs[i], state, &loaded);
if (err != AMDUAT_ASL_STORE_OK) {
char *ref_text = NULL;
(void)amduat_format_ref_to_text(refs[i], AMDUAT_FORMAT_REF_HEX,
&ref_text);
fprintf(stderr,
"get failed: %d index=%zu snapshot=%" PRIu64 " log=%" PRIu64
" ref=%s\n",
err,
i,
state.snapshot_id,
state.log_position,
ref_text == NULL ? "(null)" : ref_text);
free(ref_text);
goto cleanup;
}
if (loaded.bytes.len != payload_lens[i] ||
(loaded.bytes.len != 0u &&
memcmp(loaded.bytes.data, payloads[i], loaded.bytes.len) != 0)) {
fprintf(stderr, "payload mismatch\n");
amduat_artifact_free(&loaded);
goto cleanup;
}
amduat_artifact_free(&loaded);
}
end_ns = now_ns();
if (start_ns != 0u && end_ns != 0u) {
fprintf(stderr,
"perf: get %zu artifacts in %.3f ms\n",
artifact_count,
(double)(end_ns - start_ns) / 1000000.0);
}
exit_code = 0;
cleanup:
if (refs != NULL) {
for (i = 0; i < artifact_count; ++i) {
amduat_reference_free(&refs[i]);
}
}
if (payloads != NULL) {
for (i = 0; i < artifact_count; ++i) {
free(payloads[i]);
}
}
free(payloads);
free(payload_lens);
free(refs);
if (!remove_tree(root)) {
fprintf(stderr, "cleanup failed\n");
exit_code = 1;
}
free(root);
return exit_code;
}
typedef struct {
const char *root;
const char *log_path;
amduat_asl_store_config_t config;
uint64_t duration_ns;
volatile bool stop;
volatile bool failed;
volatile uint64_t put_count;
volatile uint64_t get_count;
} stress_state_t;
static void *stress_writer_thread(void *arg) {
stress_state_t *state = (stress_state_t *)arg;
const amduat_hash_asl1_desc_t *hash_desc;
amduat_asl_store_index_fs_t fs;
amduat_asl_store_t store;
amduat_asl_store_index_fs_snapshot_policy_t policy;
uint8_t *payload = NULL;
FILE *log_fp = NULL;
uint64_t start_ns;
uint64_t now;
uint64_t flush_count = 0u;
if (state == NULL) {
return NULL;
}
hash_desc = amduat_hash_asl1_desc_lookup(state->config.hash_id);
if (hash_desc == NULL || hash_desc->digest_len == 0) {
state->failed = true;
state->stop = true;
return NULL;
}
if (!amduat_asl_store_index_fs_init(&fs, state->config, state->root)) {
state->failed = true;
state->stop = true;
return NULL;
}
memset(&policy, 0, sizeof(policy));
policy.enabled = false;
amduat_asl_store_index_fs_set_snapshot_policy(&fs, policy);
amduat_asl_store_init(&store, state->config, amduat_asl_store_index_fs_ops(),
&fs);
payload = (uint8_t *)calloc(hash_desc->digest_len, 1u);
if (payload == NULL) {
state->failed = true;
state->stop = true;
return NULL;
}
log_fp = fopen(state->log_path, "ab");
if (log_fp == NULL) {
free(payload);
state->failed = true;
state->stop = true;
return NULL;
}
start_ns = now_ns();
while (!state->stop) {
amduat_reference_t ref = amduat_reference(0u, amduat_octets(NULL, 0u));
amduat_asl_store_error_t err;
char *hex = NULL;
err = amduat_asl_store_put(&store,
amduat_artifact(amduat_octets(
payload, hash_desc->digest_len)),
&ref);
if (err != AMDUAT_ASL_STORE_OK) {
state->failed = true;
state->stop = true;
break;
}
if (ref.digest.len == hash_desc->digest_len &&
ref.digest.data != NULL) {
memcpy(payload, ref.digest.data, ref.digest.len);
} else {
amduat_reference_free(&ref);
state->failed = true;
state->stop = true;
break;
}
if (!amduat_asl_ref_encode_hex(ref, &hex) || hex == NULL) {
amduat_reference_free(&ref);
state->failed = true;
state->stop = true;
break;
}
fprintf(log_fp, "%s\n", hex);
free(hex);
amduat_reference_free(&ref);
__sync_fetch_and_add(&state->put_count, 1u);
flush_count++;
if ((flush_count % 256u) == 0u) {
fflush(log_fp);
}
now = now_ns();
if (now != 0u && now - start_ns >= state->duration_ns) {
state->stop = true;
break;
}
}
fflush(log_fp);
fclose(log_fp);
free(payload);
return NULL;
}
static void *stress_reader_thread(void *arg) {
stress_state_t *state = (stress_state_t *)arg;
amduat_asl_store_index_fs_t fs;
amduat_asl_store_t store;
amduat_asl_store_index_fs_snapshot_policy_t policy;
FILE *log_fp = NULL;
unsigned int seed = 1337u;
struct stat st;
if (state == NULL) {
return NULL;
}
if (!amduat_asl_store_index_fs_init(&fs, state->config, state->root)) {
state->failed = true;
state->stop = true;
return NULL;
}
memset(&policy, 0, sizeof(policy));
policy.enabled = false;
amduat_asl_store_index_fs_set_snapshot_policy(&fs, policy);
amduat_asl_store_init(&store, state->config, amduat_asl_store_index_fs_ops(),
&fs);
log_fp = fopen(state->log_path, "rb");
while (log_fp == NULL && !state->stop) {
if (errno != ENOENT) {
state->failed = true;
state->stop = true;
return NULL;
}
sleep_ms(10u);
log_fp = fopen(state->log_path, "rb");
}
if (log_fp == NULL) {
state->failed = true;
state->stop = true;
return NULL;
}
while (!state->stop) {
char line[256];
long offset;
size_t len;
amduat_reference_t ref;
if (fstat(fileno(log_fp), &st) != 0) {
state->failed = true;
state->stop = true;
break;
}
if (st.st_size <= 0) {
continue;
}
offset = (long)(rand_r(&seed) % (unsigned int)st.st_size);
if (fseek(log_fp, offset, SEEK_SET) != 0) {
state->failed = true;
state->stop = true;
break;
}
if (offset != 0) {
if (fgets(line, sizeof(line), log_fp) == NULL) {
continue;
}
}
if (fgets(line, sizeof(line), log_fp) == NULL) {
continue;
}
len = strlen(line);
if (len == 0u || line[len - 1u] != '\n') {
continue;
}
line[len - 1u] = '\0';
if (!amduat_asl_ref_decode_hex(line, &ref)) {
fprintf(stderr, "stress: decode ref failed\n");
state->failed = true;
state->stop = true;
break;
}
{
amduat_artifact_t loaded =
amduat_artifact(amduat_octets(NULL, 0u));
amduat_asl_store_error_t err =
amduat_asl_store_get(&store, ref, &loaded);
if (err != AMDUAT_ASL_STORE_OK) {
char *ref_text = NULL;
(void)amduat_format_ref_to_text(ref, AMDUAT_FORMAT_REF_HEX,
&ref_text);
fprintf(stderr,
"stress: get failed: %d ref=%s\n",
err,
ref_text == NULL ? "(null)" : ref_text);
free(ref_text);
state->failed = true;
state->stop = true;
amduat_artifact_free(&loaded);
amduat_reference_free(&ref);
break;
}
amduat_artifact_free(&loaded);
__sync_fetch_and_add(&state->get_count, 1u);
}
amduat_reference_free(&ref);
}
fclose(log_fp);
return NULL;
}
static int test_writer_reader_stress(void) {
amduat_asl_store_config_t config;
pthread_t writer;
pthread_t reader;
stress_state_t state;
char *root;
char *log_path = NULL;
uint64_t duration_ns = 60u * 1000u * 1000u * 1000u;
const char *duration_env;
uint64_t start_ns;
uint64_t last_ns;
uint64_t now;
uint64_t last_put = 0u;
uint64_t last_get = 0u;
double put_rate_first = 0.0;
double get_rate_first = 0.0;
double put_rate_last = 0.0;
double get_rate_last = 0.0;
double put_rate_min = 0.0;
double put_rate_max = 0.0;
double get_rate_min = 0.0;
double get_rate_max = 0.0;
double put_rate_sum = 0.0;
double get_rate_sum = 0.0;
uint64_t rate_samples = 0u;
root = make_temp_root();
if (root == NULL) {
fprintf(stderr, "temp root failed\n");
return 1;
}
if (!join_path(root, "refs.log", &log_path)) {
fprintf(stderr, "log path failed\n");
free(root);
return 1;
}
memset(&config, 0, sizeof(config));
config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1;
config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256;
duration_env = getenv("AMDUAT_ASL_STRESS_SECS");
if (duration_env != NULL) {
uint64_t seconds;
if (parse_u64_env(duration_env, &seconds)) {
duration_ns = seconds * 1000u * 1000u * 1000u;
}
}
memset(&state, 0, sizeof(state));
state.root = root;
state.log_path = log_path;
state.config = config;
state.duration_ns = duration_ns;
state.stop = false;
state.failed = false;
start_ns = now_ns();
last_ns = start_ns;
if (pthread_create(&writer, NULL, stress_writer_thread, &state) != 0 ||
pthread_create(&reader, NULL, stress_reader_thread, &state) != 0) {
fprintf(stderr, "thread start failed\n");
state.stop = true;
}
while (!state.stop) {
now = now_ns();
if (now != 0u && now - last_ns >= 5u * 1000u * 1000u * 1000u) {
uint64_t puts = state.put_count;
uint64_t gets = state.get_count;
double elapsed_s = (double)(now - last_ns) / 1000000000.0;
double put_rate = (puts - last_put) / (elapsed_s > 0 ? elapsed_s : 1.0);
double get_rate = (gets - last_get) / (elapsed_s > 0 ? elapsed_s : 1.0);
if (rate_samples == 0u) {
put_rate_first = put_rate;
get_rate_first = get_rate;
put_rate_min = put_rate;
put_rate_max = put_rate;
get_rate_min = get_rate;
get_rate_max = get_rate;
} else {
if (put_rate < put_rate_min) {
put_rate_min = put_rate;
}
if (put_rate > put_rate_max) {
put_rate_max = put_rate;
}
if (get_rate < get_rate_min) {
get_rate_min = get_rate;
}
if (get_rate > get_rate_max) {
get_rate_max = get_rate;
}
}
put_rate_last = put_rate;
get_rate_last = get_rate;
put_rate_sum += put_rate;
get_rate_sum += get_rate;
rate_samples++;
fprintf(stderr,
"stress: t=%.1fs puts=%" PRIu64 " gets=%" PRIu64
" put/s=%.1f get/s=%.1f\n",
(double)(now - start_ns) / 1000000000.0,
puts,
gets,
put_rate,
get_rate);
last_put = puts;
last_get = gets;
last_ns = now;
}
if (now != 0u && now - start_ns >= duration_ns) {
state.stop = true;
}
if (state.failed) {
state.stop = true;
}
sleep_ms(10u);
}
pthread_join(writer, NULL);
pthread_join(reader, NULL);
if (state.failed) {
fprintf(stderr, "stress: failed\n");
}
if (rate_samples > 0u) {
fprintf(stderr,
"stress: put/s first=%.1f last=%.1f min=%.1f max=%.1f avg=%.1f\n",
put_rate_first,
put_rate_last,
put_rate_min,
put_rate_max,
put_rate_sum / (double)rate_samples);
fprintf(stderr,
"stress: get/s first=%.1f last=%.1f min=%.1f max=%.1f avg=%.1f\n",
get_rate_first,
get_rate_last,
get_rate_min,
get_rate_max,
get_rate_sum / (double)rate_samples);
}
free(log_path);
if (!remove_tree(root)) {
fprintf(stderr, "cleanup failed\n");
free(root);
return 1;
}
free(root);
return state.failed ? 1 : 0;
}
int main(void) {
if (test_round_trip() != 0) {
return 1;
@ -675,5 +1278,11 @@ int main(void) {
if (test_snapshot_truncation() != 0) {
return 1;
}
if (test_large_round_trip_perf() != 0) {
return 1;
}
if (test_writer_reader_stress() != 0) {
return 1;
}
return test_shard_routing();
}