From 83cbe28ede3331b68368c04d2bb52734ecc5af95 Mon Sep 17 00:00:00 2001 From: Carl Niklas Rydberg Date: Sat, 17 Jan 2026 19:49:12 +0100 Subject: [PATCH] asl_store_index_fs: add perf/stress tests and summaries --- CMakeLists.txt | 2 +- .../asl_store_index_fs/asl_store_index_fs.c | 446 ++++++++++++- .../asl_store_index_fs_layout.c | 34 + .../asl_store_index_fs_layout.h | 8 + tests/asl/test_asl_store_index_fs.c | 609 ++++++++++++++++++ 5 files changed, 1090 insertions(+), 9 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b3fe118..56a5f9a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -391,7 +391,7 @@ target_compile_definitions(amduat_test_asl_store_index_fs PRIVATE _POSIX_C_SOURCE=200809L ) target_link_libraries(amduat_test_asl_store_index_fs - PRIVATE amduat_asl_store_index_fs + PRIVATE amduat_asl_store_index_fs amduat_format pthread ) add_test(NAME asl_store_index_fs COMMAND amduat_test_asl_store_index_fs) diff --git a/src/adapters/asl_store_index_fs/asl_store_index_fs.c b/src/adapters/asl_store_index_fs/asl_store_index_fs.c index 07ace06..3ec1c42 100644 --- a/src/adapters/asl_store_index_fs/asl_store_index_fs.c +++ b/src/adapters/asl_store_index_fs/asl_store_index_fs.c @@ -38,7 +38,11 @@ enum { AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN = 8, AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN = 32, AMDUAT_ASL_STORE_INDEX_FS_LOG_HEADER_LEN = 24, - AMDUAT_ASL_STORE_INDEX_FS_LOG_VERSION = 1 + AMDUAT_ASL_STORE_INDEX_FS_LOG_VERSION = 1, + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN = 8, + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_HEADER_LEN = 24, + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_ENTRY_LEN = 40, + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_VERSION = 1 }; typedef enum { @@ -65,6 +69,9 @@ static uint64_t amduat_asl_store_index_fs_load_u64_le(const uint8_t *data); static const uint8_t k_amduat_asl_store_index_fs_log_magic[ AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN] = {'A', 'S', 'L', 'L', 'O', 'G', '0', '1'}; +static const uint8_t k_amduat_asl_store_index_fs_summary_magic[ + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN] = {'A', 'S', 'L', 'S', + 'U', 'M', '0', '1'}; static bool amduat_asl_store_index_fs_ensure_directory(const char *path) { struct stat st; @@ -87,6 +94,46 @@ static bool amduat_asl_store_index_fs_ensure_directory(const char *path) { return true; } +static bool amduat_asl_store_index_fs_join_path(const char *base, + const char *suffix, + char **out_path) { + size_t base_len; + size_t suffix_len; + bool needs_sep; + size_t total_len; + char *buffer; + + if (base == NULL || suffix == NULL || out_path == NULL) { + return false; + } + if (base[0] == '\0' || suffix[0] == '\0') { + return false; + } + + base_len = strlen(base); + suffix_len = strlen(suffix); + needs_sep = base[base_len - 1u] != '/'; + total_len = base_len + (needs_sep ? 1u : 0u) + suffix_len + 1u; + + buffer = (char *)malloc(total_len); + if (buffer == NULL) { + return false; + } + + memcpy(buffer, base, base_len); + if (needs_sep) { + buffer[base_len] = '/'; + memcpy(buffer + base_len + 1u, suffix, suffix_len); + buffer[base_len + 1u + suffix_len] = '\0'; + } else { + memcpy(buffer + base_len, suffix, suffix_len); + buffer[base_len + suffix_len] = '\0'; + } + + *out_path = buffer; + return true; +} + static bool amduat_asl_store_index_fs_fsync_directory(const char *path) { int fd; int fsync_errno; @@ -250,6 +297,78 @@ static bool amduat_asl_store_index_fs_log_write_exact(int fd, return true; } +static bool amduat_asl_store_index_fs_summary_write_header(int fd) { + uint8_t header[AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_HEADER_LEN]; + + memcpy(header, + k_amduat_asl_store_index_fs_summary_magic, + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN); + amduat_asl_store_index_fs_log_store_u32_le( + header + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN, + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_VERSION); + amduat_asl_store_index_fs_log_store_u32_le( + header + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN + 4u, + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_HEADER_LEN); + amduat_asl_store_index_fs_log_store_u64_le( + header + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN + 8u, 0u); + + return amduat_asl_store_index_fs_log_write_exact(fd, + header, + sizeof(header)); +} + +static bool amduat_asl_store_index_fs_summary_read_header(int fd, + size_t *out_size) { + uint8_t header[AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_HEADER_LEN]; + uint32_t version; + uint32_t header_size; + uint64_t flags; + size_t extra; + uint8_t discard[256]; + + if (out_size == NULL) { + return false; + } + *out_size = 0u; + + if (!amduat_asl_store_index_fs_log_read_exact(fd, + header, + sizeof(header))) { + return false; + } + + if (memcmp(header, + k_amduat_asl_store_index_fs_summary_magic, + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN) != 0) { + return false; + } + + version = amduat_asl_store_index_fs_log_load_u32_le( + header + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN); + header_size = amduat_asl_store_index_fs_log_load_u32_le( + header + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN + 4u); + flags = amduat_asl_store_index_fs_load_u64_le( + header + AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_MAGIC_LEN + 8u); + + if (version != AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_VERSION || + flags != 0u || + header_size < AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_HEADER_LEN) { + return false; + } + + extra = header_size - AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_HEADER_LEN; + while (extra > 0u) { + size_t chunk = extra > sizeof(discard) ? sizeof(discard) : extra; + if (!amduat_asl_store_index_fs_log_read_exact(fd, discard, chunk)) { + return false; + } + extra -= chunk; + } + + *out_size = header_size; + return true; +} + static bool amduat_asl_store_index_fs_log_read_header_bytes( int fd, uint8_t **out_bytes, @@ -637,6 +756,28 @@ static bool amduat_asl_store_index_fs_build_segment_path( fs->root_path, shard_id, segment_id, out_path); } +static bool amduat_asl_store_index_fs_build_segment_summary_path( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, + char **out_path) { + char *segments_path; + bool ok; + + if (fs == NULL || out_path == NULL) { + return false; + } + if (!amduat_asl_store_index_fs_build_segments_path(fs, + shard_id, + &segments_path)) { + return false; + } + ok = amduat_asl_store_index_fs_join_path(segments_path, + "summary", + out_path); + free(segments_path); + return ok; +} + static bool amduat_asl_store_index_fs_build_block_path( amduat_asl_store_index_fs_t *fs, uint16_t shard_id, @@ -783,6 +924,148 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_write_log( return AMDUAT_ASL_STORE_ERR_IO; } +static bool amduat_asl_store_index_fs_summary_append( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, + uint64_t segment_id, + const uint8_t segment_hash[AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN]) { + char *summary_path; + char *segments_path; + int fd; + struct stat st; + uint8_t entry[AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_ENTRY_LEN]; + bool ok = false; + + if (fs == NULL || segment_hash == NULL) { + return false; + } + + if (!amduat_asl_store_index_fs_build_segment_summary_path(fs, + shard_id, + &summary_path)) { + return false; + } + if (!amduat_asl_store_index_fs_build_segments_path(fs, + shard_id, + &segments_path)) { + free(summary_path); + return false; + } + + fd = open(summary_path, O_RDWR | O_CREAT, 0644); + if (fd < 0) { + free(summary_path); + free(segments_path); + return false; + } + + if (fstat(fd, &st) != 0) { + close(fd); + free(summary_path); + free(segments_path); + return false; + } + if (st.st_size == 0) { + if (!amduat_asl_store_index_fs_summary_write_header(fd)) { + close(fd); + free(summary_path); + free(segments_path); + return false; + } + } else if (st.st_size < AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_HEADER_LEN) { + close(fd); + free(summary_path); + free(segments_path); + return false; + } + + if (lseek(fd, 0, SEEK_END) < 0) { + close(fd); + free(summary_path); + free(segments_path); + return false; + } + + amduat_asl_store_index_fs_log_store_u64_le(entry, segment_id); + memcpy(entry + 8u, segment_hash, AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN); + + if (!amduat_asl_store_index_fs_log_write_exact(fd, + entry, + sizeof(entry))) { + close(fd); + free(summary_path); + free(segments_path); + return false; + } + ok = fsync(fd) == 0; + if (close(fd) != 0) { + ok = false; + } + if (ok) { + ok = amduat_asl_store_index_fs_fsync_directory(segments_path); + } + + free(summary_path); + free(segments_path); + return ok; +} + +static bool amduat_asl_store_index_fs_summary_lookup_hash( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, + uint64_t segment_id, + uint8_t out_hash[AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN]) { + char *summary_path; + int fd; + size_t header_size; + uint8_t entry[AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_ENTRY_LEN]; + + if (fs == NULL || out_hash == NULL) { + return false; + } + + if (!amduat_asl_store_index_fs_build_segment_summary_path(fs, + shard_id, + &summary_path)) { + return false; + } + fd = open(summary_path, O_RDONLY); + free(summary_path); + if (fd < 0) { + return false; + } + + if (!amduat_asl_store_index_fs_summary_read_header(fd, &header_size)) { + close(fd); + return false; + } + (void)header_size; + + while (true) { + uint64_t entry_segment_id; + int rc = amduat_asl_store_index_fs_log_read_exact_eof( + fd, entry, sizeof(entry)); + if (rc == 0) { + break; + } + if (rc < 0) { + close(fd); + return false; + } + entry_segment_id = amduat_asl_store_index_fs_load_u64_le(entry); + if (entry_segment_id == segment_id) { + memcpy(out_hash, + entry + 8u, + AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN); + close(fd); + return true; + } + } + + close(fd); + return false; +} + static amduat_asl_store_error_t amduat_asl_store_index_fs_stream_log_apply( const char *log_path, amduat_asl_replay_state_t *replay_state, @@ -1375,6 +1658,129 @@ static bool amduat_asl_store_index_fs_parse_snapshot_filename( return true; } +static bool amduat_asl_store_index_fs_snapshot_manifest_exists( + const char *root_path, + amduat_asl_snapshot_id_t snapshot_id) { + char *manifest_path; + struct stat st; + bool ok; + + if (root_path == NULL || snapshot_id == 0u) { + return false; + } + if (!amduat_asl_store_index_fs_layout_build_snapshot_manifest_path( + root_path, snapshot_id, &manifest_path)) { + return false; + } + ok = stat(manifest_path, &st) == 0 && S_ISREG(st.st_mode); + free(manifest_path); + return ok; +} + +static bool amduat_asl_store_index_fs_read_latest_snapshot_id( + amduat_asl_store_index_fs_t *fs, + amduat_asl_snapshot_id_t *out_id) { + char *latest_path; + uint8_t *latest_bytes; + size_t latest_len; + amduat_asl_store_index_fs_read_status_t status; + char *latest_str; + size_t trimmed_len; + uint64_t parsed; + + if (fs == NULL || out_id == NULL) { + return false; + } + + if (!amduat_asl_store_index_fs_layout_build_snapshot_latest_path( + fs->root_path, &latest_path)) { + return false; + } + + latest_bytes = NULL; + latest_len = 0u; + status = amduat_asl_store_index_fs_read_file(latest_path, + &latest_bytes, + &latest_len); + free(latest_path); + if (status == AMDUAT_ASL_STORE_INDEX_FS_READ_NOT_FOUND) { + return false; + } + if (status != AMDUAT_ASL_STORE_INDEX_FS_READ_OK || latest_len == 0u) { + free(latest_bytes); + return false; + } + + latest_str = (char *)malloc(latest_len + 1u); + if (latest_str == NULL) { + free(latest_bytes); + return false; + } + memcpy(latest_str, latest_bytes, latest_len); + latest_str[latest_len] = '\0'; + trimmed_len = latest_len; + while (trimmed_len > 0u) { + char c = latest_str[trimmed_len - 1u]; + if (c != '\n' && c != '\r' && c != ' ' && c != '\t') { + break; + } + latest_str[trimmed_len - 1u] = '\0'; + trimmed_len -= 1u; + } + if (trimmed_len == 0u || + !amduat_asl_store_index_fs_parse_u64(latest_str, &parsed) || + parsed == 0u) { + free(latest_bytes); + free(latest_str); + return false; + } + + free(latest_bytes); + free(latest_str); + *out_id = (amduat_asl_snapshot_id_t)parsed; + return true; +} + +static void amduat_asl_store_index_fs_write_latest_snapshot_id( + amduat_asl_store_index_fs_t *fs, + amduat_asl_snapshot_id_t snapshot_id) { + char *latest_path; + char *snapshots_path; + char buffer[32]; + int len; + amduat_asl_store_index_fs_write_status_t status; + + if (fs == NULL || snapshot_id == 0u) { + return; + } + if (!amduat_asl_store_index_fs_layout_build_snapshot_latest_path( + fs->root_path, &latest_path)) { + return; + } + if (!amduat_asl_store_index_fs_layout_build_snapshots_path( + fs->root_path, &snapshots_path)) { + free(latest_path); + return; + } + + len = snprintf(buffer, sizeof(buffer), "%" PRIu64, snapshot_id); + if (len <= 0 || (size_t)len >= sizeof(buffer)) { + free(latest_path); + free(snapshots_path); + return; + } + + status = amduat_asl_store_index_fs_write_replace(snapshots_path, + latest_path, + (const uint8_t *)buffer, + (size_t)len); + if (status == AMDUAT_ASL_STORE_INDEX_FS_WRITE_OK) { + (void)amduat_asl_store_index_fs_fsync_directory(snapshots_path); + } + free(latest_path); + free(snapshots_path); +} + static bool amduat_asl_store_index_fs_find_latest_snapshot_id( amduat_asl_store_index_fs_t *fs, amduat_asl_snapshot_id_t *out_id) { @@ -1388,6 +1794,13 @@ static bool amduat_asl_store_index_fs_find_latest_snapshot_id( return false; } + if (amduat_asl_store_index_fs_read_latest_snapshot_id(fs, &latest) && + amduat_asl_store_index_fs_snapshot_manifest_exists(fs->root_path, + latest)) { + *out_id = latest; + return true; + } + if (!amduat_asl_store_index_fs_layout_build_snapshots_path(fs->root_path, &snapshots_path)) { return false; @@ -1966,12 +2379,16 @@ static bool amduat_asl_store_index_fs_is_tombstoned( } static amduat_asl_store_error_t amduat_asl_store_index_fs_load_segment( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, + uint64_t segment_id, const char *segment_path, amduat_asl_core_index_segment_t *out_segment, uint8_t out_hash[AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN]) { uint8_t *segment_bytes; size_t segment_len; amduat_asl_store_index_fs_read_status_t status; + bool has_summary_hash; if (out_segment == NULL || out_hash == NULL) { return AMDUAT_ASL_STORE_ERR_IO; @@ -1996,12 +2413,16 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_load_segment( return AMDUAT_ASL_STORE_ERR_IO; } - if (!amduat_hash_asl1_digest(AMDUAT_HASH_ASL1_ID_SHA256, - amduat_octets(segment_bytes, segment_len), - out_hash, - AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN)) { - free(segment_bytes); - return AMDUAT_ASL_STORE_ERR_IO; + has_summary_hash = amduat_asl_store_index_fs_summary_lookup_hash( + fs, shard_id, segment_id, out_hash); + if (!has_summary_hash) { + if (!amduat_hash_asl1_digest(AMDUAT_HASH_ASL1_ID_SHA256, + amduat_octets(segment_bytes, segment_len), + out_hash, + AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN)) { + free(segment_bytes); + return AMDUAT_ASL_STORE_ERR_IO; + } } if (!amduat_enc_asl_core_index_decode_v1( @@ -2271,7 +2692,10 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_scan_segments( return AMDUAT_ASL_STORE_ERR_IO; } - err = amduat_asl_store_index_fs_load_segment(segment_path, + err = amduat_asl_store_index_fs_load_segment(fs, + shard_id, + seal->segment_id, + segment_path, &segment, segment_hash); free(segment_path); @@ -2811,6 +3235,11 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( return AMDUAT_ASL_STORE_ERR_IO; } + (void)amduat_asl_store_index_fs_summary_append(fs, + shard_id, + segment_id, + segment_hash); + if (!amduat_asl_store_index_fs_layout_build_log_path(fs->root_path, &log_path)) { amduat_reference_free(&derived_ref); @@ -3135,6 +3564,7 @@ amduat_asl_store_error_t amduat_asl_store_index_fs_snapshot_create( fs->next_snapshot_id = snapshot_id + 1u; } } + amduat_asl_store_index_fs_write_latest_snapshot_id(fs, snapshot_id); free(manifest_path); return AMDUAT_ASL_STORE_OK; } diff --git a/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.c b/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.c index e6716d6..d539290 100644 --- a/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.c +++ b/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.c @@ -311,6 +311,23 @@ bool amduat_asl_store_index_fs_layout_build_snapshot_manifest_path( return ok; } +bool amduat_asl_store_index_fs_layout_build_snapshot_latest_path( + const char *root_path, + char **out_path) { + char *snapshots_path; + bool ok; + + if (!amduat_asl_store_index_fs_layout_build_snapshots_path(root_path, + &snapshots_path)) { + return false; + } + ok = amduat_asl_store_index_fs_layout_join(snapshots_path, + "latest", + out_path); + free(snapshots_path); + return ok; +} + bool amduat_asl_store_index_fs_layout_build_segment_meta_path( const char *root_path, char **out_path) { @@ -328,6 +345,23 @@ bool amduat_asl_store_index_fs_layout_build_segment_meta_path( return ok; } +bool amduat_asl_store_index_fs_layout_build_segment_summary_path( + const char *root_path, + char **out_path) { + char *segments_path; + bool ok; + + if (!amduat_asl_store_index_fs_layout_build_segments_path(root_path, + &segments_path)) { + return false; + } + ok = amduat_asl_store_index_fs_layout_join(segments_path, + "summary", + out_path); + free(segments_path); + return ok; +} + bool amduat_asl_store_index_fs_layout_build_segment_path( const char *root_path, uint64_t segment_id, diff --git a/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.h b/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.h index 4bddb7b..8c25e55 100644 --- a/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.h +++ b/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.h @@ -66,10 +66,18 @@ bool amduat_asl_store_index_fs_layout_build_snapshot_manifest_path( uint64_t snapshot_id, char **out_path); +bool amduat_asl_store_index_fs_layout_build_snapshot_latest_path( + const char *root_path, + char **out_path); + bool amduat_asl_store_index_fs_layout_build_segment_meta_path( const char *root_path, char **out_path); +bool amduat_asl_store_index_fs_layout_build_segment_summary_path( + const char *root_path, + char **out_path); + bool amduat_asl_store_index_fs_layout_build_segment_path( const char *root_path, uint64_t segment_id, diff --git a/tests/asl/test_asl_store_index_fs.c b/tests/asl/test_asl_store_index_fs.c index 7850fd4..c6c5614 100644 --- a/tests/asl/test_asl_store_index_fs.c +++ b/tests/asl/test_asl_store_index_fs.c @@ -1,21 +1,26 @@ #include "amduat/asl/asl_store_index_fs.h" #include "amduat/asl/index_bloom.h" +#include "amduat/asl/ref_text.h" #include "amduat/asl/ref_derive.h" #include "amduat/asl/store.h" #include "amduat/enc/asl1_core.h" #include "amduat/enc/asl_core_index.h" +#include "amduat/format/ref.h" #include "amduat/hash/asl1.h" #include #include #include +#include #include #include #include #include #include +#include #include #include +#include #include static bool join_path(const char *base, const char *segment, char **out_path) { @@ -262,6 +267,72 @@ static uint16_t ref_shard(amduat_reference_t ref, uint16_t shard_count) { return (uint16_t)(hash % shard_count); } +static bool parse_size_env(const char *value, size_t *out) { + char *endptr; + unsigned long long parsed; + + if (value == NULL || out == NULL) { + return false; + } + errno = 0; + parsed = strtoull(value, &endptr, 10); + if (errno != 0 || endptr == value || *endptr != '\0') { + return false; + } + if (parsed > SIZE_MAX) { + return false; + } + *out = (size_t)parsed; + return true; +} + +static uint64_t now_ns(void) { + struct timespec ts; + + if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0) { + return 0u; + } + return (uint64_t)ts.tv_sec * 1000u * 1000u * 1000u + + (uint64_t)ts.tv_nsec; +} + +static void sleep_ms(unsigned int ms) { + struct timespec ts; + + ts.tv_sec = (time_t)(ms / 1000u); + ts.tv_nsec = (long)((ms % 1000u) * 1000u * 1000u); + while (nanosleep(&ts, &ts) != 0 && errno == EINTR) { + } +} + +static long current_rss_kb(void) { + struct rusage usage; + + if (getrusage(RUSAGE_SELF, &usage) != 0) { + return -1; + } + return usage.ru_maxrss; +} + +static bool parse_u64_env(const char *value, uint64_t *out) { + char *endptr; + unsigned long long parsed; + + if (value == NULL || out == NULL) { + return false; + } + errno = 0; + parsed = strtoull(value, &endptr, 10); + if (errno != 0 || endptr == value || *endptr != '\0') { + return false; + } + if (parsed > UINT64_MAX) { + return false; + } + *out = (uint64_t)parsed; + return true; +} + static char *make_temp_root(void) { char *templ; const char template_prefix[] = "/tmp/amduat_test_asl_store_index_fs_XXXXXX"; @@ -668,6 +739,538 @@ cleanup: return exit_code; } +static int test_large_round_trip_perf(void) { + enum { k_artifact_count_default = 10000, k_payload_max = 64 }; + amduat_asl_store_config_t config; + amduat_asl_store_index_fs_t fs; + amduat_asl_store_t store; + amduat_asl_store_error_t err; + amduat_asl_index_state_t state; + amduat_reference_t *refs = NULL; + uint8_t **payloads = NULL; + size_t *payload_lens = NULL; + char *root; + int exit_code = 1; + size_t i; + size_t artifact_count; + uint64_t start_ns; + uint64_t end_ns; + long rss_kb; + const char *count_env; + + root = make_temp_root(); + if (root == NULL) { + fprintf(stderr, "temp root failed\n"); + return 1; + } + + artifact_count = k_artifact_count_default; + count_env = getenv("AMDUAT_ASL_PERF_COUNT"); + if (count_env != NULL) { + size_t parsed; + if (parse_size_env(count_env, &parsed) && parsed > 0u) { + artifact_count = parsed; + } + } + + refs = (amduat_reference_t *)calloc(artifact_count, sizeof(*refs)); + payloads = (uint8_t **)calloc(artifact_count, sizeof(*payloads)); + payload_lens = (size_t *)calloc(artifact_count, sizeof(*payload_lens)); + if (refs == NULL || payloads == NULL || payload_lens == NULL) { + fprintf(stderr, "alloc failed\n"); + goto cleanup; + } + + memset(&config, 0, sizeof(config)); + config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; + config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; + if (!amduat_asl_store_index_fs_init(&fs, config, root)) { + fprintf(stderr, "index fs init failed\n"); + goto cleanup; + } + amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs); + + srand(1337); + start_ns = now_ns(); + for (i = 0; i < artifact_count; ++i) { + size_t len = (size_t)((rand() % k_payload_max) + 1); + uint8_t *payload = (uint8_t *)malloc(len); + if (payload == NULL) { + fprintf(stderr, "payload alloc failed\n"); + goto cleanup; + } + for (size_t j = 0; j < len; ++j) { + payload[j] = (uint8_t)(rand() & 0xffu); + } + payloads[i] = payload; + payload_lens[i] = len; + refs[i] = amduat_reference(0u, amduat_octets(NULL, 0u)); + err = amduat_asl_store_put(&store, + amduat_artifact(amduat_octets(payload, len)), + &refs[i]); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "put failed: %d\n", err); + goto cleanup; + } + } + end_ns = now_ns(); + rss_kb = current_rss_kb(); + if (start_ns != 0u && end_ns != 0u) { + fprintf(stderr, + "perf: put %zu artifacts in %.3f ms\n", + artifact_count, + (double)(end_ns - start_ns) / 1000000.0); + } + if (rss_kb >= 0) { + fprintf(stderr, "perf: maxrss %ld KB\n", rss_kb); + } + + if (!amduat_asl_index_current_state(&store, &state)) { + fprintf(stderr, "current_state failed\n"); + goto cleanup; + } + + start_ns = now_ns(); + for (i = 0; i < artifact_count; ++i) { + amduat_artifact_t loaded = + amduat_artifact(amduat_octets(NULL, 0u)); + err = amduat_asl_store_get_indexed(&store, refs[i], state, &loaded); + if (err != AMDUAT_ASL_STORE_OK) { + char *ref_text = NULL; + (void)amduat_format_ref_to_text(refs[i], AMDUAT_FORMAT_REF_HEX, + &ref_text); + fprintf(stderr, + "get failed: %d index=%zu snapshot=%" PRIu64 " log=%" PRIu64 + " ref=%s\n", + err, + i, + state.snapshot_id, + state.log_position, + ref_text == NULL ? "(null)" : ref_text); + free(ref_text); + goto cleanup; + } + if (loaded.bytes.len != payload_lens[i] || + (loaded.bytes.len != 0u && + memcmp(loaded.bytes.data, payloads[i], loaded.bytes.len) != 0)) { + fprintf(stderr, "payload mismatch\n"); + amduat_artifact_free(&loaded); + goto cleanup; + } + amduat_artifact_free(&loaded); + } + end_ns = now_ns(); + if (start_ns != 0u && end_ns != 0u) { + fprintf(stderr, + "perf: get %zu artifacts in %.3f ms\n", + artifact_count, + (double)(end_ns - start_ns) / 1000000.0); + } + + exit_code = 0; + +cleanup: + if (refs != NULL) { + for (i = 0; i < artifact_count; ++i) { + amduat_reference_free(&refs[i]); + } + } + if (payloads != NULL) { + for (i = 0; i < artifact_count; ++i) { + free(payloads[i]); + } + } + free(payloads); + free(payload_lens); + free(refs); + if (!remove_tree(root)) { + fprintf(stderr, "cleanup failed\n"); + exit_code = 1; + } + free(root); + return exit_code; +} + +typedef struct { + const char *root; + const char *log_path; + amduat_asl_store_config_t config; + uint64_t duration_ns; + volatile bool stop; + volatile bool failed; + volatile uint64_t put_count; + volatile uint64_t get_count; +} stress_state_t; + +static void *stress_writer_thread(void *arg) { + stress_state_t *state = (stress_state_t *)arg; + const amduat_hash_asl1_desc_t *hash_desc; + amduat_asl_store_index_fs_t fs; + amduat_asl_store_t store; + amduat_asl_store_index_fs_snapshot_policy_t policy; + uint8_t *payload = NULL; + FILE *log_fp = NULL; + uint64_t start_ns; + uint64_t now; + uint64_t flush_count = 0u; + + if (state == NULL) { + return NULL; + } + + hash_desc = amduat_hash_asl1_desc_lookup(state->config.hash_id); + if (hash_desc == NULL || hash_desc->digest_len == 0) { + state->failed = true; + state->stop = true; + return NULL; + } + + if (!amduat_asl_store_index_fs_init(&fs, state->config, state->root)) { + state->failed = true; + state->stop = true; + return NULL; + } + memset(&policy, 0, sizeof(policy)); + policy.enabled = false; + amduat_asl_store_index_fs_set_snapshot_policy(&fs, policy); + amduat_asl_store_init(&store, state->config, amduat_asl_store_index_fs_ops(), + &fs); + + payload = (uint8_t *)calloc(hash_desc->digest_len, 1u); + if (payload == NULL) { + state->failed = true; + state->stop = true; + return NULL; + } + + log_fp = fopen(state->log_path, "ab"); + if (log_fp == NULL) { + free(payload); + state->failed = true; + state->stop = true; + return NULL; + } + + start_ns = now_ns(); + while (!state->stop) { + amduat_reference_t ref = amduat_reference(0u, amduat_octets(NULL, 0u)); + amduat_asl_store_error_t err; + char *hex = NULL; + + err = amduat_asl_store_put(&store, + amduat_artifact(amduat_octets( + payload, hash_desc->digest_len)), + &ref); + if (err != AMDUAT_ASL_STORE_OK) { + state->failed = true; + state->stop = true; + break; + } + + if (ref.digest.len == hash_desc->digest_len && + ref.digest.data != NULL) { + memcpy(payload, ref.digest.data, ref.digest.len); + } else { + amduat_reference_free(&ref); + state->failed = true; + state->stop = true; + break; + } + + if (!amduat_asl_ref_encode_hex(ref, &hex) || hex == NULL) { + amduat_reference_free(&ref); + state->failed = true; + state->stop = true; + break; + } + fprintf(log_fp, "%s\n", hex); + free(hex); + amduat_reference_free(&ref); + + __sync_fetch_and_add(&state->put_count, 1u); + flush_count++; + if ((flush_count % 256u) == 0u) { + fflush(log_fp); + } + + now = now_ns(); + if (now != 0u && now - start_ns >= state->duration_ns) { + state->stop = true; + break; + } + } + + fflush(log_fp); + fclose(log_fp); + free(payload); + return NULL; +} + +static void *stress_reader_thread(void *arg) { + stress_state_t *state = (stress_state_t *)arg; + amduat_asl_store_index_fs_t fs; + amduat_asl_store_t store; + amduat_asl_store_index_fs_snapshot_policy_t policy; + FILE *log_fp = NULL; + unsigned int seed = 1337u; + struct stat st; + + if (state == NULL) { + return NULL; + } + if (!amduat_asl_store_index_fs_init(&fs, state->config, state->root)) { + state->failed = true; + state->stop = true; + return NULL; + } + memset(&policy, 0, sizeof(policy)); + policy.enabled = false; + amduat_asl_store_index_fs_set_snapshot_policy(&fs, policy); + amduat_asl_store_init(&store, state->config, amduat_asl_store_index_fs_ops(), + &fs); + + log_fp = fopen(state->log_path, "rb"); + while (log_fp == NULL && !state->stop) { + if (errno != ENOENT) { + state->failed = true; + state->stop = true; + return NULL; + } + sleep_ms(10u); + log_fp = fopen(state->log_path, "rb"); + } + if (log_fp == NULL) { + state->failed = true; + state->stop = true; + return NULL; + } + + while (!state->stop) { + char line[256]; + long offset; + size_t len; + amduat_reference_t ref; + + if (fstat(fileno(log_fp), &st) != 0) { + state->failed = true; + state->stop = true; + break; + } + if (st.st_size <= 0) { + continue; + } + + offset = (long)(rand_r(&seed) % (unsigned int)st.st_size); + if (fseek(log_fp, offset, SEEK_SET) != 0) { + state->failed = true; + state->stop = true; + break; + } + if (offset != 0) { + if (fgets(line, sizeof(line), log_fp) == NULL) { + continue; + } + } + if (fgets(line, sizeof(line), log_fp) == NULL) { + continue; + } + + len = strlen(line); + if (len == 0u || line[len - 1u] != '\n') { + continue; + } + line[len - 1u] = '\0'; + if (!amduat_asl_ref_decode_hex(line, &ref)) { + fprintf(stderr, "stress: decode ref failed\n"); + state->failed = true; + state->stop = true; + break; + } + + { + amduat_artifact_t loaded = + amduat_artifact(amduat_octets(NULL, 0u)); + amduat_asl_store_error_t err = + amduat_asl_store_get(&store, ref, &loaded); + if (err != AMDUAT_ASL_STORE_OK) { + char *ref_text = NULL; + (void)amduat_format_ref_to_text(ref, AMDUAT_FORMAT_REF_HEX, + &ref_text); + fprintf(stderr, + "stress: get failed: %d ref=%s\n", + err, + ref_text == NULL ? "(null)" : ref_text); + free(ref_text); + state->failed = true; + state->stop = true; + amduat_artifact_free(&loaded); + amduat_reference_free(&ref); + break; + } + amduat_artifact_free(&loaded); + __sync_fetch_and_add(&state->get_count, 1u); + } + amduat_reference_free(&ref); + } + + fclose(log_fp); + return NULL; +} + +static int test_writer_reader_stress(void) { + amduat_asl_store_config_t config; + pthread_t writer; + pthread_t reader; + stress_state_t state; + char *root; + char *log_path = NULL; + uint64_t duration_ns = 60u * 1000u * 1000u * 1000u; + const char *duration_env; + uint64_t start_ns; + uint64_t last_ns; + uint64_t now; + uint64_t last_put = 0u; + uint64_t last_get = 0u; + double put_rate_first = 0.0; + double get_rate_first = 0.0; + double put_rate_last = 0.0; + double get_rate_last = 0.0; + double put_rate_min = 0.0; + double put_rate_max = 0.0; + double get_rate_min = 0.0; + double get_rate_max = 0.0; + double put_rate_sum = 0.0; + double get_rate_sum = 0.0; + uint64_t rate_samples = 0u; + + root = make_temp_root(); + if (root == NULL) { + fprintf(stderr, "temp root failed\n"); + return 1; + } + + if (!join_path(root, "refs.log", &log_path)) { + fprintf(stderr, "log path failed\n"); + free(root); + return 1; + } + + memset(&config, 0, sizeof(config)); + config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; + config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; + duration_env = getenv("AMDUAT_ASL_STRESS_SECS"); + if (duration_env != NULL) { + uint64_t seconds; + if (parse_u64_env(duration_env, &seconds)) { + duration_ns = seconds * 1000u * 1000u * 1000u; + } + } + + memset(&state, 0, sizeof(state)); + state.root = root; + state.log_path = log_path; + state.config = config; + state.duration_ns = duration_ns; + state.stop = false; + state.failed = false; + + start_ns = now_ns(); + last_ns = start_ns; + + if (pthread_create(&writer, NULL, stress_writer_thread, &state) != 0 || + pthread_create(&reader, NULL, stress_reader_thread, &state) != 0) { + fprintf(stderr, "thread start failed\n"); + state.stop = true; + } + + while (!state.stop) { + now = now_ns(); + if (now != 0u && now - last_ns >= 5u * 1000u * 1000u * 1000u) { + uint64_t puts = state.put_count; + uint64_t gets = state.get_count; + double elapsed_s = (double)(now - last_ns) / 1000000000.0; + double put_rate = (puts - last_put) / (elapsed_s > 0 ? elapsed_s : 1.0); + double get_rate = (gets - last_get) / (elapsed_s > 0 ? elapsed_s : 1.0); + if (rate_samples == 0u) { + put_rate_first = put_rate; + get_rate_first = get_rate; + put_rate_min = put_rate; + put_rate_max = put_rate; + get_rate_min = get_rate; + get_rate_max = get_rate; + } else { + if (put_rate < put_rate_min) { + put_rate_min = put_rate; + } + if (put_rate > put_rate_max) { + put_rate_max = put_rate; + } + if (get_rate < get_rate_min) { + get_rate_min = get_rate; + } + if (get_rate > get_rate_max) { + get_rate_max = get_rate; + } + } + put_rate_last = put_rate; + get_rate_last = get_rate; + put_rate_sum += put_rate; + get_rate_sum += get_rate; + rate_samples++; + fprintf(stderr, + "stress: t=%.1fs puts=%" PRIu64 " gets=%" PRIu64 + " put/s=%.1f get/s=%.1f\n", + (double)(now - start_ns) / 1000000000.0, + puts, + gets, + put_rate, + get_rate); + last_put = puts; + last_get = gets; + last_ns = now; + } + if (now != 0u && now - start_ns >= duration_ns) { + state.stop = true; + } + if (state.failed) { + state.stop = true; + } + sleep_ms(10u); + } + + pthread_join(writer, NULL); + pthread_join(reader, NULL); + + if (state.failed) { + fprintf(stderr, "stress: failed\n"); + } + if (rate_samples > 0u) { + fprintf(stderr, + "stress: put/s first=%.1f last=%.1f min=%.1f max=%.1f avg=%.1f\n", + put_rate_first, + put_rate_last, + put_rate_min, + put_rate_max, + put_rate_sum / (double)rate_samples); + fprintf(stderr, + "stress: get/s first=%.1f last=%.1f min=%.1f max=%.1f avg=%.1f\n", + get_rate_first, + get_rate_last, + get_rate_min, + get_rate_max, + get_rate_sum / (double)rate_samples); + } + + free(log_path); + if (!remove_tree(root)) { + fprintf(stderr, "cleanup failed\n"); + free(root); + return 1; + } + free(root); + return state.failed ? 1 : 0; +} + int main(void) { if (test_round_trip() != 0) { return 1; @@ -675,5 +1278,11 @@ int main(void) { if (test_snapshot_truncation() != 0) { return 1; } + if (test_large_round_trip_perf() != 0) { + return 1; + } + if (test_writer_reader_stress() != 0) { + return 1; + } return test_shard_routing(); }