From dde532d68f4bdba84543857b1ba2754789ed4d33 Mon Sep 17 00:00:00 2001 From: Carl Niklas Rydberg Date: Sun, 18 Jan 2026 05:54:31 +0100 Subject: [PATCH] Improve ASL store index lifecycle --- include/amduat/asl/asl_store_index_fs.h | 31 + include/amduat/asl/store.h | 36 + .../asl_store_index_fs/asl_store_index_fs.c | 1558 ++++++++++++++++- src/near_core/asl/store.c | 60 + tests/asl/test_asl_store_index_fs.c | 354 ++++ tests/asl/test_asl_store_indexed_ops.c | 20 + 6 files changed, 2049 insertions(+), 10 deletions(-) diff --git a/include/amduat/asl/asl_store_index_fs.h b/include/amduat/asl/asl_store_index_fs.h index fa7b86d..ba464d8 100644 --- a/include/amduat/asl/asl_store_index_fs.h +++ b/include/amduat/asl/asl_store_index_fs.h @@ -18,15 +18,30 @@ typedef struct { uint64_t idle_time_ns; } amduat_asl_store_index_fs_snapshot_policy_t; +typedef struct { + uint64_t max_segment_records; + uint64_t max_segment_bytes; + uint64_t small_artifact_threshold; + bool allow_deferred_visibility; +} amduat_asl_store_index_fs_segment_policy_t; + +typedef struct { + uint32_t segment_domain_id; + uint8_t record_visibility; +} amduat_asl_store_index_fs_visibility_policy_t; + typedef struct { amduat_asl_store_config_t config; amduat_asl_store_index_fs_snapshot_policy_t snapshot_policy; + amduat_asl_store_index_fs_segment_policy_t segment_policy; + amduat_asl_store_index_fs_visibility_policy_t visibility_policy; uint16_t shard_count; char root_path[AMDUAT_ASL_STORE_INDEX_FS_ROOT_MAX]; uint64_t pending_snapshot_bytes; uint64_t last_ingest_time_ns; amduat_asl_snapshot_id_t next_snapshot_id; bool snapshot_state_initialized; + void *open_segments; } amduat_asl_store_index_fs_t; bool amduat_asl_store_index_fs_init(amduat_asl_store_index_fs_t *fs, @@ -37,6 +52,14 @@ void amduat_asl_store_index_fs_set_snapshot_policy( amduat_asl_store_index_fs_t *fs, amduat_asl_store_index_fs_snapshot_policy_t policy); +void amduat_asl_store_index_fs_set_segment_policy( + amduat_asl_store_index_fs_t *fs, + amduat_asl_store_index_fs_segment_policy_t policy); + +void amduat_asl_store_index_fs_set_visibility_policy( + amduat_asl_store_index_fs_t *fs, + amduat_asl_store_index_fs_visibility_policy_t policy); + void amduat_asl_store_index_fs_set_shard_count( amduat_asl_store_index_fs_t *fs, uint16_t shard_count); @@ -47,6 +70,14 @@ amduat_asl_store_error_t amduat_asl_store_index_fs_snapshot_create( amduat_asl_log_position_t *out_logseq, uint8_t out_root_hash[32]); +amduat_asl_store_error_t amduat_asl_store_index_fs_flush( + amduat_asl_store_index_fs_t *fs, + amduat_asl_index_state_t *out_state); + +amduat_asl_store_error_t amduat_asl_store_index_fs_gc( + amduat_asl_store_index_fs_t *fs, + const amduat_asl_index_state_t *state); + amduat_asl_store_ops_t amduat_asl_store_index_fs_ops(void); #ifdef __cplusplus diff --git a/include/amduat/asl/store.h b/include/amduat/asl/store.h index 0125c8f..e1a477e 100644 --- a/include/amduat/asl/store.h +++ b/include/amduat/asl/store.h @@ -2,6 +2,7 @@ #define AMDUAT_ASL_STORE_H #include "amduat/asl/core.h" +#include "amduat/enc/asl_log.h" #include #include @@ -48,6 +49,20 @@ typedef struct { amduat_reference_t ref, amduat_asl_index_state_t state, amduat_artifact_t *out_artifact); + amduat_asl_store_error_t (*tombstone)(void *ctx, + amduat_reference_t ref, + uint32_t scope, + uint32_t reason_code, + amduat_asl_index_state_t *out_state); + amduat_asl_store_error_t (*tombstone_lift)( + void *ctx, + amduat_reference_t ref, + amduat_asl_log_position_t tombstone_logseq, + amduat_asl_index_state_t *out_state); + amduat_asl_store_error_t (*log_scan)( + void *ctx, + amduat_asl_log_record_t **out_records, + size_t *out_count); bool (*current_state)(void *ctx, amduat_asl_index_state_t *out_state); amduat_asl_store_error_t (*validate_config)( void *ctx, @@ -62,6 +77,9 @@ static inline void amduat_asl_store_ops_init(amduat_asl_store_ops_t *ops) { ops->get = NULL; ops->put_indexed = NULL; ops->get_indexed = NULL; + ops->tombstone = NULL; + ops->tombstone_lift = NULL; + ops->log_scan = NULL; ops->current_state = NULL; ops->validate_config = NULL; } @@ -97,6 +115,24 @@ amduat_asl_store_error_t amduat_asl_store_get_indexed( amduat_asl_index_state_t state, amduat_artifact_t *out_artifact); +amduat_asl_store_error_t amduat_asl_store_tombstone( + amduat_asl_store_t *store, + amduat_reference_t ref, + uint32_t scope, + uint32_t reason_code, + amduat_asl_index_state_t *out_state); + +amduat_asl_store_error_t amduat_asl_store_tombstone_lift( + amduat_asl_store_t *store, + amduat_reference_t ref, + amduat_asl_log_position_t tombstone_logseq, + amduat_asl_index_state_t *out_state); + +amduat_asl_store_error_t amduat_asl_log_scan( + amduat_asl_store_t *store, + amduat_asl_log_record_t **out_records, + size_t *out_count); + bool amduat_asl_index_current_state(amduat_asl_store_t *store, amduat_asl_index_state_t *out_state); diff --git a/src/adapters/asl_store_index_fs/asl_store_index_fs.c b/src/adapters/asl_store_index_fs/asl_store_index_fs.c index 6115c38..a6220aa 100644 --- a/src/adapters/asl_store_index_fs/asl_store_index_fs.c +++ b/src/adapters/asl_store_index_fs/asl_store_index_fs.c @@ -34,6 +34,7 @@ enum { AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN = 32, AMDUAT_ASL_STORE_INDEX_FS_DEFAULT_PENDING_BYTES = 128u * 1024u, AMDUAT_ASL_STORE_INDEX_FS_DEFAULT_IDLE_NS = 2u * 1000u * 1000u * 1000u, + AMDUAT_ASL_STORE_INDEX_FS_DEFAULT_SEGMENT_RECORDS = 1u, AMDUAT_ASL_STORE_INDEX_FS_SHARD_SHIFT = 48, AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN = 8, AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN = 32, @@ -45,6 +46,26 @@ enum { AMDUAT_ASL_STORE_INDEX_FS_SUMMARY_VERSION = 1 }; +typedef struct { + bool active; + uint16_t shard_id; + uint64_t segment_id; + uint8_t max_visibility; + uint64_t record_count; + uint64_t block_size; + int block_fd; + char *block_temp_path; + amduat_asl_index_record_t *records; + size_t records_len; + size_t records_cap; + amduat_asl_extent_record_t *extents; + size_t extents_len; + size_t extents_cap; + uint8_t *digests; + size_t digests_len; + size_t digests_cap; +} amduat_asl_store_index_fs_open_segment_t; + typedef enum { AMDUAT_ASL_STORE_INDEX_FS_WRITE_OK = 0, AMDUAT_ASL_STORE_INDEX_FS_WRITE_EXIST = 1, @@ -66,6 +87,48 @@ amduat_asl_store_index_fs_write_replace(const char *temp_dir, static uint64_t amduat_asl_store_index_fs_load_u64_le(const uint8_t *data); +static bool amduat_asl_store_index_fs_build_blocks_path( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, + char **out_path); + +static bool amduat_asl_store_index_fs_build_segments_path( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, + char **out_path); + +static bool amduat_asl_store_index_fs_make_segment_id( + uint16_t shard_id, + uint64_t local_id, + uint64_t *out_id); + +static uint64_t amduat_asl_store_index_fs_now_ns(void); + +static bool amduat_asl_store_index_fs_build_bloom( + const amduat_asl_core_index_segment_t *segment, + amduat_octets_t *out_bloom); + +static bool amduat_asl_store_index_fs_summary_append( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, + uint64_t segment_id, + const uint8_t segment_hash[AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN]); + +static amduat_asl_store_error_t +amduat_asl_store_index_fs_read_next_segment_id(amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, + uint64_t *out_segment_id); + +static amduat_asl_store_error_t +amduat_asl_store_index_fs_write_next_segment_id( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, + uint64_t next_id); + +static bool amduat_asl_store_index_fs_current_state_impl( + void *ctx, + amduat_asl_index_state_t *out_state); + static const uint8_t k_amduat_asl_store_index_fs_log_magic[ AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN] = {'A', 'S', 'L', 'L', 'O', 'G', '0', '1'}; @@ -134,6 +197,83 @@ static bool amduat_asl_store_index_fs_join_path(const char *base, return true; } +static void amduat_asl_store_index_fs_open_segment_reset( + amduat_asl_store_index_fs_open_segment_t *segment) { + if (segment == NULL) { + return; + } + if (segment->block_fd >= 0) { + close(segment->block_fd); + segment->block_fd = -1; + } + if (segment->block_temp_path != NULL) { + unlink(segment->block_temp_path); + } + free(segment->block_temp_path); + segment->block_temp_path = NULL; + free(segment->records); + free(segment->extents); + free(segment->digests); + segment->records = NULL; + segment->extents = NULL; + segment->digests = NULL; + segment->records_len = 0u; + segment->records_cap = 0u; + segment->extents_len = 0u; + segment->extents_cap = 0u; + segment->digests_len = 0u; + segment->digests_cap = 0u; + segment->record_count = 0u; + segment->block_size = 0u; + segment->segment_id = 0u; + segment->shard_id = 0u; + segment->max_visibility = 0u; + segment->active = false; +} + +static void amduat_asl_store_index_fs_open_segments_free( + amduat_asl_store_index_fs_t *fs) { + amduat_asl_store_index_fs_open_segment_t *segments; + size_t i; + + if (fs == NULL || fs->open_segments == NULL) { + return; + } + segments = (amduat_asl_store_index_fs_open_segment_t *)fs->open_segments; + for (i = 0u; i < fs->shard_count; ++i) { + amduat_asl_store_index_fs_open_segment_reset(&segments[i]); + } + free(segments); + fs->open_segments = NULL; +} + +static amduat_asl_store_index_fs_open_segment_t * +amduat_asl_store_index_fs_open_segment_for( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id) { + amduat_asl_store_index_fs_open_segment_t *segments; + + if (fs == NULL) { + return NULL; + } + if (fs->open_segments == NULL) { + segments = (amduat_asl_store_index_fs_open_segment_t *)calloc( + fs->shard_count, sizeof(*segments)); + if (segments == NULL) { + return NULL; + } + for (size_t i = 0u; i < fs->shard_count; ++i) { + segments[i].block_fd = -1; + } + fs->open_segments = segments; + } + segments = (amduat_asl_store_index_fs_open_segment_t *)fs->open_segments; + if (shard_id >= fs->shard_count) { + return NULL; + } + return &segments[shard_id]; +} + static bool amduat_asl_store_index_fs_fsync_directory(const char *path) { int fd; int fsync_errno; @@ -169,6 +309,12 @@ static void amduat_asl_store_index_fs_log_store_u32_le(uint8_t *out, out[3] = (uint8_t)((value >> 24) & 0xffu); } +static void amduat_asl_store_index_fs_log_store_u16_le(uint8_t *out, + uint16_t value) { + out[0] = (uint8_t)(value & 0xffu); + out[1] = (uint8_t)((value >> 8) & 0xffu); +} + static void amduat_asl_store_index_fs_log_store_u64_le(uint8_t *out, uint64_t value) { out[0] = (uint8_t)(value & 0xffu); @@ -590,6 +736,61 @@ amduat_asl_store_index_fs_write_atomic(const char *temp_dir, return AMDUAT_ASL_STORE_INDEX_FS_WRITE_OK; } +static bool amduat_asl_store_index_fs_create_block_temp( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, + char **out_path, + int *out_fd) { + static const char suffix[] = "block.tmp.XXXXXX"; + char *blocks_path; + size_t dir_len; + size_t template_len; + bool need_sep; + char *template_path; + int fd; + + if (fs == NULL || out_path == NULL || out_fd == NULL) { + return false; + } + *out_path = NULL; + *out_fd = -1; + + if (!amduat_asl_store_index_fs_build_blocks_path(fs, shard_id, + &blocks_path)) { + return false; + } + if (!amduat_asl_store_index_fs_ensure_directory(blocks_path)) { + free(blocks_path); + return false; + } + + dir_len = strlen(blocks_path); + need_sep = dir_len == 0u || blocks_path[dir_len - 1u] != '/'; + template_len = dir_len + (need_sep ? 1u : 0u) + sizeof(suffix); + + template_path = (char *)malloc(template_len); + if (template_path == NULL) { + free(blocks_path); + return false; + } + if (need_sep) { + snprintf(template_path, template_len, "%s/%s", blocks_path, suffix); + } else { + snprintf(template_path, template_len, "%s%s", blocks_path, suffix); + } + free(blocks_path); + + fd = mkstemp(template_path); + if (fd < 0) { + free(template_path); + return false; + } + + *out_path = template_path; + *out_fd = fd; + return true; +} + static amduat_asl_store_index_fs_write_status_t amduat_asl_store_index_fs_write_replace(const char *temp_dir, const char *final_path, @@ -925,6 +1126,723 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_write_log( return AMDUAT_ASL_STORE_ERR_IO; } +static bool amduat_asl_store_index_fs_encode_artifact_ref( + amduat_reference_t ref, + uint8_t *out, + size_t out_len, + size_t *out_written) { + size_t total_len; + + if (out_written == NULL) { + return false; + } + *out_written = 0u; + if (out == NULL) { + return false; + } + if (ref.digest.len == 0u || ref.digest.data == NULL) { + return false; + } + if (ref.digest.len > UINT16_MAX) { + return false; + } + + total_len = 4u + 2u + 2u + ref.digest.len; + if (out_len < total_len) { + return false; + } + + amduat_asl_store_index_fs_log_store_u32_le(out, ref.hash_id); + amduat_asl_store_index_fs_log_store_u16_le(out + 4u, + (uint16_t)ref.digest.len); + amduat_asl_store_index_fs_log_store_u16_le(out + 6u, 0u); + memcpy(out + 8u, ref.digest.data, ref.digest.len); + *out_written = total_len; + return true; +} + +static amduat_asl_store_error_t amduat_asl_store_index_fs_append_log_record( + amduat_asl_store_index_fs_t *fs, + uint32_t record_type, + const uint8_t *payload, + size_t payload_len, + amduat_asl_index_state_t *out_state) { + char *index_path = NULL; + char *log_path = NULL; + amduat_asl_log_record_t *log_records = NULL; + size_t log_count = 0u; + amduat_asl_log_record_t *next = NULL; + uint64_t new_logseq; + uint8_t *payload_copy = NULL; + amduat_asl_store_error_t err; + + if (fs == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + if (payload_len != 0u && payload == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + if (payload_len > UINT32_MAX) { + return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; + } + + if (!amduat_asl_store_index_fs_layout_build_index_path(fs->root_path, + &index_path) || + !amduat_asl_store_index_fs_ensure_directory(index_path)) { + free(index_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + if (!amduat_asl_store_index_fs_layout_build_log_path(fs->root_path, + &log_path)) { + free(index_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + + err = amduat_asl_store_index_fs_load_log(log_path, &log_records, &log_count); + if (err != AMDUAT_ASL_STORE_OK) { + amduat_asl_store_index_fs_log_free(log_records, log_count); + free(log_path); + free(index_path); + return err; + } + if (log_count != 0u && log_records == NULL) { + free(log_path); + free(index_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + + if (log_count == 0u) { + log_records = (amduat_asl_log_record_t *)calloc(1u, + sizeof(*log_records)); + if (log_records == NULL) { + free(log_path); + free(index_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + new_logseq = 1u; + } else { + if (log_records[log_count - 1u].logseq == UINT64_MAX) { + amduat_asl_store_index_fs_log_free(log_records, log_count); + free(log_path); + free(index_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + new_logseq = log_records[log_count - 1u].logseq + 1u; + next = (amduat_asl_log_record_t *)realloc( + log_records, (log_count + 1u) * sizeof(*log_records)); + if (next == NULL) { + amduat_asl_store_index_fs_log_free(log_records, log_count); + free(log_path); + free(index_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + log_records = next; + } + + if (payload_len != 0u) { + payload_copy = (uint8_t *)malloc(payload_len); + if (payload_copy == NULL) { + amduat_asl_store_index_fs_log_free(log_records, log_count); + free(log_path); + free(index_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + memcpy(payload_copy, payload, payload_len); + } + + log_records[log_count].logseq = new_logseq; + log_records[log_count].record_type = record_type; + log_records[log_count].payload = amduat_octets(payload_copy, payload_len); + memset(log_records[log_count].record_hash, 0, + sizeof(log_records[log_count].record_hash)); + log_count += 1u; + + err = amduat_asl_store_index_fs_write_log(log_path, index_path, + log_records, log_count); + amduat_asl_store_index_fs_log_free(log_records, log_count); + free(log_path); + free(index_path); + if (err != AMDUAT_ASL_STORE_OK) { + return err; + } + + if (out_state != NULL && + !amduat_asl_store_index_fs_current_state_impl(fs, out_state)) { + return AMDUAT_ASL_STORE_ERR_IO; + } + return AMDUAT_ASL_STORE_OK; +} + +static bool amduat_asl_store_index_fs_open_segment_contains( + const amduat_asl_store_index_fs_open_segment_t *segment, + amduat_reference_t ref) { + size_t digest_cursor; + size_t i; + + if (segment == NULL) { + return false; + } + if (ref.digest.len != 0u && ref.digest.data == NULL) { + return false; + } + digest_cursor = 0u; + for (i = 0u; i < segment->records_len; ++i) { + const amduat_asl_index_record_t *record = &segment->records[i]; + if (record->hash_id == (uint32_t)ref.hash_id && + record->digest_len == ref.digest.len && + record->digest_len <= segment->digests_len - digest_cursor) { + if (record->digest_len == 0u || + memcmp(segment->digests + digest_cursor, + ref.digest.data, + record->digest_len) == 0) { + return true; + } + } + digest_cursor += record->digest_len; + } + return false; +} + +static amduat_asl_store_error_t amduat_asl_store_index_fs_open_segment_init( + amduat_asl_store_index_fs_t *fs, + amduat_asl_store_index_fs_open_segment_t *segment, + uint16_t shard_id) { + uint64_t local_segment_id; + uint64_t segment_id; + amduat_asl_store_error_t err; + int block_fd; + char *block_temp_path = NULL; + char *index_path = NULL; + char *segments_path = NULL; + char *blocks_path = NULL; + + if (fs == NULL || segment == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + if (segment->active) { + return AMDUAT_ASL_STORE_OK; + } + + if (!amduat_asl_store_index_fs_prepare_dirs(fs, + shard_id, + &index_path, + &segments_path, + &blocks_path)) { + free(index_path); + free(segments_path); + free(blocks_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + free(index_path); + free(segments_path); + free(blocks_path); + + err = amduat_asl_store_index_fs_read_next_segment_id(fs, + shard_id, + &local_segment_id); + if (err != AMDUAT_ASL_STORE_OK) { + return err; + } + if (local_segment_id == UINT64_MAX) { + return AMDUAT_ASL_STORE_ERR_IO; + } + err = amduat_asl_store_index_fs_write_next_segment_id(fs, + shard_id, + local_segment_id + 1u); + if (err != AMDUAT_ASL_STORE_OK) { + return err; + } + if (!amduat_asl_store_index_fs_make_segment_id(shard_id, + local_segment_id, + &segment_id)) { + return AMDUAT_ASL_STORE_ERR_IO; + } + + if (!amduat_asl_store_index_fs_create_block_temp(fs, + shard_id, + &block_temp_path, + &block_fd)) { + free(block_temp_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + + segment->active = true; + segment->shard_id = shard_id; + segment->segment_id = segment_id; + segment->max_visibility = 0u; + segment->record_count = 0u; + segment->block_size = 0u; + segment->block_fd = block_fd; + segment->block_temp_path = block_temp_path; + return AMDUAT_ASL_STORE_OK; +} + +static amduat_asl_store_error_t amduat_asl_store_index_fs_open_segment_add( + amduat_asl_store_index_fs_t *fs, + amduat_asl_store_index_fs_open_segment_t *segment, + amduat_reference_t ref, + amduat_octets_t artifact_bytes) { + amduat_asl_index_record_t record; + amduat_asl_extent_record_t extent; + size_t new_records; + size_t new_extents; + size_t new_digests; + size_t offset; + size_t written; + + if (fs == NULL || segment == NULL || !segment->active) { + return AMDUAT_ASL_STORE_ERR_IO; + } + if (artifact_bytes.len != 0u && artifact_bytes.data == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + if (artifact_bytes.len > UINT32_MAX || ref.digest.len > UINT16_MAX) { + return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; + } + if (segment->block_size > UINT32_MAX) { + return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; + } + offset = (size_t)segment->block_size; + if (offset > UINT32_MAX || offset + artifact_bytes.len > UINT32_MAX) { + return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; + } + + written = 0u; + while (written < artifact_bytes.len) { + ssize_t rc = write(segment->block_fd, + artifact_bytes.data + written, + artifact_bytes.len - written); + if (rc < 0) { + if (errno == EINTR) { + continue; + } + return AMDUAT_ASL_STORE_ERR_IO; + } + written += (size_t)rc; + } + segment->block_size += artifact_bytes.len; + + new_records = segment->records_len + 1u; + if (new_records > segment->records_cap) { + size_t cap = segment->records_cap == 0u ? 4u : segment->records_cap * 2u; + amduat_asl_index_record_t *next; + if (cap < new_records) { + cap = new_records; + } + next = (amduat_asl_index_record_t *)realloc(segment->records, + cap * sizeof(*next)); + if (next == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + segment->records = next; + segment->records_cap = cap; + } + + new_extents = segment->extents_len + 1u; + if (new_extents > segment->extents_cap) { + size_t cap = segment->extents_cap == 0u ? 4u : segment->extents_cap * 2u; + amduat_asl_extent_record_t *next; + if (cap < new_extents) { + cap = new_extents; + } + next = (amduat_asl_extent_record_t *)realloc(segment->extents, + cap * sizeof(*next)); + if (next == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + segment->extents = next; + segment->extents_cap = cap; + } + + new_digests = segment->digests_len + ref.digest.len; + if (new_digests > segment->digests_cap) { + size_t cap = segment->digests_cap == 0u ? 64u : segment->digests_cap * 2u; + uint8_t *next; + if (cap < new_digests) { + cap = new_digests; + } + next = (uint8_t *)realloc(segment->digests, cap); + if (next == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + segment->digests = next; + segment->digests_cap = cap; + } + + memset(&record, 0, sizeof(record)); + record.hash_id = ref.hash_id; + record.digest_len = (uint16_t)ref.digest.len; + record.extent_count = 1u; + record.total_length = (uint32_t)artifact_bytes.len; + record.domain_id = fs->visibility_policy.segment_domain_id; + record.visibility = fs->visibility_policy.record_visibility; + record.has_cross_domain_source = 0u; + record.cross_domain_source = 0u; + record.flags = 0u; + + extent.block_id = segment->segment_id; + extent.offset = (uint32_t)offset; + extent.length = (uint32_t)artifact_bytes.len; + + memcpy(segment->digests + segment->digests_len, + ref.digest.data, + ref.digest.len); + segment->digests_len += ref.digest.len; + + segment->records[segment->records_len++] = record; + segment->extents[segment->extents_len++] = extent; + segment->record_count += 1u; + if (record.visibility > segment->max_visibility) { + segment->max_visibility = record.visibility; + } + return AMDUAT_ASL_STORE_OK; +} + +static bool amduat_asl_store_index_fs_open_segment_should_seal( + const amduat_asl_store_index_fs_t *fs, + const amduat_asl_store_index_fs_open_segment_t *segment) { + if (fs == NULL || segment == NULL) { + return false; + } + if (fs->segment_policy.max_segment_records != 0u && + segment->record_count >= fs->segment_policy.max_segment_records) { + return true; + } + if (fs->segment_policy.max_segment_bytes != 0u && + segment->block_size >= fs->segment_policy.max_segment_bytes) { + return true; + } + return false; +} + +static amduat_asl_store_error_t amduat_asl_store_index_fs_open_segment_seal( + amduat_asl_store_index_fs_t *fs, + amduat_asl_store_index_fs_open_segment_t *segment, + amduat_asl_index_state_t *out_state) { + char *segment_path = NULL; + char *segments_path = NULL; + char *block_path = NULL; + char *blocks_path = NULL; + amduat_asl_core_index_segment_t encoded; + amduat_octets_t segment_bytes; + uint8_t segment_hash[AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN]; + uint8_t *seal_payload = NULL; + size_t seal_len; + amduat_asl_store_error_t err; + amduat_asl_store_index_fs_write_status_t status; + uint64_t now_ns; + + if (fs == NULL || segment == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + if (!segment->active || segment->record_count == 0u) { + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_OK; + } + + if (!amduat_asl_store_index_fs_build_block_path(fs, + segment->shard_id, + segment->segment_id, + &block_path)) { + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_ERR_IO; + } + if (!amduat_asl_store_index_fs_build_blocks_path(fs, + segment->shard_id, + &blocks_path)) { + free(block_path); + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_ERR_IO; + } + + if (fsync(segment->block_fd) != 0) { + free(block_path); + free(blocks_path); + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_ERR_IO; + } + if (close(segment->block_fd) != 0) { + segment->block_fd = -1; + free(block_path); + free(blocks_path); + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_ERR_IO; + } + segment->block_fd = -1; + + if (rename(segment->block_temp_path, block_path) != 0) { + free(block_path); + free(blocks_path); + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_ERR_IO; + } + free(segment->block_temp_path); + segment->block_temp_path = NULL; + if (!amduat_asl_store_index_fs_fsync_directory(blocks_path)) { + free(block_path); + free(blocks_path); + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_ERR_IO; + } + free(blocks_path); + free(block_path); + + memset(&encoded, 0, sizeof(encoded)); + encoded.header.shard_id = segment->shard_id; + encoded.header.snapshot_min = 0u; + encoded.header.snapshot_max = 0u; + encoded.header.segment_domain_id = fs->visibility_policy.segment_domain_id; + encoded.header.segment_visibility = segment->max_visibility; + encoded.footer.seal_snapshot = 0u; + now_ns = amduat_asl_store_index_fs_now_ns(); + encoded.footer.seal_time_ns = now_ns == 0u ? 0u : now_ns; + encoded.records = segment->records; + encoded.record_count = segment->records_len; + encoded.digests = amduat_octets(segment->digests, segment->digests_len); + encoded.extents = segment->extents; + encoded.extent_count = segment->extents_len; + + encoded.bloom = amduat_octets(NULL, 0u); + if (!amduat_asl_store_index_fs_build_bloom(&encoded, &encoded.bloom)) { + amduat_octets_free(&encoded.bloom); + encoded.bloom = amduat_octets(NULL, 0u); + } + + segment_bytes = amduat_octets(NULL, 0u); + if (!amduat_enc_asl_core_index_encode_v1(&encoded, &segment_bytes)) { + amduat_octets_free(&encoded.bloom); + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_ERR_IO; + } + amduat_octets_free(&encoded.bloom); + + if (!amduat_hash_asl1_digest(AMDUAT_HASH_ASL1_ID_SHA256, + segment_bytes, + segment_hash, + sizeof(segment_hash))) { + amduat_octets_free(&segment_bytes); + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_ERR_IO; + } + + if (!amduat_asl_store_index_fs_build_segment_path(fs, + segment->shard_id, + segment->segment_id, + &segment_path)) { + amduat_octets_free(&segment_bytes); + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_ERR_IO; + } + if (!amduat_asl_store_index_fs_build_segments_path(fs, + segment->shard_id, + &segments_path)) { + free(segment_path); + amduat_octets_free(&segment_bytes); + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_ERR_IO; + } + if (!amduat_asl_store_index_fs_ensure_directory(segments_path)) { + free(segment_path); + free(segments_path); + amduat_octets_free(&segment_bytes); + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_ERR_IO; + } + + status = amduat_asl_store_index_fs_write_atomic( + segments_path, segment_path, segment_bytes.data, segment_bytes.len); + free(segment_path); + free(segments_path); + amduat_octets_free(&segment_bytes); + if (status != AMDUAT_ASL_STORE_INDEX_FS_WRITE_OK) { + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_ERR_IO; + } + + if (!amduat_asl_store_index_fs_summary_append(fs, + segment->shard_id, + segment->segment_id, + segment_hash)) { + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_ERR_IO; + } + + seal_len = 8u + sizeof(segment_hash); + seal_payload = (uint8_t *)malloc(seal_len); + if (seal_payload == NULL) { + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_ERR_IO; + } + amduat_asl_store_index_fs_log_store_u64_le(seal_payload, + segment->segment_id); + memcpy(seal_payload + 8u, segment_hash, sizeof(segment_hash)); + + err = amduat_asl_store_index_fs_append_log_record( + fs, + AMDUAT_ASL_LOG_RECORD_SEGMENT_SEAL, + seal_payload, + seal_len, + out_state); + free(seal_payload); + if (err != AMDUAT_ASL_STORE_OK) { + amduat_asl_store_index_fs_open_segment_reset(segment); + return err; + } + + amduat_asl_store_index_fs_open_segment_reset(segment); + return AMDUAT_ASL_STORE_OK; +} + +typedef struct { + uint64_t *ids; + size_t len; + size_t cap; +} amduat_asl_store_index_fs_id_set_t; + +static void amduat_asl_store_index_fs_id_set_free( + amduat_asl_store_index_fs_id_set_t *set) { + if (set == NULL) { + return; + } + free(set->ids); + set->ids = NULL; + set->len = 0u; + set->cap = 0u; +} + +static bool amduat_asl_store_index_fs_id_set_contains( + const amduat_asl_store_index_fs_id_set_t *set, + uint64_t id) { + size_t i; + + if (set == NULL) { + return false; + } + for (i = 0u; i < set->len; ++i) { + if (set->ids[i] == id) { + return true; + } + } + return false; +} + +static bool amduat_asl_store_index_fs_id_set_add( + amduat_asl_store_index_fs_id_set_t *set, + uint64_t id) { + uint64_t *next; + size_t cap; + + if (set == NULL) { + return false; + } + if (amduat_asl_store_index_fs_id_set_contains(set, id)) { + return true; + } + if (set->len + 1u > set->cap) { + cap = set->cap == 0u ? 16u : set->cap * 2u; + next = (uint64_t *)realloc(set->ids, cap * sizeof(*next)); + if (next == NULL) { + return false; + } + set->ids = next; + set->cap = cap; + } + set->ids[set->len++] = id; + return true; +} + +static bool amduat_asl_store_index_fs_parse_segment_filename( + const char *name, + uint64_t *out_id) { + const char prefix[] = "segment-"; + const char suffix[] = ".asl"; + size_t name_len; + size_t prefix_len; + size_t suffix_len; + size_t hex_len; + char *hex; + char *endptr; + unsigned long long parsed; + + if (name == NULL || out_id == NULL) { + return false; + } + name_len = strlen(name); + prefix_len = sizeof(prefix) - 1u; + suffix_len = sizeof(suffix) - 1u; + if (name_len <= prefix_len + suffix_len) { + return false; + } + if (strncmp(name, prefix, prefix_len) != 0) { + return false; + } + if (strcmp(name + name_len - suffix_len, suffix) != 0) { + return false; + } + + hex_len = name_len - prefix_len - suffix_len; + hex = (char *)malloc(hex_len + 1u); + if (hex == NULL) { + return false; + } + memcpy(hex, name + prefix_len, hex_len); + hex[hex_len] = '\0'; + errno = 0; + parsed = strtoull(hex, &endptr, 16); + free(hex); + if (errno != 0 || endptr == NULL || *endptr != '\0') { + return false; + } + *out_id = (uint64_t)parsed; + return true; +} + +static bool amduat_asl_store_index_fs_parse_block_filename( + const char *name, + uint64_t *out_id) { + const char prefix[] = "block-"; + const char suffix[] = ".asl"; + size_t name_len; + size_t prefix_len; + size_t suffix_len; + size_t hex_len; + char *hex; + char *endptr; + unsigned long long parsed; + + if (name == NULL || out_id == NULL) { + return false; + } + name_len = strlen(name); + prefix_len = sizeof(prefix) - 1u; + suffix_len = sizeof(suffix) - 1u; + if (name_len <= prefix_len + suffix_len) { + return false; + } + if (strncmp(name, prefix, prefix_len) != 0) { + return false; + } + if (strcmp(name + name_len - suffix_len, suffix) != 0) { + return false; + } + + hex_len = name_len - prefix_len - suffix_len; + hex = (char *)malloc(hex_len + 1u); + if (hex == NULL) { + return false; + } + memcpy(hex, name + prefix_len, hex_len); + hex[hex_len] = '\0'; + errno = 0; + parsed = strtoull(hex, &endptr, 16); + free(hex); + if (errno != 0 || endptr == NULL || *endptr != '\0') { + return false; + } + *out_id = (uint64_t)parsed; + return true; +} + static bool amduat_asl_store_index_fs_summary_append( amduat_asl_store_index_fs_t *fs, uint16_t shard_id, @@ -2951,6 +3869,152 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_get_indexed_impl( return err; } +static amduat_asl_store_error_t amduat_asl_store_index_fs_tombstone_impl( + void *ctx, + amduat_reference_t ref, + uint32_t scope, + uint32_t reason_code, + amduat_asl_index_state_t *out_state) { + amduat_asl_store_index_fs_t *fs; + const amduat_hash_asl1_desc_t *hash_desc; + uint8_t *payload = NULL; + size_t payload_len; + size_t ref_len; + amduat_asl_store_error_t err; + + if (ctx == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + fs = (amduat_asl_store_index_fs_t *)ctx; + amduat_asl_store_index_fs_maybe_snapshot_idle(fs); + + err = amduat_asl_store_index_fs_validate_config(ctx, fs->config); + if (err != AMDUAT_ASL_STORE_OK) { + return err; + } + if (ref.hash_id != fs->config.hash_id) { + return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; + } + hash_desc = amduat_hash_asl1_desc_lookup(ref.hash_id); + if (hash_desc == NULL || hash_desc->digest_len == 0) { + return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; + } + if (ref.digest.len != hash_desc->digest_len || + (ref.digest.len != 0u && ref.digest.data == NULL)) { + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + + payload_len = 16u + ref.digest.len; + payload = (uint8_t *)malloc(payload_len); + if (payload == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + if (!amduat_asl_store_index_fs_encode_artifact_ref(ref, + payload, + payload_len, + &ref_len)) { + free(payload); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + amduat_asl_store_index_fs_log_store_u32_le(payload + ref_len, scope); + amduat_asl_store_index_fs_log_store_u32_le(payload + ref_len + 4u, + reason_code); + + err = amduat_asl_store_index_fs_append_log_record( + fs, + AMDUAT_ASL_LOG_RECORD_TOMBSTONE, + payload, + payload_len, + out_state); + free(payload); + return err; +} + +static amduat_asl_store_error_t amduat_asl_store_index_fs_tombstone_lift_impl( + void *ctx, + amduat_reference_t ref, + amduat_asl_log_position_t tombstone_logseq, + amduat_asl_index_state_t *out_state) { + amduat_asl_store_index_fs_t *fs; + const amduat_hash_asl1_desc_t *hash_desc; + uint8_t *payload = NULL; + size_t payload_len; + size_t ref_len; + amduat_asl_store_error_t err; + + if (ctx == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + if (tombstone_logseq == 0u) { + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + fs = (amduat_asl_store_index_fs_t *)ctx; + amduat_asl_store_index_fs_maybe_snapshot_idle(fs); + + err = amduat_asl_store_index_fs_validate_config(ctx, fs->config); + if (err != AMDUAT_ASL_STORE_OK) { + return err; + } + if (ref.hash_id != fs->config.hash_id) { + return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; + } + hash_desc = amduat_hash_asl1_desc_lookup(ref.hash_id); + if (hash_desc == NULL || hash_desc->digest_len == 0) { + return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; + } + if (ref.digest.len != hash_desc->digest_len || + (ref.digest.len != 0u && ref.digest.data == NULL)) { + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + + payload_len = 16u + ref.digest.len; + payload = (uint8_t *)malloc(payload_len); + if (payload == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + if (!amduat_asl_store_index_fs_encode_artifact_ref(ref, + payload, + payload_len, + &ref_len)) { + free(payload); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + amduat_asl_store_index_fs_log_store_u64_le(payload + ref_len, + tombstone_logseq); + + err = amduat_asl_store_index_fs_append_log_record( + fs, + AMDUAT_ASL_LOG_RECORD_TOMBSTONE_LIFT, + payload, + payload_len, + out_state); + free(payload); + return err; +} + +static amduat_asl_store_error_t amduat_asl_store_index_fs_log_scan_impl( + void *ctx, + amduat_asl_log_record_t **out_records, + size_t *out_count) { + amduat_asl_store_index_fs_t *fs; + char *log_path; + amduat_asl_store_error_t err; + + if (ctx == NULL || out_records == NULL || out_count == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + fs = (amduat_asl_store_index_fs_t *)ctx; + amduat_asl_store_index_fs_maybe_snapshot_idle(fs); + + if (!amduat_asl_store_index_fs_layout_build_log_path(fs->root_path, + &log_path)) { + return AMDUAT_ASL_STORE_ERR_IO; + } + err = amduat_asl_store_index_fs_load_log(log_path, out_records, out_count); + free(log_path); + return err; +} + static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( void *ctx, amduat_artifact_t artifact, @@ -3035,6 +4099,83 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( return err; } + if (fs->segment_policy.small_artifact_threshold != 0u && + artifact_bytes.len <= fs->segment_policy.small_artifact_threshold && + (fs->segment_policy.max_segment_records > 1u || + fs->segment_policy.max_segment_bytes != 0u)) { + amduat_asl_store_index_fs_open_segment_t *open_segment; + + open_segment = amduat_asl_store_index_fs_open_segment_for(fs, shard_id); + if (open_segment == NULL) { + amduat_reference_free(&derived_ref); + amduat_octets_free(&artifact_bytes); + return AMDUAT_ASL_STORE_ERR_IO; + } + + if (open_segment->active && + amduat_asl_store_index_fs_open_segment_contains(open_segment, + derived_ref)) { + *out_ref = derived_ref; + amduat_asl_store_index_fs_fill_index_state(out_state, + current_state.snapshot_id, + current_state.log_position); + amduat_octets_free(&artifact_bytes); + return AMDUAT_ASL_STORE_OK; + } + + err = amduat_asl_store_index_fs_open_segment_init(fs, + open_segment, + shard_id); + if (err != AMDUAT_ASL_STORE_OK) { + amduat_reference_free(&derived_ref); + amduat_octets_free(&artifact_bytes); + return err; + } + + err = amduat_asl_store_index_fs_open_segment_add(fs, + open_segment, + derived_ref, + artifact_bytes); + if (err != AMDUAT_ASL_STORE_OK) { + amduat_asl_store_index_fs_open_segment_reset(open_segment); + amduat_reference_free(&derived_ref); + amduat_octets_free(&artifact_bytes); + return err; + } + + amduat_asl_store_index_fs_update_ingest_state(fs, artifact_bytes.len); + + if (amduat_asl_store_index_fs_open_segment_should_seal(fs, + open_segment)) { + err = amduat_asl_store_index_fs_open_segment_seal(fs, + open_segment, + out_state); + if (err != AMDUAT_ASL_STORE_OK) { + amduat_reference_free(&derived_ref); + amduat_octets_free(&artifact_bytes); + return err; + } + } else if (!fs->segment_policy.allow_deferred_visibility) { + err = amduat_asl_store_index_fs_open_segment_seal(fs, + open_segment, + out_state); + if (err != AMDUAT_ASL_STORE_OK) { + amduat_reference_free(&derived_ref); + amduat_octets_free(&artifact_bytes); + return err; + } + } else { + amduat_asl_store_index_fs_fill_index_state(out_state, + current_state.snapshot_id, + current_state.log_position); + } + + amduat_octets_free(&artifact_bytes); + *out_ref = derived_ref; + amduat_asl_store_index_fs_maybe_snapshot_size(fs); + return AMDUAT_ASL_STORE_OK; + } + index_path = NULL; segments_path = NULL; blocks_path = NULL; @@ -3140,13 +4281,13 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( memset(&record, 0, sizeof(record)); memset(&extent, 0, sizeof(extent)); - segment.header.shard_id = 0u; + segment.header.shard_id = shard_id; segment.header.snapshot_min = 0u; segment.header.snapshot_max = 0u; - segment.header.segment_domain_id = 0u; - segment.header.segment_visibility = 0u; + segment.header.segment_domain_id = fs->visibility_policy.segment_domain_id; + segment.header.segment_visibility = fs->visibility_policy.record_visibility; segment.footer.seal_snapshot = 0u; - segment.footer.seal_time_ns = 0u; + segment.footer.seal_time_ns = amduat_asl_store_index_fs_now_ns(); record.hash_id = fs->config.hash_id; record.digest_len = (uint16_t)hash_desc->digest_len; @@ -3154,8 +4295,8 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( record.extents_offset = 0u; record.extent_count = 1u; record.total_length = (uint32_t)artifact_bytes.len; - record.domain_id = 0u; - record.visibility = 0u; + record.domain_id = fs->visibility_policy.segment_domain_id; + record.visibility = fs->visibility_policy.record_visibility; record.has_cross_domain_source = 0u; record.cross_domain_source = 0u; record.flags = 0u; @@ -3228,10 +4369,17 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( return AMDUAT_ASL_STORE_ERR_IO; } - (void)amduat_asl_store_index_fs_summary_append(fs, - shard_id, - segment_id, - segment_hash); + if (!amduat_asl_store_index_fs_summary_append(fs, + shard_id, + segment_id, + segment_hash)) { + amduat_reference_free(&derived_ref); + amduat_octets_free(&artifact_bytes); + free(index_path); + free(segments_path); + free(blocks_path); + return AMDUAT_ASL_STORE_ERR_IO; + } if (!amduat_asl_store_index_fs_layout_build_log_path(fs->root_path, &log_path)) { @@ -3412,6 +4560,11 @@ amduat_asl_store_error_t amduat_asl_store_index_fs_snapshot_create( return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; } + err = amduat_asl_store_index_fs_flush(fs, NULL); + if (err != AMDUAT_ASL_STORE_OK) { + return err; + } + err = amduat_asl_store_index_fs_validate_config(fs, fs->config); if (err != AMDUAT_ASL_STORE_OK) { return err; @@ -3564,6 +4717,358 @@ amduat_asl_store_error_t amduat_asl_store_index_fs_snapshot_create( return AMDUAT_ASL_STORE_OK; } +static amduat_asl_store_error_t amduat_asl_store_index_fs_build_replay_state( + amduat_asl_store_index_fs_t *fs, + amduat_asl_index_state_t state, + amduat_asl_replay_state_t *out_state) { + char *log_path; + amduat_asl_log_record_t *records = NULL; + size_t record_count = 0u; + amduat_asl_store_error_t err; + amduat_asl_log_position_t anchor_logseq = 0u; + size_t replay_start = 0u; + + if (fs == NULL || out_state == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + if (!amduat_asl_store_index_fs_layout_build_log_path(fs->root_path, + &log_path)) { + return AMDUAT_ASL_STORE_ERR_IO; + } + + err = amduat_asl_store_index_fs_load_log(log_path, &records, &record_count); + free(log_path); + if (err != AMDUAT_ASL_STORE_OK) { + amduat_asl_store_index_fs_log_free(records, record_count); + return err; + } + + if (state.snapshot_id != 0u) { + if (!amduat_asl_store_index_fs_load_snapshot_replay(fs->root_path, + state.snapshot_id, + NULL, + out_state, + &anchor_logseq)) { + amduat_asl_store_index_fs_log_free(records, record_count); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + if (state.log_position < anchor_logseq || + out_state->state.log_position != anchor_logseq) { + amduat_asl_replay_free(out_state); + amduat_asl_store_index_fs_log_free(records, record_count); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + replay_start = amduat_asl_store_index_fs_find_log_start(records, + record_count, + anchor_logseq); + } else { + if (!amduat_asl_replay_init(out_state)) { + amduat_asl_store_index_fs_log_free(records, record_count); + return AMDUAT_ASL_STORE_ERR_IO; + } + } + + if (!amduat_asl_replay_apply_log(records + replay_start, + record_count - replay_start, + state.log_position, + out_state)) { + amduat_asl_replay_free(out_state); + amduat_asl_store_index_fs_log_free(records, record_count); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + + amduat_asl_store_index_fs_log_free(records, record_count); + return AMDUAT_ASL_STORE_OK; +} + +amduat_asl_store_error_t amduat_asl_store_index_fs_flush( + amduat_asl_store_index_fs_t *fs, + amduat_asl_index_state_t *out_state) { + amduat_asl_store_index_fs_open_segment_t *segments; + bool sealed_any = false; + amduat_asl_store_error_t err; + + if (fs == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + + segments = (amduat_asl_store_index_fs_open_segment_t *)fs->open_segments; + if (segments != NULL) { + for (size_t i = 0u; i < fs->shard_count; ++i) { + if (!segments[i].active) { + continue; + } + err = amduat_asl_store_index_fs_open_segment_seal(fs, + &segments[i], + out_state); + if (err != AMDUAT_ASL_STORE_OK) { + return err; + } + sealed_any = true; + } + } + + if (out_state != NULL && !sealed_any) { + if (!amduat_asl_store_index_fs_current_state_impl(fs, out_state)) { + return AMDUAT_ASL_STORE_ERR_IO; + } + } + return AMDUAT_ASL_STORE_OK; +} + +amduat_asl_store_error_t amduat_asl_store_index_fs_gc( + amduat_asl_store_index_fs_t *fs, + const amduat_asl_index_state_t *state) { + amduat_asl_index_state_t target_state; + amduat_asl_index_state_t current_state; + amduat_asl_replay_state_t replay_state; + amduat_asl_store_index_fs_id_set_t keep_segments; + amduat_asl_store_index_fs_id_set_t keep_blocks; + char *snapshots_path = NULL; + DIR *snap_dir = NULL; + struct dirent *entry; + amduat_asl_store_error_t err; + size_t i; + + if (fs == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + + err = amduat_asl_store_index_fs_flush(fs, NULL); + if (err != AMDUAT_ASL_STORE_OK) { + return err; + } + + if (state != NULL) { + target_state = *state; + } else { + if (!amduat_asl_store_index_fs_current_state_impl(fs, &target_state)) { + return AMDUAT_ASL_STORE_ERR_IO; + } + } + + if (!amduat_asl_store_index_fs_current_state_impl(fs, ¤t_state)) { + return AMDUAT_ASL_STORE_ERR_IO; + } + if (state != NULL && + (target_state.snapshot_id != current_state.snapshot_id || + target_state.log_position != current_state.log_position)) { + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + + memset(&keep_segments, 0, sizeof(keep_segments)); + memset(&keep_blocks, 0, sizeof(keep_blocks)); + + if (amduat_asl_store_index_fs_layout_build_snapshots_path(fs->root_path, + &snapshots_path)) { + snap_dir = opendir(snapshots_path); + } + + if (snap_dir != NULL) { + while ((entry = readdir(snap_dir)) != NULL) { + amduat_asl_snapshot_id_t snapshot_id; + amduat_asl_snapshot_manifest_t manifest; + uint8_t manifest_hash[32]; + + if (!amduat_asl_store_index_fs_parse_snapshot_filename(entry->d_name, + &snapshot_id)) { + continue; + } + if (!amduat_asl_store_index_fs_load_snapshot_manifest( + fs->root_path, snapshot_id, &manifest, manifest_hash)) { + closedir(snap_dir); + free(snapshots_path); + amduat_asl_store_index_fs_id_set_free(&keep_segments); + amduat_asl_store_index_fs_id_set_free(&keep_blocks); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + for (i = 0u; i < manifest.segments_len; ++i) { + if (!amduat_asl_store_index_fs_id_set_add( + &keep_segments, manifest.segments[i].segment_id)) { + amduat_asl_snapshot_manifest_free(&manifest); + closedir(snap_dir); + free(snapshots_path); + amduat_asl_store_index_fs_id_set_free(&keep_segments); + amduat_asl_store_index_fs_id_set_free(&keep_blocks); + return AMDUAT_ASL_STORE_ERR_IO; + } + } + amduat_asl_snapshot_manifest_free(&manifest); + } + closedir(snap_dir); + } + free(snapshots_path); + + err = amduat_asl_store_index_fs_build_replay_state(fs, + target_state, + &replay_state); + if (err != AMDUAT_ASL_STORE_OK) { + amduat_asl_store_index_fs_id_set_free(&keep_segments); + amduat_asl_store_index_fs_id_set_free(&keep_blocks); + return err; + } + + for (i = 0u; i < replay_state.segments_len; ++i) { + if (!amduat_asl_store_index_fs_id_set_add( + &keep_segments, replay_state.segments[i].segment_id)) { + amduat_asl_replay_free(&replay_state); + amduat_asl_store_index_fs_id_set_free(&keep_segments); + amduat_asl_store_index_fs_id_set_free(&keep_blocks); + return AMDUAT_ASL_STORE_ERR_IO; + } + } + + for (i = 0u; i < keep_segments.len; ++i) { + uint64_t segment_id = keep_segments.ids[i]; + uint16_t shard_id = amduat_asl_store_index_fs_segment_shard(segment_id); + char *segment_path = NULL; + amduat_asl_core_index_segment_t segment; + uint8_t segment_hash[AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN]; + + if (fs->shard_count <= 1u) { + shard_id = 0u; + } + if (!amduat_asl_store_index_fs_build_segment_path(fs, + shard_id, + segment_id, + &segment_path)) { + amduat_asl_replay_free(&replay_state); + amduat_asl_store_index_fs_id_set_free(&keep_segments); + amduat_asl_store_index_fs_id_set_free(&keep_blocks); + return AMDUAT_ASL_STORE_ERR_IO; + } + + err = amduat_asl_store_index_fs_load_segment(fs, + shard_id, + segment_id, + segment_path, + &segment, + segment_hash); + free(segment_path); + if (err != AMDUAT_ASL_STORE_OK) { + amduat_asl_replay_free(&replay_state); + amduat_asl_store_index_fs_id_set_free(&keep_segments); + amduat_asl_store_index_fs_id_set_free(&keep_blocks); + return err; + } + for (size_t j = 0u; j < segment.extent_count; ++j) { + if (!amduat_asl_store_index_fs_id_set_add( + &keep_blocks, segment.extents[j].block_id)) { + amduat_enc_asl_core_index_free(&segment); + amduat_asl_replay_free(&replay_state); + amduat_asl_store_index_fs_id_set_free(&keep_segments); + amduat_asl_store_index_fs_id_set_free(&keep_blocks); + return AMDUAT_ASL_STORE_ERR_IO; + } + } + amduat_enc_asl_core_index_free(&segment); + } + + amduat_asl_replay_free(&replay_state); + + for (uint16_t shard_id = 0u; shard_id < fs->shard_count; ++shard_id) { + char *segments_path = NULL; + DIR *seg_dir; + + if (!amduat_asl_store_index_fs_build_segments_path(fs, + shard_id, + &segments_path)) { + amduat_asl_store_index_fs_id_set_free(&keep_segments); + amduat_asl_store_index_fs_id_set_free(&keep_blocks); + return AMDUAT_ASL_STORE_ERR_IO; + } + seg_dir = opendir(segments_path); + if (seg_dir != NULL) { + while ((entry = readdir(seg_dir)) != NULL) { + uint64_t segment_id; + char *path = NULL; + + if (!amduat_asl_store_index_fs_parse_segment_filename(entry->d_name, + &segment_id)) { + continue; + } + if (amduat_asl_store_index_fs_id_set_contains(&keep_segments, + segment_id)) { + continue; + } + if (!amduat_asl_store_index_fs_join_path(segments_path, + entry->d_name, + &path)) { + closedir(seg_dir); + free(segments_path); + amduat_asl_store_index_fs_id_set_free(&keep_segments); + amduat_asl_store_index_fs_id_set_free(&keep_blocks); + return AMDUAT_ASL_STORE_ERR_IO; + } + if (unlink(path) != 0 && errno != ENOENT) { + free(path); + closedir(seg_dir); + free(segments_path); + amduat_asl_store_index_fs_id_set_free(&keep_segments); + amduat_asl_store_index_fs_id_set_free(&keep_blocks); + return AMDUAT_ASL_STORE_ERR_IO; + } + free(path); + } + closedir(seg_dir); + } + free(segments_path); + } + + for (uint16_t shard_id = 0u; shard_id < fs->shard_count; ++shard_id) { + char *blocks_path = NULL; + DIR *blk_dir; + + if (!amduat_asl_store_index_fs_build_blocks_path(fs, + shard_id, + &blocks_path)) { + amduat_asl_store_index_fs_id_set_free(&keep_segments); + amduat_asl_store_index_fs_id_set_free(&keep_blocks); + return AMDUAT_ASL_STORE_ERR_IO; + } + blk_dir = opendir(blocks_path); + if (blk_dir != NULL) { + while ((entry = readdir(blk_dir)) != NULL) { + uint64_t block_id; + char *path = NULL; + + if (!amduat_asl_store_index_fs_parse_block_filename(entry->d_name, + &block_id)) { + continue; + } + if (amduat_asl_store_index_fs_id_set_contains(&keep_blocks, + block_id)) { + continue; + } + if (!amduat_asl_store_index_fs_join_path(blocks_path, + entry->d_name, + &path)) { + closedir(blk_dir); + free(blocks_path); + amduat_asl_store_index_fs_id_set_free(&keep_segments); + amduat_asl_store_index_fs_id_set_free(&keep_blocks); + return AMDUAT_ASL_STORE_ERR_IO; + } + if (unlink(path) != 0 && errno != ENOENT) { + free(path); + closedir(blk_dir); + free(blocks_path); + amduat_asl_store_index_fs_id_set_free(&keep_segments); + amduat_asl_store_index_fs_id_set_free(&keep_blocks); + return AMDUAT_ASL_STORE_ERR_IO; + } + free(path); + } + closedir(blk_dir); + } + free(blocks_path); + } + + amduat_asl_store_index_fs_id_set_free(&keep_segments); + amduat_asl_store_index_fs_id_set_free(&keep_blocks); + return AMDUAT_ASL_STORE_OK; +} + bool amduat_asl_store_index_fs_init(amduat_asl_store_index_fs_t *fs, amduat_asl_store_config_t config, const char *root_path) { @@ -3584,11 +5089,19 @@ bool amduat_asl_store_index_fs_init(amduat_asl_store_index_fs_t *fs, AMDUAT_ASL_STORE_INDEX_FS_DEFAULT_PENDING_BYTES; fs->snapshot_policy.idle_time_ns = AMDUAT_ASL_STORE_INDEX_FS_DEFAULT_IDLE_NS; + fs->segment_policy.max_segment_records = + AMDUAT_ASL_STORE_INDEX_FS_DEFAULT_SEGMENT_RECORDS; + fs->segment_policy.max_segment_bytes = 0u; + fs->segment_policy.small_artifact_threshold = 0u; + fs->segment_policy.allow_deferred_visibility = false; + fs->visibility_policy.segment_domain_id = 0u; + fs->visibility_policy.record_visibility = 0u; fs->shard_count = 1u; fs->pending_snapshot_bytes = 0u; fs->last_ingest_time_ns = 0u; fs->next_snapshot_id = 0u; fs->snapshot_state_initialized = false; + fs->open_segments = NULL; return true; } @@ -3601,6 +5114,27 @@ void amduat_asl_store_index_fs_set_snapshot_policy( fs->snapshot_policy = policy; } +void amduat_asl_store_index_fs_set_segment_policy( + amduat_asl_store_index_fs_t *fs, + amduat_asl_store_index_fs_segment_policy_t policy) { + if (fs == NULL) { + return; + } + fs->segment_policy = policy; +} + +void amduat_asl_store_index_fs_set_visibility_policy( + amduat_asl_store_index_fs_t *fs, + amduat_asl_store_index_fs_visibility_policy_t policy) { + if (fs == NULL) { + return; + } + if (policy.record_visibility > 1u) { + policy.record_visibility = 0u; + } + fs->visibility_policy = policy; +} + void amduat_asl_store_index_fs_set_shard_count( amduat_asl_store_index_fs_t *fs, uint16_t shard_count) { @@ -3611,6 +5145,7 @@ void amduat_asl_store_index_fs_set_shard_count( shard_count = 1u; } fs->shard_count = shard_count; + amduat_asl_store_index_fs_open_segments_free(fs); } amduat_asl_store_ops_t amduat_asl_store_index_fs_ops(void) { @@ -3621,6 +5156,9 @@ amduat_asl_store_ops_t amduat_asl_store_index_fs_ops(void) { ops.get = amduat_asl_store_index_fs_get_impl; ops.put_indexed = amduat_asl_store_index_fs_put_indexed_impl; ops.get_indexed = amduat_asl_store_index_fs_get_indexed_impl; + ops.tombstone = amduat_asl_store_index_fs_tombstone_impl; + ops.tombstone_lift = amduat_asl_store_index_fs_tombstone_lift_impl; + ops.log_scan = amduat_asl_store_index_fs_log_scan_impl; ops.current_state = amduat_asl_store_index_fs_current_state_impl; ops.validate_config = amduat_asl_store_index_fs_validate_config; return ops; diff --git a/src/near_core/asl/store.c b/src/near_core/asl/store.c index 88868e0..aaaf837 100644 --- a/src/near_core/asl/store.c +++ b/src/near_core/asl/store.c @@ -106,6 +106,66 @@ amduat_asl_store_error_t amduat_asl_store_get_indexed( return store->ops.get_indexed(store->ctx, ref, state, out_artifact); } +amduat_asl_store_error_t amduat_asl_store_tombstone( + amduat_asl_store_t *store, + amduat_reference_t ref, + uint32_t scope, + uint32_t reason_code, + amduat_asl_index_state_t *out_state) { + amduat_asl_store_error_t cfg_err; + + if (store == NULL || store->ops.tombstone == NULL) { + return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; + } + if (ref.hash_id != store->config.hash_id) { + return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; + } + cfg_err = amduat_asl_store_validate_config(store); + if (cfg_err != AMDUAT_ASL_STORE_OK) { + return cfg_err; + } + return store->ops.tombstone(store->ctx, ref, scope, reason_code, out_state); +} + +amduat_asl_store_error_t amduat_asl_store_tombstone_lift( + amduat_asl_store_t *store, + amduat_reference_t ref, + amduat_asl_log_position_t tombstone_logseq, + amduat_asl_index_state_t *out_state) { + amduat_asl_store_error_t cfg_err; + + if (store == NULL || store->ops.tombstone_lift == NULL) { + return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; + } + if (ref.hash_id != store->config.hash_id) { + return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; + } + cfg_err = amduat_asl_store_validate_config(store); + if (cfg_err != AMDUAT_ASL_STORE_OK) { + return cfg_err; + } + return store->ops.tombstone_lift(store->ctx, + ref, + tombstone_logseq, + out_state); +} + +amduat_asl_store_error_t amduat_asl_log_scan( + amduat_asl_store_t *store, + amduat_asl_log_record_t **out_records, + size_t *out_count) { + amduat_asl_store_error_t cfg_err; + + if (store == NULL || store->ops.log_scan == NULL) { + return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; + } + cfg_err = amduat_asl_store_validate_config(store); + if (cfg_err != AMDUAT_ASL_STORE_OK) { + return cfg_err; + } + return store->ops.log_scan(store->ctx, out_records, out_count); +} + bool amduat_asl_index_current_state(amduat_asl_store_t *store, amduat_asl_index_state_t *out_state) { amduat_asl_store_error_t cfg_err; diff --git a/tests/asl/test_asl_store_index_fs.c b/tests/asl/test_asl_store_index_fs.c index c6c5614..48d4d74 100644 --- a/tests/asl/test_asl_store_index_fs.c +++ b/tests/asl/test_asl_store_index_fs.c @@ -5,6 +5,7 @@ #include "amduat/asl/store.h" #include "amduat/enc/asl1_core.h" #include "amduat/enc/asl_core_index.h" +#include "amduat/enc/asl_log.h" #include "amduat/format/ref.h" #include "amduat/hash/asl1.h" @@ -250,6 +251,13 @@ static uint64_t fnv1a64(const uint8_t *data, size_t len, uint64_t seed) { return hash; } +static uint64_t load_u64_le(const uint8_t *data) { + return (uint64_t)data[0] | ((uint64_t)data[1] << 8) | + ((uint64_t)data[2] << 16) | ((uint64_t)data[3] << 24) | + ((uint64_t)data[4] << 32) | ((uint64_t)data[5] << 40) | + ((uint64_t)data[6] << 48) | ((uint64_t)data[7] << 56); +} + static uint16_t ref_shard(amduat_reference_t ref, uint16_t shard_count) { uint64_t hash; uint8_t hash_id_bytes[2]; @@ -739,6 +747,343 @@ cleanup: return exit_code; } +static int test_segment_batch_packing(void) { + amduat_asl_store_config_t config; + amduat_asl_store_index_fs_t fs; + amduat_asl_store_t store; + amduat_asl_store_index_fs_segment_policy_t segment_policy; + amduat_asl_store_index_fs_visibility_policy_t visibility_policy; + amduat_asl_store_error_t err; + amduat_asl_index_state_t state_a; + amduat_asl_index_state_t state_b; + amduat_artifact_t artifact_a; + amduat_artifact_t artifact_b; + amduat_artifact_t loaded; + amduat_reference_t ref_a; + amduat_reference_t ref_b; + uint8_t payload_a[3] = {0x01, 0x02, 0x03}; + uint8_t payload_b[4] = {0x10, 0x11, 0x12, 0x13}; + char *root; + amduat_asl_log_record_t *records = NULL; + size_t record_count = 0u; + uint64_t segment_id = 0u; + char *segment_path = NULL; + uint8_t *segment_bytes = NULL; + size_t segment_len = 0u; + amduat_asl_core_index_segment_t segment; + int exit_code = 1; + + root = make_temp_root(); + if (root == NULL) { + fprintf(stderr, "temp root failed\n"); + return 1; + } + + memset(&config, 0, sizeof(config)); + config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; + config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; + if (!amduat_asl_store_index_fs_init(&fs, config, root)) { + fprintf(stderr, "index fs init failed\n"); + goto cleanup; + } + segment_policy.max_segment_records = 2u; + segment_policy.max_segment_bytes = 0u; + segment_policy.small_artifact_threshold = 1024u; + segment_policy.allow_deferred_visibility = true; + amduat_asl_store_index_fs_set_segment_policy(&fs, segment_policy); + visibility_policy.segment_domain_id = 7u; + visibility_policy.record_visibility = 1u; + amduat_asl_store_index_fs_set_visibility_policy(&fs, visibility_policy); + amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs); + + artifact_a = amduat_artifact(amduat_octets(payload_a, sizeof(payload_a))); + artifact_b = amduat_artifact(amduat_octets(payload_b, sizeof(payload_b))); + ref_a = amduat_reference(0u, amduat_octets(NULL, 0u)); + ref_b = amduat_reference(0u, amduat_octets(NULL, 0u)); + + err = amduat_asl_store_put_indexed(&store, artifact_a, &ref_a, &state_a); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "put_indexed a failed: %d\n", err); + goto cleanup; + } + + loaded = amduat_artifact(amduat_octets(NULL, 0u)); + err = amduat_asl_store_get_indexed(&store, ref_a, state_a, &loaded); + if (err != AMDUAT_ASL_STORE_ERR_NOT_FOUND) { + fprintf(stderr, "expected a to be pending before seal: %d\n", err); + amduat_artifact_free(&loaded); + goto cleanup; + } + amduat_artifact_free(&loaded); + + err = amduat_asl_store_put_indexed(&store, artifact_b, &ref_b, &state_b); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "put_indexed b failed: %d\n", err); + goto cleanup; + } + + loaded = amduat_artifact(amduat_octets(NULL, 0u)); + err = amduat_asl_store_get_indexed(&store, ref_a, state_b, &loaded); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "get_indexed a after seal failed: %d\n", err); + goto cleanup; + } + amduat_artifact_free(&loaded); + + loaded = amduat_artifact(amduat_octets(NULL, 0u)); + err = amduat_asl_store_get_indexed(&store, ref_b, state_b, &loaded); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "get_indexed b after seal failed: %d\n", err); + goto cleanup; + } + amduat_artifact_free(&loaded); + + err = amduat_asl_log_scan(&store, &records, &record_count); + if (err != AMDUAT_ASL_STORE_OK || record_count != 1u || + records[0].record_type != AMDUAT_ASL_LOG_RECORD_SEGMENT_SEAL) { + fprintf(stderr, "log scan expected one segment seal\n"); + goto cleanup; + } + if (records[0].payload.len < 8u) { + fprintf(stderr, "segment seal payload too small\n"); + goto cleanup; + } + segment_id = load_u64_le(records[0].payload.data); + + memset(&segment, 0, sizeof(segment)); + if (!build_segment_path(root, segment_id, &segment_path)) { + fprintf(stderr, "segment path build failed\n"); + goto cleanup; + } + if (!read_file(segment_path, &segment_bytes, &segment_len)) { + fprintf(stderr, "segment read failed\n"); + goto cleanup; + } + if (!amduat_enc_asl_core_index_decode_v1( + amduat_octets(segment_bytes, segment_len), &segment)) { + fprintf(stderr, "segment decode failed\n"); + goto cleanup; + } + if (segment.record_count != 2u || segment.extent_count != 2u) { + fprintf(stderr, "segment record/extent count mismatch\n"); + goto cleanup; + } + if (segment.header.segment_visibility != 1u || + segment.header.segment_domain_id != 7u) { + fprintf(stderr, "segment visibility metadata mismatch\n"); + goto cleanup; + } + if (segment.records[0].visibility != 1u || + segment.records[1].visibility != 1u || + segment.records[0].domain_id != 7u || + segment.records[1].domain_id != 7u) { + fprintf(stderr, "record visibility metadata mismatch\n"); + goto cleanup; + } + + exit_code = 0; + +cleanup: + amduat_enc_asl_core_index_free(&segment); + free(segment_path); + free(segment_bytes); + amduat_enc_asl_log_free(records, record_count); + amduat_reference_free(&ref_a); + amduat_reference_free(&ref_b); + if (!remove_tree(root)) { + fprintf(stderr, "cleanup failed\n"); + exit_code = 1; + } + free(root); + return exit_code; +} + +static int test_tombstone_round_trip(void) { + amduat_asl_store_config_t config; + amduat_asl_store_index_fs_t fs; + amduat_asl_store_t store; + amduat_asl_store_error_t err; + amduat_asl_index_state_t state; + amduat_asl_index_state_t tombstone_state; + amduat_asl_index_state_t lift_state; + amduat_artifact_t artifact; + amduat_artifact_t loaded; + amduat_reference_t ref; + uint8_t payload[4]; + char *root; + amduat_asl_log_record_t *records = NULL; + size_t record_count = 0u; + int exit_code = 1; + + root = make_temp_root(); + if (root == NULL) { + fprintf(stderr, "temp root failed\n"); + return 1; + } + + memset(&config, 0, sizeof(config)); + config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; + config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; + if (!amduat_asl_store_index_fs_init(&fs, config, root)) { + fprintf(stderr, "index fs init failed\n"); + goto cleanup; + } + amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs); + + memset(payload, 0x2a, sizeof(payload)); + artifact = amduat_artifact(amduat_octets(payload, sizeof(payload))); + ref = amduat_reference(0u, amduat_octets(NULL, 0u)); + + err = amduat_asl_store_put_indexed(&store, artifact, &ref, &state); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "put_indexed failed: %d\n", err); + goto cleanup; + } + + loaded = amduat_artifact(amduat_octets(NULL, 0u)); + err = amduat_asl_store_get_indexed(&store, ref, state, &loaded); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "get_indexed failed: %d\n", err); + goto cleanup; + } + amduat_artifact_free(&loaded); + + err = amduat_asl_store_tombstone(&store, ref, 7u, 9u, &tombstone_state); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "tombstone failed: %d\n", err); + goto cleanup; + } + if (tombstone_state.log_position <= state.log_position) { + fprintf(stderr, "tombstone did not advance log position\n"); + goto cleanup; + } + + if (!amduat_asl_index_current_state(&store, &state)) { + fprintf(stderr, "current_state failed\n"); + goto cleanup; + } + loaded = amduat_artifact(amduat_octets(NULL, 0u)); + err = amduat_asl_store_get_indexed(&store, ref, state, &loaded); + if (err != AMDUAT_ASL_STORE_ERR_NOT_FOUND) { + fprintf(stderr, "get_indexed after tombstone expected not found: %d\n", err); + amduat_artifact_free(&loaded); + goto cleanup; + } + amduat_artifact_free(&loaded); + + err = amduat_asl_store_tombstone_lift(&store, + ref, + tombstone_state.log_position, + &lift_state); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "tombstone_lift failed: %d\n", err); + goto cleanup; + } + if (lift_state.log_position <= tombstone_state.log_position) { + fprintf(stderr, "tombstone_lift did not advance log position\n"); + goto cleanup; + } + + if (!amduat_asl_index_current_state(&store, &state)) { + fprintf(stderr, "current_state failed\n"); + goto cleanup; + } + loaded = amduat_artifact(amduat_octets(NULL, 0u)); + err = amduat_asl_store_get_indexed(&store, ref, state, &loaded); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "get_indexed after tombstone lift failed: %d\n", err); + goto cleanup; + } + amduat_artifact_free(&loaded); + + err = amduat_asl_log_scan(&store, &records, &record_count); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "log scan failed: %d\n", err); + goto cleanup; + } + if (record_count < 3u || + records[record_count - 1u].record_type != + AMDUAT_ASL_LOG_RECORD_TOMBSTONE_LIFT) { + fprintf(stderr, "log scan missing tombstone lift record\n"); + goto cleanup; + } + + exit_code = 0; + +cleanup: + amduat_enc_asl_log_free(records, record_count); + amduat_reference_free(&ref); + if (!remove_tree(root)) { + fprintf(stderr, "cleanup failed\n"); + exit_code = 1; + } + free(root); + return exit_code; +} + +static int test_gc_keeps_visible(void) { + amduat_asl_store_config_t config; + amduat_asl_store_index_fs_t fs; + amduat_asl_store_t store; + amduat_asl_store_error_t err; + amduat_asl_index_state_t state; + amduat_artifact_t artifact; + amduat_artifact_t loaded; + amduat_reference_t ref; + uint8_t payload[5]; + char *root; + int exit_code = 1; + + root = make_temp_root(); + if (root == NULL) { + fprintf(stderr, "temp root failed\n"); + return 1; + } + + memset(&config, 0, sizeof(config)); + config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; + config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; + if (!amduat_asl_store_index_fs_init(&fs, config, root)) { + fprintf(stderr, "index fs init failed\n"); + goto cleanup; + } + amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs); + + memset(payload, 0x77, sizeof(payload)); + artifact = amduat_artifact(amduat_octets(payload, sizeof(payload))); + ref = amduat_reference(0u, amduat_octets(NULL, 0u)); + err = amduat_asl_store_put_indexed(&store, artifact, &ref, &state); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "put_indexed failed: %d\n", err); + goto cleanup; + } + + err = amduat_asl_store_index_fs_gc(&fs, &state); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "gc failed: %d\n", err); + goto cleanup; + } + + loaded = amduat_artifact(amduat_octets(NULL, 0u)); + err = amduat_asl_store_get_indexed(&store, ref, state, &loaded); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "get_indexed after gc failed: %d\n", err); + goto cleanup; + } + amduat_artifact_free(&loaded); + + exit_code = 0; + +cleanup: + amduat_reference_free(&ref); + if (!remove_tree(root)) { + fprintf(stderr, "cleanup failed\n"); + exit_code = 1; + } + free(root); + return exit_code; +} + static int test_large_round_trip_perf(void) { enum { k_artifact_count_default = 10000, k_payload_max = 64 }; amduat_asl_store_config_t config; @@ -1278,6 +1623,15 @@ int main(void) { if (test_snapshot_truncation() != 0) { return 1; } + if (test_segment_batch_packing() != 0) { + return 1; + } + if (test_tombstone_round_trip() != 0) { + return 1; + } + if (test_gc_keeps_visible() != 0) { + return 1; + } if (test_large_round_trip_perf() != 0) { return 1; } diff --git a/tests/asl/test_asl_store_indexed_ops.c b/tests/asl/test_asl_store_indexed_ops.c index 176370b..455d27a 100644 --- a/tests/asl/test_asl_store_indexed_ops.c +++ b/tests/asl/test_asl_store_indexed_ops.c @@ -40,6 +40,26 @@ static int test_indexed_ops_unsupported(void) { return 1; } + err = amduat_asl_store_tombstone(&store, ref, 0u, 0u, &state); + if (err != AMDUAT_ASL_STORE_ERR_UNSUPPORTED) { + fprintf(stderr, "tombstone missing ops should be unsupported: %d\n", err); + return 1; + } + + err = amduat_asl_store_tombstone_lift(&store, ref, 1u, &state); + if (err != AMDUAT_ASL_STORE_ERR_UNSUPPORTED) { + fprintf(stderr, + "tombstone_lift missing ops should be unsupported: %d\n", + err); + return 1; + } + + err = amduat_asl_log_scan(&store, NULL, NULL); + if (err != AMDUAT_ASL_STORE_ERR_UNSUPPORTED) { + fprintf(stderr, "log_scan missing ops should be unsupported: %d\n", err); + return 1; + } + return 0; }