diff --git a/include/amduat/asl/asl_store_index_fs.h b/include/amduat/asl/asl_store_index_fs.h index d9d184a..fa7b86d 100644 --- a/include/amduat/asl/asl_store_index_fs.h +++ b/include/amduat/asl/asl_store_index_fs.h @@ -21,6 +21,7 @@ typedef struct { typedef struct { amduat_asl_store_config_t config; amduat_asl_store_index_fs_snapshot_policy_t snapshot_policy; + uint16_t shard_count; char root_path[AMDUAT_ASL_STORE_INDEX_FS_ROOT_MAX]; uint64_t pending_snapshot_bytes; uint64_t last_ingest_time_ns; @@ -36,6 +37,10 @@ void amduat_asl_store_index_fs_set_snapshot_policy( amduat_asl_store_index_fs_t *fs, amduat_asl_store_index_fs_snapshot_policy_t policy); +void amduat_asl_store_index_fs_set_shard_count( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_count); + amduat_asl_store_error_t amduat_asl_store_index_fs_snapshot_create( amduat_asl_store_index_fs_t *fs, amduat_asl_snapshot_id_t snapshot_id, diff --git a/src/adapters/asl_store_index_fs/asl_store_index_fs.c b/src/adapters/asl_store_index_fs/asl_store_index_fs.c index e5695b4..07ace06 100644 --- a/src/adapters/asl_store_index_fs/asl_store_index_fs.c +++ b/src/adapters/asl_store_index_fs/asl_store_index_fs.c @@ -11,6 +11,7 @@ #include "amduat/enc/asl_log.h" #include "amduat/hash/asl1.h" +#include #include #include #include @@ -32,7 +33,12 @@ enum { AMDUAT_ASL_STORE_INDEX_FS_MIN_DIGEST_BYTES = 2, AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN = 32, AMDUAT_ASL_STORE_INDEX_FS_DEFAULT_PENDING_BYTES = 128u * 1024u, - AMDUAT_ASL_STORE_INDEX_FS_DEFAULT_IDLE_NS = 2u * 1000u * 1000u * 1000u + AMDUAT_ASL_STORE_INDEX_FS_DEFAULT_IDLE_NS = 2u * 1000u * 1000u * 1000u, + AMDUAT_ASL_STORE_INDEX_FS_SHARD_SHIFT = 48, + AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN = 8, + AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN = 32, + AMDUAT_ASL_STORE_INDEX_FS_LOG_HEADER_LEN = 24, + AMDUAT_ASL_STORE_INDEX_FS_LOG_VERSION = 1 }; typedef enum { @@ -48,6 +54,18 @@ typedef enum { AMDUAT_ASL_STORE_INDEX_FS_READ_CORRUPT = 3 } amduat_asl_store_index_fs_read_status_t; +static amduat_asl_store_index_fs_write_status_t +amduat_asl_store_index_fs_write_replace(const char *temp_dir, + const char *final_path, + const uint8_t *bytes, + size_t size); + +static uint64_t amduat_asl_store_index_fs_load_u64_le(const uint8_t *data); + +static const uint8_t k_amduat_asl_store_index_fs_log_magic[ + AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN] = {'A', 'S', 'L', 'L', + 'O', 'G', '0', '1'}; + static bool amduat_asl_store_index_fs_ensure_directory(const char *path) { struct stat st; @@ -96,6 +114,201 @@ static bool amduat_asl_store_index_fs_fsync_directory(const char *path) { return true; } +static void amduat_asl_store_index_fs_log_store_u32_le(uint8_t *out, + uint32_t value) { + out[0] = (uint8_t)(value & 0xffu); + out[1] = (uint8_t)((value >> 8) & 0xffu); + out[2] = (uint8_t)((value >> 16) & 0xffu); + out[3] = (uint8_t)((value >> 24) & 0xffu); +} + +static void amduat_asl_store_index_fs_log_store_u64_le(uint8_t *out, + uint64_t value) { + out[0] = (uint8_t)(value & 0xffu); + out[1] = (uint8_t)((value >> 8) & 0xffu); + out[2] = (uint8_t)((value >> 16) & 0xffu); + out[3] = (uint8_t)((value >> 24) & 0xffu); + out[4] = (uint8_t)((value >> 32) & 0xffu); + out[5] = (uint8_t)((value >> 40) & 0xffu); + out[6] = (uint8_t)((value >> 48) & 0xffu); + out[7] = (uint8_t)((value >> 56) & 0xffu); +} + +static uint32_t amduat_asl_store_index_fs_log_load_u32_le( + const uint8_t *data) { + return (uint32_t)data[0] | ((uint32_t)data[1] << 8) | + ((uint32_t)data[2] << 16) | ((uint32_t)data[3] << 24); +} + +static bool amduat_asl_store_index_fs_log_hash_record( + const uint8_t prev_hash[AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN], + uint64_t logseq, + uint32_t record_type, + uint32_t payload_len, + const uint8_t *payload, + uint8_t out_hash[AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN]) { + amduat_hash_asl1_stream_t stream; + uint8_t logseq_bytes[8]; + uint8_t type_bytes[4]; + uint8_t payload_len_bytes[4]; + bool ok; + + if (payload_len != 0u && payload == NULL) { + return false; + } + + if (!amduat_hash_asl1_stream_init(AMDUAT_HASH_ASL1_ID_SHA256, &stream)) { + return false; + } + + amduat_asl_store_index_fs_log_store_u64_le(logseq_bytes, logseq); + amduat_asl_store_index_fs_log_store_u32_le(type_bytes, record_type); + amduat_asl_store_index_fs_log_store_u32_le(payload_len_bytes, payload_len); + + ok = amduat_hash_asl1_stream_update( + &stream, + amduat_octets(prev_hash, AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN)) && + amduat_hash_asl1_stream_update( + &stream, amduat_octets(logseq_bytes, sizeof(logseq_bytes))) && + amduat_hash_asl1_stream_update( + &stream, amduat_octets(type_bytes, sizeof(type_bytes))) && + amduat_hash_asl1_stream_update( + &stream, amduat_octets(payload_len_bytes, + sizeof(payload_len_bytes))); + + if (ok && payload_len != 0u) { + ok = amduat_hash_asl1_stream_update( + &stream, amduat_octets(payload, payload_len)); + } + + if (ok) { + ok = amduat_hash_asl1_stream_final( + &stream, out_hash, AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN); + } + + amduat_hash_asl1_stream_destroy(&stream); + return ok; +} + +static bool amduat_asl_store_index_fs_log_read_exact(int fd, + uint8_t *buf, + size_t len) { + size_t offset = 0u; + + while (offset < len) { + ssize_t rc = read(fd, buf + offset, len - offset); + if (rc < 0) { + if (errno == EINTR) { + continue; + } + return false; + } + if (rc == 0) { + return false; + } + offset += (size_t)rc; + } + return true; +} + +static int amduat_asl_store_index_fs_log_read_exact_eof(int fd, + uint8_t *buf, + size_t len) { + size_t offset = 0u; + + while (offset < len) { + ssize_t rc = read(fd, buf + offset, len - offset); + if (rc < 0) { + if (errno == EINTR) { + continue; + } + return -1; + } + if (rc == 0) { + return offset == 0u ? 0 : -1; + } + offset += (size_t)rc; + } + return 1; +} + +static bool amduat_asl_store_index_fs_log_write_exact(int fd, + const uint8_t *buf, + size_t len) { + size_t offset = 0u; + + while (offset < len) { + ssize_t rc = write(fd, buf + offset, len - offset); + if (rc < 0) { + if (errno == EINTR) { + continue; + } + return false; + } + offset += (size_t)rc; + } + return true; +} + +static bool amduat_asl_store_index_fs_log_read_header_bytes( + int fd, + uint8_t **out_bytes, + size_t *out_size) { + uint8_t header_prefix[AMDUAT_ASL_STORE_INDEX_FS_LOG_HEADER_LEN]; + uint32_t version; + uint32_t header_size; + uint64_t flags; + size_t remaining; + uint8_t *buffer; + + if (out_bytes == NULL || out_size == NULL) { + return false; + } + *out_bytes = NULL; + *out_size = 0u; + + if (!amduat_asl_store_index_fs_log_read_exact(fd, + header_prefix, + sizeof(header_prefix))) { + return false; + } + + if (memcmp(header_prefix, + k_amduat_asl_store_index_fs_log_magic, + AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN) != 0) { + return false; + } + version = amduat_asl_store_index_fs_log_load_u32_le( + header_prefix + AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN); + header_size = amduat_asl_store_index_fs_log_load_u32_le( + header_prefix + AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN + 4u); + flags = amduat_asl_store_index_fs_load_u64_le( + header_prefix + AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN + 8u); + + if (version != AMDUAT_ASL_STORE_INDEX_FS_LOG_VERSION || + flags != 0u || + header_size < AMDUAT_ASL_STORE_INDEX_FS_LOG_HEADER_LEN) { + return false; + } + + buffer = (uint8_t *)malloc(header_size); + if (buffer == NULL) { + return false; + } + memcpy(buffer, header_prefix, sizeof(header_prefix)); + remaining = header_size - sizeof(header_prefix); + if (remaining != 0u && + !amduat_asl_store_index_fs_log_read_exact( + fd, buffer + sizeof(header_prefix), remaining)) { + free(buffer); + return false; + } + + *out_bytes = buffer; + *out_size = header_size; + return true; +} + static amduat_asl_store_index_fs_read_status_t amduat_asl_store_index_fs_read_file(const char *path, uint8_t **out_bytes, @@ -362,31 +575,146 @@ static void amduat_asl_store_index_fs_log_free( amduat_enc_asl_log_free(records, record_count); } +static bool amduat_asl_store_index_fs_build_segments_path( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, + char **out_path) { + if (fs == NULL || out_path == NULL) { + return false; + } + if (fs->shard_count <= 1u) { + return amduat_asl_store_index_fs_layout_build_segments_path(fs->root_path, + out_path); + } + return amduat_asl_store_index_fs_layout_build_shard_segments_path( + fs->root_path, shard_id, out_path); +} + +static bool amduat_asl_store_index_fs_build_blocks_path( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, + char **out_path) { + if (fs == NULL || out_path == NULL) { + return false; + } + if (fs->shard_count <= 1u) { + return amduat_asl_store_index_fs_layout_build_blocks_path(fs->root_path, + out_path); + } + return amduat_asl_store_index_fs_layout_build_shard_blocks_path( + fs->root_path, shard_id, out_path); +} + +static bool amduat_asl_store_index_fs_build_segment_meta_path( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, + char **out_path) { + if (fs == NULL || out_path == NULL) { + return false; + } + if (fs->shard_count <= 1u) { + return amduat_asl_store_index_fs_layout_build_segment_meta_path( + fs->root_path, out_path); + } + return amduat_asl_store_index_fs_layout_build_shard_segment_meta_path( + fs->root_path, shard_id, out_path); +} + +static bool amduat_asl_store_index_fs_build_segment_path( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, + uint64_t segment_id, + char **out_path) { + if (fs == NULL || out_path == NULL) { + return false; + } + if (fs->shard_count <= 1u) { + return amduat_asl_store_index_fs_layout_build_segment_path(fs->root_path, + segment_id, + out_path); + } + return amduat_asl_store_index_fs_layout_build_shard_segment_path( + fs->root_path, shard_id, segment_id, out_path); +} + +static bool amduat_asl_store_index_fs_build_block_path( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, + uint64_t block_id, + char **out_path) { + if (fs == NULL || out_path == NULL) { + return false; + } + if (fs->shard_count <= 1u) { + return amduat_asl_store_index_fs_layout_build_block_path(fs->root_path, + block_id, + out_path); + } + return amduat_asl_store_index_fs_layout_build_shard_block_path( + fs->root_path, shard_id, block_id, out_path); +} + static bool amduat_asl_store_index_fs_prepare_dirs( - const char *root_path, + amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, char **out_index_path, char **out_segments_path, char **out_blocks_path) { - if (!amduat_asl_store_index_fs_layout_build_index_path(root_path, - out_index_path)) { + if (fs == NULL || out_index_path == NULL || out_segments_path == NULL || + out_blocks_path == NULL) { return false; } - if (!amduat_asl_store_index_fs_layout_build_segments_path(root_path, - out_segments_path)) { + if (!amduat_asl_store_index_fs_layout_build_index_path(fs->root_path, + out_index_path)) { + return false; + } + if (!amduat_asl_store_index_fs_ensure_directory(*out_index_path)) { free(*out_index_path); *out_index_path = NULL; return false; } - if (!amduat_asl_store_index_fs_layout_build_blocks_path(root_path, - out_blocks_path)) { + if (fs->shard_count > 1u) { + char *shards_path = NULL; + char *shard_path = NULL; + if (!amduat_asl_store_index_fs_layout_build_shards_path(fs->root_path, + &shards_path) || + !amduat_asl_store_index_fs_layout_build_shard_path(fs->root_path, + shard_id, + &shard_path)) { + free(*out_index_path); + *out_index_path = NULL; + free(shards_path); + free(shard_path); + return false; + } + if (!amduat_asl_store_index_fs_ensure_directory(shards_path) || + !amduat_asl_store_index_fs_ensure_directory(shard_path)) { + free(*out_index_path); + *out_index_path = NULL; + free(shards_path); + free(shard_path); + return false; + } + free(shards_path); + free(shard_path); + } + if (!amduat_asl_store_index_fs_build_segments_path(fs, + shard_id, + out_segments_path)) { + free(*out_index_path); + *out_index_path = NULL; + return false; + } + if (!amduat_asl_store_index_fs_build_blocks_path(fs, + shard_id, + out_blocks_path)) { free(*out_index_path); *out_index_path = NULL; free(*out_segments_path); *out_segments_path = NULL; return false; } - if (!amduat_asl_store_index_fs_ensure_directory(*out_index_path) || - !amduat_asl_store_index_fs_ensure_directory(*out_segments_path) || + if (!amduat_asl_store_index_fs_ensure_directory(*out_segments_path) || !amduat_asl_store_index_fs_ensure_directory(*out_blocks_path)) { return false; } @@ -455,6 +783,526 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_write_log( return AMDUAT_ASL_STORE_ERR_IO; } +static amduat_asl_store_error_t amduat_asl_store_index_fs_stream_log_apply( + const char *log_path, + amduat_asl_replay_state_t *replay_state, + amduat_asl_log_position_t *out_last_logseq, + uint8_t out_last_hash[AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN]) { + int fd; + uint8_t *header_bytes = NULL; + size_t header_size = 0u; + uint8_t prev_hash[AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN]; + uint64_t last_logseq = 0u; + + if (replay_state == NULL || out_last_logseq == NULL || + out_last_hash == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + + memset(prev_hash, 0, sizeof(prev_hash)); + fd = open(log_path, O_RDONLY); + if (fd < 0) { + if (errno == ENOENT || errno == ENOTDIR) { + *out_last_logseq = 0u; + memcpy(out_last_hash, prev_hash, sizeof(prev_hash)); + return AMDUAT_ASL_STORE_OK; + } + return AMDUAT_ASL_STORE_ERR_IO; + } + + if (!amduat_asl_store_index_fs_log_read_header_bytes(fd, + &header_bytes, + &header_size)) { + close(fd); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + free(header_bytes); + header_bytes = NULL; + (void)header_size; + + while (true) { + uint8_t record_header[16]; + uint64_t logseq; + uint32_t record_type; + uint32_t payload_len; + uint8_t *payload = NULL; + uint8_t record_hash[AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN]; + uint8_t expected_hash[AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN]; + int rc; + amduat_asl_log_record_t record; + + rc = amduat_asl_store_index_fs_log_read_exact_eof(fd, + record_header, + sizeof(record_header)); + if (rc == 0) { + break; + } + if (rc < 0) { + close(fd); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + + logseq = amduat_asl_store_index_fs_load_u64_le(record_header); + record_type = + amduat_asl_store_index_fs_log_load_u32_le(record_header + 8u); + payload_len = + amduat_asl_store_index_fs_log_load_u32_le(record_header + 12u); + + if (payload_len != 0u) { + payload = (uint8_t *)malloc(payload_len); + if (payload == NULL) { + close(fd); + return AMDUAT_ASL_STORE_ERR_IO; + } + if (!amduat_asl_store_index_fs_log_read_exact(fd, + payload, + payload_len)) { + free(payload); + close(fd); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + } + + if (!amduat_asl_store_index_fs_log_read_exact(fd, + record_hash, + sizeof(record_hash))) { + free(payload); + close(fd); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + + if (!amduat_asl_store_index_fs_log_hash_record(prev_hash, + logseq, + record_type, + payload_len, + payload, + expected_hash)) { + free(payload); + close(fd); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + if (memcmp(expected_hash, record_hash, sizeof(record_hash)) != 0) { + free(payload); + close(fd); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + + record.logseq = logseq; + record.record_type = record_type; + record.payload = amduat_octets(payload, payload_len); + if (!amduat_asl_replay_apply_log(&record, 1u, logseq, replay_state)) { + free(payload); + close(fd); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + + memcpy(prev_hash, record_hash, sizeof(prev_hash)); + last_logseq = logseq; + free(payload); + } + + close(fd); + *out_last_logseq = last_logseq; + memcpy(out_last_hash, prev_hash, sizeof(prev_hash)); + return AMDUAT_ASL_STORE_OK; +} + +static amduat_asl_store_error_t amduat_asl_store_index_fs_append_anchor_log( + const char *log_path, + const char *log_dir, + amduat_asl_snapshot_id_t snapshot_id, + amduat_asl_log_position_t anchor_logseq, + const uint8_t root_hash[32], + const uint8_t prev_hash[AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN]) { + static const char suffix[] = "tmp.XXXXXX"; + size_t temp_len; + bool need_sep; + size_t template_len; + char *template_path; + int temp_fd; + int src_fd; + uint8_t buffer[4096]; + uint8_t payload[8 + 32]; + uint8_t record_hash[AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN]; + uint8_t header[AMDUAT_ASL_STORE_INDEX_FS_LOG_HEADER_LEN]; + + if (log_path == NULL || log_dir == NULL || root_hash == NULL || + prev_hash == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + + temp_len = strlen(log_dir); + if (temp_len == 0u) { + return AMDUAT_ASL_STORE_ERR_IO; + } + need_sep = log_dir[temp_len - 1u] != '/'; + template_len = temp_len + (need_sep ? 1u : 0u) + sizeof(suffix); + + template_path = (char *)malloc(template_len); + if (template_path == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + if (need_sep) { + snprintf(template_path, template_len, "%s/%s", log_dir, suffix); + } else { + snprintf(template_path, template_len, "%s%s", log_dir, suffix); + } + + temp_fd = mkstemp(template_path); + if (temp_fd < 0) { + free(template_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + + src_fd = open(log_path, O_RDONLY); + if (src_fd < 0) { + if (errno != ENOENT && errno != ENOTDIR) { + close(temp_fd); + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + memcpy(header, + k_amduat_asl_store_index_fs_log_magic, + AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN); + amduat_asl_store_index_fs_log_store_u32_le( + header + AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN, + AMDUAT_ASL_STORE_INDEX_FS_LOG_VERSION); + amduat_asl_store_index_fs_log_store_u32_le( + header + AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN + 4u, + AMDUAT_ASL_STORE_INDEX_FS_LOG_HEADER_LEN); + amduat_asl_store_index_fs_log_store_u64_le( + header + AMDUAT_ASL_STORE_INDEX_FS_LOG_MAGIC_LEN + 8u, 0u); + if (!amduat_asl_store_index_fs_log_write_exact(temp_fd, + header, + sizeof(header))) { + close(temp_fd); + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + } else { + while (true) { + ssize_t rc = read(src_fd, buffer, sizeof(buffer)); + if (rc < 0) { + if (errno == EINTR) { + continue; + } + close(src_fd); + close(temp_fd); + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + if (rc == 0) { + break; + } + if (!amduat_asl_store_index_fs_log_write_exact( + temp_fd, buffer, (size_t)rc)) { + close(src_fd); + close(temp_fd); + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + } + close(src_fd); + } + + amduat_asl_store_index_fs_log_store_u64_le(payload, snapshot_id); + memcpy(payload + 8u, root_hash, 32u); + if (!amduat_asl_store_index_fs_log_hash_record( + prev_hash, + anchor_logseq, + AMDUAT_ASL_LOG_RECORD_SNAPSHOT_ANCHOR, + (uint32_t)sizeof(payload), + payload, + record_hash)) { + close(temp_fd); + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + + { + uint8_t record_header[16]; + amduat_asl_store_index_fs_log_store_u64_le(record_header, anchor_logseq); + amduat_asl_store_index_fs_log_store_u32_le(record_header + 8u, + AMDUAT_ASL_LOG_RECORD_SNAPSHOT_ANCHOR); + amduat_asl_store_index_fs_log_store_u32_le(record_header + 12u, + (uint32_t)sizeof(payload)); + if (!amduat_asl_store_index_fs_log_write_exact( + temp_fd, record_header, sizeof(record_header)) || + !amduat_asl_store_index_fs_log_write_exact( + temp_fd, payload, sizeof(payload)) || + !amduat_asl_store_index_fs_log_write_exact( + temp_fd, record_hash, sizeof(record_hash))) { + close(temp_fd); + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + } + + if (fsync(temp_fd) != 0) { + close(temp_fd); + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + if (close(temp_fd) != 0) { + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + if (rename(template_path, log_path) != 0) { + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + free(template_path); + + if (!amduat_asl_store_index_fs_fsync_directory(log_dir)) { + return AMDUAT_ASL_STORE_ERR_IO; + } + return AMDUAT_ASL_STORE_OK; +} + +static size_t amduat_asl_store_index_fs_find_log_start( + const amduat_asl_log_record_t *records, + size_t record_count, + amduat_asl_log_position_t anchor_logseq) { + size_t i; + + if (records == NULL || record_count == 0u) { + return 0u; + } + for (i = 0u; i < record_count; ++i) { + if (records[i].logseq > anchor_logseq) { + return i; + } + } + return record_count; +} + +static amduat_asl_store_error_t amduat_asl_store_index_fs_truncate_log( + const char *log_path, + const char *log_dir, + amduat_asl_log_position_t anchor_logseq) { + static const char suffix[] = "tmp.XXXXXX"; + size_t temp_len; + bool need_sep; + size_t template_len; + char *template_path; + int src_fd; + int temp_fd; + uint8_t *header_bytes = NULL; + size_t header_size = 0u; + uint8_t src_prev_hash[AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN]; + uint8_t dst_prev_hash[AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN]; + + if (log_path == NULL || log_dir == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + + src_fd = open(log_path, O_RDONLY); + if (src_fd < 0) { + if (errno == ENOENT || errno == ENOTDIR) { + return AMDUAT_ASL_STORE_OK; + } + return AMDUAT_ASL_STORE_ERR_IO; + } + + if (!amduat_asl_store_index_fs_log_read_header_bytes(src_fd, + &header_bytes, + &header_size)) { + close(src_fd); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + + temp_len = strlen(log_dir); + if (temp_len == 0u) { + free(header_bytes); + close(src_fd); + return AMDUAT_ASL_STORE_ERR_IO; + } + need_sep = log_dir[temp_len - 1u] != '/'; + template_len = temp_len + (need_sep ? 1u : 0u) + sizeof(suffix); + + template_path = (char *)malloc(template_len); + if (template_path == NULL) { + free(header_bytes); + close(src_fd); + return AMDUAT_ASL_STORE_ERR_IO; + } + if (need_sep) { + snprintf(template_path, template_len, "%s/%s", log_dir, suffix); + } else { + snprintf(template_path, template_len, "%s%s", log_dir, suffix); + } + + temp_fd = mkstemp(template_path); + if (temp_fd < 0) { + free(template_path); + free(header_bytes); + close(src_fd); + return AMDUAT_ASL_STORE_ERR_IO; + } + + if (!amduat_asl_store_index_fs_log_write_exact(temp_fd, + header_bytes, + header_size)) { + close(src_fd); + close(temp_fd); + unlink(template_path); + free(template_path); + free(header_bytes); + return AMDUAT_ASL_STORE_ERR_IO; + } + free(header_bytes); + memset(src_prev_hash, 0, sizeof(src_prev_hash)); + memset(dst_prev_hash, 0, sizeof(dst_prev_hash)); + + while (true) { + uint8_t record_header[16]; + uint64_t logseq; + uint32_t record_type; + uint32_t payload_len; + uint8_t *payload = NULL; + uint8_t record_hash[AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN]; + uint8_t expected_hash[AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN]; + int rc; + + rc = amduat_asl_store_index_fs_log_read_exact_eof(src_fd, + record_header, + sizeof(record_header)); + if (rc == 0) { + break; + } + if (rc < 0) { + close(src_fd); + close(temp_fd); + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + + logseq = amduat_asl_store_index_fs_load_u64_le(record_header); + record_type = + amduat_asl_store_index_fs_log_load_u32_le(record_header + 8u); + payload_len = + amduat_asl_store_index_fs_log_load_u32_le(record_header + 12u); + + if (payload_len != 0u) { + payload = (uint8_t *)malloc(payload_len); + if (payload == NULL) { + close(src_fd); + close(temp_fd); + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + if (!amduat_asl_store_index_fs_log_read_exact(src_fd, + payload, + payload_len)) { + free(payload); + close(src_fd); + close(temp_fd); + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + } + + if (!amduat_asl_store_index_fs_log_read_exact(src_fd, + record_hash, + sizeof(record_hash))) { + free(payload); + close(src_fd); + close(temp_fd); + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + + if (!amduat_asl_store_index_fs_log_hash_record(src_prev_hash, + logseq, + record_type, + payload_len, + payload, + expected_hash) || + memcmp(expected_hash, record_hash, sizeof(record_hash)) != 0) { + free(payload); + close(src_fd); + close(temp_fd); + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + memcpy(src_prev_hash, record_hash, sizeof(src_prev_hash)); + + if (logseq > anchor_logseq) { + uint8_t new_hash[AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN]; + + if (!amduat_asl_store_index_fs_log_hash_record(dst_prev_hash, + logseq, + record_type, + payload_len, + payload, + new_hash)) { + free(payload); + close(src_fd); + close(temp_fd); + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + + if (!amduat_asl_store_index_fs_log_write_exact( + temp_fd, record_header, sizeof(record_header)) || + (payload_len != 0u && + !amduat_asl_store_index_fs_log_write_exact( + temp_fd, payload, payload_len)) || + !amduat_asl_store_index_fs_log_write_exact( + temp_fd, new_hash, sizeof(new_hash))) { + free(payload); + close(src_fd); + close(temp_fd); + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + memcpy(dst_prev_hash, new_hash, sizeof(dst_prev_hash)); + } + + free(payload); + } + + close(src_fd); + if (fsync(temp_fd) != 0) { + close(temp_fd); + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + if (close(temp_fd) != 0) { + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + if (rename(template_path, log_path) != 0) { + unlink(template_path); + free(template_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + free(template_path); + + if (!amduat_asl_store_index_fs_fsync_directory(log_dir)) { + return AMDUAT_ASL_STORE_ERR_IO; + } + return AMDUAT_ASL_STORE_OK; +} + static bool amduat_asl_store_index_fs_parse_u64(const char *value, uint64_t *out) { char *endptr; @@ -476,6 +1324,131 @@ static bool amduat_asl_store_index_fs_parse_u64(const char *value, return true; } +static bool amduat_asl_store_index_fs_parse_snapshot_filename( + const char *name, + amduat_asl_snapshot_id_t *out_id) { + const char prefix[] = "snap-"; + const char suffix[] = ".bin"; + size_t name_len; + size_t prefix_len; + size_t suffix_len; + size_t hex_len; + char *hex; + char *endptr; + unsigned long long parsed; + + if (name == NULL || out_id == NULL) { + return false; + } + + name_len = strlen(name); + prefix_len = sizeof(prefix) - 1u; + suffix_len = sizeof(suffix) - 1u; + if (name_len <= prefix_len + suffix_len) { + return false; + } + if (strncmp(name, prefix, prefix_len) != 0) { + return false; + } + if (strcmp(name + name_len - suffix_len, suffix) != 0) { + return false; + } + + hex_len = name_len - prefix_len - suffix_len; + hex = (char *)malloc(hex_len + 1u); + if (hex == NULL) { + return false; + } + memcpy(hex, name + prefix_len, hex_len); + hex[hex_len] = '\0'; + + errno = 0; + parsed = strtoull(hex, &endptr, 16); + free(hex); + if (errno != 0 || endptr == NULL || *endptr != '\0') { + return false; + } + if (parsed == 0u || parsed > UINT64_MAX) { + return false; + } + *out_id = (amduat_asl_snapshot_id_t)parsed; + return true; +} + +static bool amduat_asl_store_index_fs_find_latest_snapshot_id( + amduat_asl_store_index_fs_t *fs, + amduat_asl_snapshot_id_t *out_id) { + char *snapshots_path; + DIR *dir; + struct dirent *entry; + amduat_asl_snapshot_id_t latest; + bool found = false; + + if (fs == NULL || out_id == NULL) { + return false; + } + + if (!amduat_asl_store_index_fs_layout_build_snapshots_path(fs->root_path, + &snapshots_path)) { + return false; + } + dir = opendir(snapshots_path); + if (dir == NULL) { + free(snapshots_path); + return false; + } + + latest = 0u; + while ((entry = readdir(dir)) != NULL) { + amduat_asl_snapshot_id_t snapshot_id; + if (!amduat_asl_store_index_fs_parse_snapshot_filename(entry->d_name, + &snapshot_id)) { + continue; + } + if (!found || snapshot_id > latest) { + latest = snapshot_id; + found = true; + } + } + + closedir(dir); + free(snapshots_path); + if (!found) { + return false; + } + *out_id = latest; + return true; +} + +static bool amduat_asl_store_index_fs_load_snapshot_manifest( + const char *root_path, + amduat_asl_snapshot_id_t snapshot_id, + amduat_asl_snapshot_manifest_t *out_manifest, + uint8_t out_hash[32]) { + char *manifest_path; + + if (root_path == NULL || out_manifest == NULL) { + return false; + } + + if (!amduat_asl_store_index_fs_layout_build_snapshot_manifest_path( + root_path, snapshot_id, &manifest_path)) { + return false; + } + if (!amduat_asl_snapshot_manifest_read(manifest_path, out_manifest, + out_hash)) { + free(manifest_path); + return false; + } + free(manifest_path); + + if (out_manifest->snapshot_id != snapshot_id) { + amduat_asl_snapshot_manifest_free(out_manifest); + return false; + } + return true; +} + static uint64_t amduat_asl_store_index_fs_load_u64_le(const uint8_t *data) { return (uint64_t)data[0] | ((uint64_t)data[1] << 8) | ((uint64_t)data[2] << 16) | ((uint64_t)data[3] << 24) | @@ -493,55 +1466,87 @@ static uint64_t amduat_asl_store_index_fs_now_ns(void) { (uint64_t)ts.tv_nsec; } +static uint64_t amduat_asl_store_index_fs_local_id_mask(void) { + return (1ull << AMDUAT_ASL_STORE_INDEX_FS_SHARD_SHIFT) - 1ull; +} + +static uint16_t amduat_asl_store_index_fs_segment_shard(uint64_t segment_id) { + return (uint16_t)(segment_id >> AMDUAT_ASL_STORE_INDEX_FS_SHARD_SHIFT); +} + +static uint64_t amduat_asl_store_index_fs_segment_local(uint64_t segment_id) { + return segment_id & amduat_asl_store_index_fs_local_id_mask(); +} + +static bool amduat_asl_store_index_fs_make_segment_id( + uint16_t shard_id, + uint64_t local_id, + uint64_t *out_id) { + if (out_id == NULL) { + return false; + } + if (local_id > amduat_asl_store_index_fs_local_id_mask()) { + return false; + } + *out_id = ((uint64_t)shard_id << AMDUAT_ASL_STORE_INDEX_FS_SHARD_SHIFT) | + local_id; + return true; +} + +static uint64_t amduat_asl_store_index_fs_fnv1a64(const uint8_t *data, + size_t len, + uint64_t seed) { + size_t i; + uint64_t hash = seed; + + for (i = 0; i < len; ++i) { + hash ^= data[i]; + hash *= 1099511628211ull; + } + return hash; +} + +static uint16_t amduat_asl_store_index_fs_ref_shard( + amduat_reference_t ref, + uint16_t shard_count) { + uint64_t hash; + uint8_t hash_id_bytes[2]; + + if (shard_count == 0u) { + return 0u; + } + + hash_id_bytes[0] = (uint8_t)(ref.hash_id & 0xffu); + hash_id_bytes[1] = (uint8_t)((ref.hash_id >> 8) & 0xffu); + hash = amduat_asl_store_index_fs_fnv1a64(hash_id_bytes, + sizeof(hash_id_bytes), + 14695981039346656037ull); + if (ref.digest.len != 0u && ref.digest.data != NULL) { + hash = amduat_asl_store_index_fs_fnv1a64(ref.digest.data, + ref.digest.len, + hash); + } + return (uint16_t)(hash % shard_count); +} + static bool amduat_asl_store_index_fs_find_next_snapshot_id( amduat_asl_store_index_fs_t *fs, amduat_asl_snapshot_id_t *out_next_id) { - char *log_path; - amduat_asl_log_record_t *records; - size_t record_count; - size_t i; - amduat_asl_snapshot_id_t max_id; - amduat_asl_store_error_t err; + amduat_asl_snapshot_id_t latest; if (fs == NULL || out_next_id == NULL) { return false; } - if (!amduat_asl_store_index_fs_layout_build_log_path(fs->root_path, - &log_path)) { - return false; + if (!amduat_asl_store_index_fs_find_latest_snapshot_id(fs, &latest)) { + *out_next_id = 1u; + return true; } - records = NULL; - record_count = 0u; - err = amduat_asl_store_index_fs_load_log(log_path, &records, &record_count); - free(log_path); - if (err != AMDUAT_ASL_STORE_OK) { - amduat_asl_store_index_fs_log_free(records, record_count); + if (latest == UINT64_MAX) { return false; } - - max_id = 0u; - for (i = 0; i < record_count; ++i) { - const amduat_asl_log_record_t *record = &records[i]; - if (record->record_type != AMDUAT_ASL_LOG_RECORD_SNAPSHOT_ANCHOR) { - continue; - } - if (record->payload.len < 8u || record->payload.data == NULL) { - continue; - } - amduat_asl_snapshot_id_t snapshot_id = - amduat_asl_store_index_fs_load_u64_le(record->payload.data); - if (snapshot_id > max_id) { - max_id = snapshot_id; - } - } - - amduat_asl_store_index_fs_log_free(records, record_count); - if (max_id == UINT64_MAX) { - return false; - } - *out_next_id = max_id + 1u; + *out_next_id = latest + 1u; return true; } @@ -806,7 +1811,8 @@ static void amduat_asl_store_index_fs_maybe_snapshot_size( } static amduat_asl_store_error_t -amduat_asl_store_index_fs_read_next_segment_id(const char *root_path, +amduat_asl_store_index_fs_read_next_segment_id(amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, uint64_t *out_segment_id) { char *meta_path; uint8_t *meta_bytes; @@ -816,13 +1822,14 @@ amduat_asl_store_index_fs_read_next_segment_id(const char *root_path, uint64_t next_id; size_t trimmed_len; - if (out_segment_id == NULL) { + if (fs == NULL || out_segment_id == NULL) { return AMDUAT_ASL_STORE_ERR_IO; } *out_segment_id = 0u; - if (!amduat_asl_store_index_fs_layout_build_segment_meta_path(root_path, - &meta_path)) { + if (!amduat_asl_store_index_fs_build_segment_meta_path(fs, + shard_id, + &meta_path)) { return AMDUAT_ASL_STORE_ERR_IO; } @@ -892,7 +1899,8 @@ amduat_asl_store_index_fs_read_next_segment_id(const char *root_path, } static amduat_asl_store_error_t -amduat_asl_store_index_fs_write_next_segment_id(const char *root_path, +amduat_asl_store_index_fs_write_next_segment_id(amduat_asl_store_index_fs_t *fs, + uint16_t shard_id, uint64_t next_id) { char *meta_path; char *segments_path; @@ -900,16 +1908,21 @@ amduat_asl_store_index_fs_write_next_segment_id(const char *root_path, int len; amduat_asl_store_index_fs_write_status_t status; + if (fs == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } if (next_id == 0u) { return AMDUAT_ASL_STORE_ERR_IO; } - if (!amduat_asl_store_index_fs_layout_build_segment_meta_path(root_path, - &meta_path)) { + if (!amduat_asl_store_index_fs_build_segment_meta_path(fs, + shard_id, + &meta_path)) { return AMDUAT_ASL_STORE_ERR_IO; } - if (!amduat_asl_store_index_fs_layout_build_segments_path(root_path, - &segments_path)) { + if (!amduat_asl_store_index_fs_build_segments_path(fs, + shard_id, + &segments_path)) { free(meta_path); return AMDUAT_ASL_STORE_ERR_IO; } @@ -1080,7 +2093,7 @@ static bool amduat_asl_store_index_fs_build_bloom( } static amduat_asl_store_error_t amduat_asl_store_index_fs_read_extent_bytes( - const char *root_path, + amduat_asl_store_index_fs_t *fs, const amduat_asl_extent_record_t *extent, uint8_t *out_cursor, size_t remaining) { @@ -1088,14 +2101,20 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_read_extent_bytes( uint8_t *block_bytes; size_t block_len; amduat_asl_store_index_fs_read_status_t status; + uint16_t shard_id; - if (extent == NULL || out_cursor == NULL) { + if (fs == NULL || extent == NULL || out_cursor == NULL) { return AMDUAT_ASL_STORE_ERR_IO; } - if (!amduat_asl_store_index_fs_layout_build_block_path(root_path, - extent->block_id, - &block_path)) { + shard_id = amduat_asl_store_index_fs_segment_shard(extent->block_id); + if (fs->shard_count <= 1u) { + shard_id = 0u; + } + if (!amduat_asl_store_index_fs_build_block_path(fs, + shard_id, + extent->block_id, + &block_path)) { return AMDUAT_ASL_STORE_ERR_IO; } @@ -1133,7 +2152,7 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_read_extent_bytes( } static amduat_asl_store_error_t amduat_asl_store_index_fs_materialize_artifact( - const char *root_path, + amduat_asl_store_index_fs_t *fs, const amduat_asl_core_index_segment_t *segment, const amduat_asl_index_record_t *record, amduat_artifact_t *out_artifact) { @@ -1143,7 +2162,7 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_materialize_artifact( size_t i; size_t cursor; - if (segment == NULL || record == NULL || out_artifact == NULL) { + if (fs == NULL || segment == NULL || record == NULL || out_artifact == NULL) { return AMDUAT_ASL_STORE_ERR_IO; } if ((record->flags & AMDUAT_ASL_INDEX_FLAG_TOMBSTONE) != 0) { @@ -1183,7 +2202,7 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_materialize_artifact( const amduat_asl_extent_record_t *extent = &segment->extents[extents_base + i]; amduat_asl_store_error_t err = amduat_asl_store_index_fs_read_extent_bytes( - root_path, extent, artifact_bytes + cursor, artifact_len - cursor); + fs, extent, artifact_bytes + cursor, artifact_len - cursor); if (err != AMDUAT_ASL_STORE_OK) { free(artifact_bytes); return err; @@ -1206,16 +2225,23 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_materialize_artifact( } static amduat_asl_store_error_t amduat_asl_store_index_fs_scan_segments( - const char *root_path, + amduat_asl_store_index_fs_t *fs, const amduat_asl_replay_state_t *replay_state, amduat_reference_t ref, amduat_artifact_t *out_artifact) { size_t i; + uint16_t target_shard; - if (replay_state == NULL || out_artifact == NULL) { + if (fs == NULL || replay_state == NULL || out_artifact == NULL) { return AMDUAT_ASL_STORE_ERR_IO; } + if (fs->shard_count <= 1u) { + target_shard = 0u; + } else { + target_shard = amduat_asl_store_index_fs_ref_shard(ref, fs->shard_count); + } + if (amduat_asl_store_index_fs_is_tombstoned(replay_state, ref)) { return AMDUAT_ASL_STORE_ERR_NOT_FOUND; } @@ -1228,10 +2254,20 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_scan_segments( uint8_t segment_hash[AMDUAT_ASL_STORE_INDEX_FS_SEGMENT_HASH_LEN]; amduat_asl_store_error_t err; size_t r; + uint16_t shard_id = amduat_asl_store_index_fs_segment_shard( + seal->segment_id); - if (!amduat_asl_store_index_fs_layout_build_segment_path(root_path, - seal->segment_id, - &segment_path)) { + if (fs->shard_count <= 1u) { + shard_id = 0u; + } + if (shard_id != target_shard) { + continue; + } + + if (!amduat_asl_store_index_fs_build_segment_path(fs, + shard_id, + seal->segment_id, + &segment_path)) { return AMDUAT_ASL_STORE_ERR_IO; } @@ -1261,7 +2297,7 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_scan_segments( if (!amduat_asl_store_index_fs_record_matches(&segment, record, ref)) { continue; } - err = amduat_asl_store_index_fs_materialize_artifact(root_path, + err = amduat_asl_store_index_fs_materialize_artifact(fs, &segment, record, out_artifact); @@ -1317,6 +2353,8 @@ static bool amduat_asl_store_index_fs_current_state_impl( uint8_t root_hash[32]; amduat_asl_log_position_t anchor_logseq; amduat_asl_replay_state_t replay_state; + amduat_asl_snapshot_manifest_t manifest; + uint8_t manifest_hash[32]; if (ctx == NULL || out_state == NULL) { return false; @@ -1343,14 +2381,25 @@ static bool amduat_asl_store_index_fs_current_state_impl( last_logseq = records[record_count - 1u].logseq; } - if (record_count != 0u && - amduat_asl_store_index_fs_find_latest_anchor(records, - record_count, - last_logseq, - &anchor_index, - &snapshot_id, - root_hash, - &anchor_logseq)) { + out_state->snapshot_id = 0u; + out_state->log_position = last_logseq; + + if (amduat_asl_store_index_fs_find_latest_snapshot_id(fs, &snapshot_id) && + amduat_asl_store_index_fs_load_snapshot_manifest( + fs->root_path, snapshot_id, &manifest, manifest_hash)) { + out_state->snapshot_id = manifest.snapshot_id; + if (manifest.anchor_logseq > out_state->log_position) { + out_state->log_position = manifest.anchor_logseq; + } + amduat_asl_snapshot_manifest_free(&manifest); + } else if (record_count != 0u && + amduat_asl_store_index_fs_find_latest_anchor(records, + record_count, + last_logseq, + &anchor_index, + &snapshot_id, + root_hash, + &anchor_logseq)) { if (!amduat_asl_store_index_fs_load_snapshot_replay(fs->root_path, snapshot_id, root_hash, @@ -1366,10 +2415,10 @@ static bool amduat_asl_store_index_fs_current_state_impl( } amduat_asl_replay_free(&replay_state); out_state->snapshot_id = snapshot_id; - } else { - out_state->snapshot_id = 0u; + if (anchor_logseq > out_state->log_position) { + out_state->log_position = anchor_logseq; + } } - out_state->log_position = last_logseq; amduat_enc_asl_log_free(records, record_count); return true; } @@ -1386,9 +2435,7 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_get_indexed_impl( size_t record_count; amduat_asl_replay_state_t replay_state; amduat_asl_store_error_t err; - size_t anchor_index; amduat_asl_snapshot_id_t snapshot_id; - uint8_t root_hash[32]; amduat_asl_log_position_t anchor_logseq; size_t replay_start; @@ -1430,54 +2477,39 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_get_indexed_impl( replay_start = 0u; if (state.snapshot_id != 0u) { - if (amduat_asl_store_index_fs_find_anchor_for_snapshot( - records, - record_count, - state.snapshot_id, - state.log_position, - &anchor_index, - root_hash, - &anchor_logseq)) { - if (!amduat_asl_store_index_fs_load_snapshot_replay(fs->root_path, - state.snapshot_id, - root_hash, - &replay_state, - &anchor_logseq)) { - amduat_enc_asl_log_free(records, record_count); - return AMDUAT_ASL_STORE_ERR_INTEGRITY; - } - if (replay_state.state.log_position != anchor_logseq) { - amduat_asl_replay_free(&replay_state); - amduat_enc_asl_log_free(records, record_count); - return AMDUAT_ASL_STORE_ERR_INTEGRITY; - } - replay_start = anchor_index + 1u; - } else { - amduat_enc_asl_log_free(records, record_count); - return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; - } - } else if (record_count != 0u && - amduat_asl_store_index_fs_find_latest_anchor(records, - record_count, - state.log_position, - &anchor_index, - &snapshot_id, - root_hash, - &anchor_logseq)) { if (!amduat_asl_store_index_fs_load_snapshot_replay(fs->root_path, - snapshot_id, - root_hash, + state.snapshot_id, + NULL, &replay_state, &anchor_logseq)) { amduat_enc_asl_log_free(records, record_count); return AMDUAT_ASL_STORE_ERR_INTEGRITY; } - if (replay_state.state.log_position != anchor_logseq) { + if (state.log_position < anchor_logseq || + replay_state.state.log_position != anchor_logseq) { amduat_asl_replay_free(&replay_state); amduat_enc_asl_log_free(records, record_count); return AMDUAT_ASL_STORE_ERR_INTEGRITY; } - replay_start = anchor_index + 1u; + replay_start = amduat_asl_store_index_fs_find_log_start(records, + record_count, + anchor_logseq); + } else if (amduat_asl_store_index_fs_find_latest_snapshot_id(fs, + &snapshot_id) && + amduat_asl_store_index_fs_load_snapshot_replay(fs->root_path, + snapshot_id, + NULL, + &replay_state, + &anchor_logseq)) { + if (state.log_position < anchor_logseq || + replay_state.state.log_position != anchor_logseq) { + amduat_asl_replay_free(&replay_state); + amduat_enc_asl_log_free(records, record_count); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + replay_start = amduat_asl_store_index_fs_find_log_start(records, + record_count, + anchor_logseq); } else { if (!amduat_asl_replay_init(&replay_state)) { amduat_enc_asl_log_free(records, record_count); @@ -1494,7 +2526,7 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_get_indexed_impl( return AMDUAT_ASL_STORE_ERR_INTEGRITY; } - err = amduat_asl_store_index_fs_scan_segments(fs->root_path, + err = amduat_asl_store_index_fs_scan_segments(fs, &replay_state, ref, out_artifact); @@ -1521,7 +2553,9 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( char *segment_path; char *block_path; char *log_path; + uint16_t shard_id; uint64_t segment_id; + uint64_t local_segment_id; amduat_asl_core_index_segment_t segment; amduat_asl_index_record_t record; amduat_asl_extent_record_t extent; @@ -1553,6 +2587,12 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( &artifact_bytes)) { return AMDUAT_ASL_STORE_ERR_IO; } + if (fs->shard_count == 0u) { + shard_id = 0u; + } else { + shard_id = amduat_asl_store_index_fs_ref_shard(derived_ref, + fs->shard_count); + } if (!amduat_asl_store_index_fs_current_state_impl(ctx, ¤t_state)) { amduat_reference_free(&derived_ref); @@ -1581,7 +2621,8 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( index_path = NULL; segments_path = NULL; blocks_path = NULL; - if (!amduat_asl_store_index_fs_prepare_dirs(fs->root_path, + if (!amduat_asl_store_index_fs_prepare_dirs(fs, + shard_id, &index_path, &segments_path, &blocks_path)) { @@ -1593,8 +2634,9 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( return AMDUAT_ASL_STORE_ERR_IO; } - err = amduat_asl_store_index_fs_read_next_segment_id(fs->root_path, - &segment_id); + err = amduat_asl_store_index_fs_read_next_segment_id(fs, + shard_id, + &local_segment_id); if (err != AMDUAT_ASL_STORE_OK) { amduat_reference_free(&derived_ref); amduat_octets_free(&artifact_bytes); @@ -1603,7 +2645,7 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( free(blocks_path); return err; } - if (segment_id == UINT64_MAX) { + if (local_segment_id == UINT64_MAX) { amduat_reference_free(&derived_ref); amduat_octets_free(&artifact_bytes); free(index_path); @@ -1612,8 +2654,9 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( return AMDUAT_ASL_STORE_ERR_IO; } - err = amduat_asl_store_index_fs_write_next_segment_id(fs->root_path, - segment_id + 1u); + err = amduat_asl_store_index_fs_write_next_segment_id(fs, + shard_id, + local_segment_id + 1u); if (err != AMDUAT_ASL_STORE_OK) { amduat_reference_free(&derived_ref); amduat_octets_free(&artifact_bytes); @@ -1623,9 +2666,21 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( return err; } - if (!amduat_asl_store_index_fs_layout_build_block_path(fs->root_path, - segment_id, - &block_path)) { + if (!amduat_asl_store_index_fs_make_segment_id(shard_id, + local_segment_id, + &segment_id)) { + amduat_reference_free(&derived_ref); + amduat_octets_free(&artifact_bytes); + free(index_path); + free(segments_path); + free(blocks_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + + if (!amduat_asl_store_index_fs_build_block_path(fs, + shard_id, + segment_id, + &block_path)) { amduat_reference_free(&derived_ref); amduat_octets_free(&artifact_bytes); free(index_path); @@ -1730,9 +2785,10 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_put_indexed_impl( return AMDUAT_ASL_STORE_ERR_IO; } - if (!amduat_asl_store_index_fs_layout_build_segment_path(fs->root_path, - segment_id, - &segment_path)) { + if (!amduat_asl_store_index_fs_build_segment_path(fs, + shard_id, + segment_id, + &segment_path)) { amduat_reference_free(&derived_ref); amduat_octets_free(&artifact_bytes); amduat_octets_free(&segment_bytes); @@ -1918,15 +2974,12 @@ amduat_asl_store_error_t amduat_asl_store_index_fs_snapshot_create( char *snapshots_path; char *log_path; char *manifest_path; - amduat_asl_log_record_t *log_records; - size_t log_count; uint64_t last_logseq; uint64_t new_logseq; amduat_asl_replay_state_t replay_state; amduat_asl_snapshot_manifest_t manifest; uint8_t root_hash[32]; - uint8_t *payload; - size_t payload_len; + uint8_t last_hash[AMDUAT_ASL_STORE_INDEX_FS_LOG_HASH_LEN]; if (fs == NULL) { return AMDUAT_ASL_STORE_ERR_IO; @@ -1987,48 +3040,33 @@ amduat_asl_store_error_t amduat_asl_store_index_fs_snapshot_create( return AMDUAT_ASL_STORE_ERR_IO; } - log_records = NULL; - log_count = 0u; - err = amduat_asl_store_index_fs_load_log(log_path, &log_records, &log_count); + if (!amduat_asl_replay_init(&replay_state)) { + free(index_path); + free(log_path); + free(manifest_path); + return AMDUAT_ASL_STORE_ERR_IO; + } + + err = amduat_asl_store_index_fs_stream_log_apply(log_path, + &replay_state, + &last_logseq, + last_hash); if (err != AMDUAT_ASL_STORE_OK) { - amduat_asl_store_index_fs_log_free(log_records, log_count); + amduat_asl_replay_free(&replay_state); free(index_path); free(log_path); free(manifest_path); return err; } - last_logseq = 0u; - if (log_count != 0u) { - last_logseq = log_records[log_count - 1u].logseq; - } if (last_logseq == UINT64_MAX) { - amduat_asl_store_index_fs_log_free(log_records, log_count); - free(index_path); - free(log_path); - free(manifest_path); - return AMDUAT_ASL_STORE_ERR_IO; - } - new_logseq = (log_count == 0u) ? 1u : last_logseq + 1u; - - if (!amduat_asl_replay_init(&replay_state)) { - amduat_asl_store_index_fs_log_free(log_records, log_count); - free(index_path); - free(log_path); - free(manifest_path); - return AMDUAT_ASL_STORE_ERR_IO; - } - if (!amduat_asl_replay_apply_log(log_records, - log_count, - last_logseq, - &replay_state)) { - amduat_asl_store_index_fs_log_free(log_records, log_count); amduat_asl_replay_free(&replay_state); free(index_path); free(log_path); free(manifest_path); - return AMDUAT_ASL_STORE_ERR_INTEGRITY; + return AMDUAT_ASL_STORE_ERR_IO; } + new_logseq = last_logseq + 1u; memset(&manifest, 0, sizeof(manifest)); manifest.snapshot_id = snapshot_id; @@ -2042,7 +3080,6 @@ amduat_asl_store_error_t amduat_asl_store_index_fs_snapshot_create( if (!amduat_asl_snapshot_manifest_write(manifest_path, &manifest, root_hash)) { - amduat_asl_store_index_fs_log_free(log_records, log_count); amduat_asl_replay_free(&replay_state); free(index_path); free(log_path); @@ -2050,72 +3087,42 @@ amduat_asl_store_error_t amduat_asl_store_index_fs_snapshot_create( return AMDUAT_ASL_STORE_ERR_IO; } - if (log_count == 0u) { - log_records = (amduat_asl_log_record_t *)calloc(1u, - sizeof(*log_records)); - if (log_records == NULL) { - unlink(manifest_path); - amduat_asl_replay_free(&replay_state); - free(index_path); - free(log_path); - free(manifest_path); - return AMDUAT_ASL_STORE_ERR_IO; - } - } else { - amduat_asl_log_record_t *next = (amduat_asl_log_record_t *)realloc( - log_records, (log_count + 1u) * sizeof(*log_records)); - if (next == NULL) { - unlink(manifest_path); - amduat_asl_store_index_fs_log_free(log_records, log_count); - amduat_asl_replay_free(&replay_state); - free(index_path); - free(log_path); - free(manifest_path); - return AMDUAT_ASL_STORE_ERR_IO; - } - log_records = next; - } - - payload_len = 8u + 32u; - payload = (uint8_t *)malloc(payload_len); - if (payload == NULL) { - unlink(manifest_path); - amduat_asl_store_index_fs_log_free(log_records, log_count); - amduat_asl_replay_free(&replay_state); - free(index_path); - free(log_path); - free(manifest_path); - return AMDUAT_ASL_STORE_ERR_IO; - } - payload[0] = (uint8_t)(snapshot_id & 0xffu); - payload[1] = (uint8_t)((snapshot_id >> 8) & 0xffu); - payload[2] = (uint8_t)((snapshot_id >> 16) & 0xffu); - payload[3] = (uint8_t)((snapshot_id >> 24) & 0xffu); - payload[4] = (uint8_t)((snapshot_id >> 32) & 0xffu); - payload[5] = (uint8_t)((snapshot_id >> 40) & 0xffu); - payload[6] = (uint8_t)((snapshot_id >> 48) & 0xffu); - payload[7] = (uint8_t)((snapshot_id >> 56) & 0xffu); - memcpy(payload + 8, root_hash, 32u); - - log_records[log_count].logseq = new_logseq; - log_records[log_count].record_type = AMDUAT_ASL_LOG_RECORD_SNAPSHOT_ANCHOR; - log_records[log_count].payload = amduat_octets(payload, payload_len); - memset(log_records[log_count].record_hash, 0, - sizeof(log_records[log_count].record_hash)); - log_count += 1u; - - err = amduat_asl_store_index_fs_write_log(log_path, index_path, - log_records, log_count); - amduat_asl_store_index_fs_log_free(log_records, log_count); - amduat_asl_replay_free(&replay_state); - free(index_path); - free(log_path); + err = amduat_asl_store_index_fs_append_anchor_log(log_path, + index_path, + snapshot_id, + new_logseq, + root_hash, + last_hash); if (err != AMDUAT_ASL_STORE_OK) { unlink(manifest_path); + amduat_asl_replay_free(&replay_state); + free(index_path); + free(log_path); free(manifest_path); return err; } + { + amduat_asl_snapshot_manifest_t verify_manifest; + uint8_t verify_hash[32]; + bool manifest_ok = amduat_asl_store_index_fs_load_snapshot_manifest( + fs->root_path, snapshot_id, &verify_manifest, verify_hash); + + if (manifest_ok && + verify_manifest.anchor_logseq == new_logseq && + memcmp(verify_hash, root_hash, 32u) == 0) { + (void)amduat_asl_store_index_fs_truncate_log(log_path, + index_path, + new_logseq); + } + if (manifest_ok) { + amduat_asl_snapshot_manifest_free(&verify_manifest); + } + } + + amduat_asl_replay_free(&replay_state); + free(index_path); + free(log_path); if (out_logseq != NULL) { *out_logseq = new_logseq; } @@ -2152,6 +3159,7 @@ bool amduat_asl_store_index_fs_init(amduat_asl_store_index_fs_t *fs, AMDUAT_ASL_STORE_INDEX_FS_DEFAULT_PENDING_BYTES; fs->snapshot_policy.idle_time_ns = AMDUAT_ASL_STORE_INDEX_FS_DEFAULT_IDLE_NS; + fs->shard_count = 1u; fs->pending_snapshot_bytes = 0u; fs->last_ingest_time_ns = 0u; fs->next_snapshot_id = 0u; @@ -2168,6 +3176,18 @@ void amduat_asl_store_index_fs_set_snapshot_policy( fs->snapshot_policy = policy; } +void amduat_asl_store_index_fs_set_shard_count( + amduat_asl_store_index_fs_t *fs, + uint16_t shard_count) { + if (fs == NULL) { + return; + } + if (shard_count == 0u) { + shard_count = 1u; + } + fs->shard_count = shard_count; +} + amduat_asl_store_ops_t amduat_asl_store_index_fs_ops(void) { amduat_asl_store_ops_t ops; diff --git a/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.c b/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.c index 3f357a3..e6716d6 100644 --- a/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.c +++ b/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.c @@ -121,6 +121,155 @@ bool amduat_asl_store_index_fs_layout_build_log_path(const char *root_path, return ok; } +bool amduat_asl_store_index_fs_layout_build_shards_path( + const char *root_path, + char **out_path) { + char *index_path; + bool ok; + + if (!amduat_asl_store_index_fs_layout_build_index_path(root_path, + &index_path)) { + return false; + } + ok = amduat_asl_store_index_fs_layout_join(index_path, "shards", out_path); + free(index_path); + return ok; +} + +bool amduat_asl_store_index_fs_layout_build_shard_path( + const char *root_path, + uint16_t shard_id, + char **out_path) { + char *shards_path; + char *name; + bool ok; + + if (!amduat_asl_store_index_fs_layout_build_shards_path(root_path, + &shards_path)) { + return false; + } + if (!amduat_asl_store_index_fs_layout_format_id("shard-", + (uint64_t)shard_id, + "", + &name)) { + free(shards_path); + return false; + } + ok = amduat_asl_store_index_fs_layout_join(shards_path, name, out_path); + free(name); + free(shards_path); + return ok; +} + +bool amduat_asl_store_index_fs_layout_build_shard_segments_path( + const char *root_path, + uint16_t shard_id, + char **out_path) { + char *shard_path; + bool ok; + + if (!amduat_asl_store_index_fs_layout_build_shard_path(root_path, + shard_id, + &shard_path)) { + return false; + } + ok = amduat_asl_store_index_fs_layout_join(shard_path, "segments", out_path); + free(shard_path); + return ok; +} + +bool amduat_asl_store_index_fs_layout_build_shard_blocks_path( + const char *root_path, + uint16_t shard_id, + char **out_path) { + char *shard_path; + bool ok; + + if (!amduat_asl_store_index_fs_layout_build_shard_path(root_path, + shard_id, + &shard_path)) { + return false; + } + ok = amduat_asl_store_index_fs_layout_join(shard_path, "blocks", out_path); + free(shard_path); + return ok; +} + +bool amduat_asl_store_index_fs_layout_build_shard_segment_meta_path( + const char *root_path, + uint16_t shard_id, + char **out_path) { + char *segments_path; + bool ok; + + if (!amduat_asl_store_index_fs_layout_build_shard_segments_path(root_path, + shard_id, + &segments_path)) { + return false; + } + ok = amduat_asl_store_index_fs_layout_join(segments_path, + "next_id", + out_path); + free(segments_path); + return ok; +} + +bool amduat_asl_store_index_fs_layout_build_shard_segment_path( + const char *root_path, + uint16_t shard_id, + uint64_t segment_id, + char **out_path) { + char *segments_path; + char *name; + bool ok; + + if (!amduat_asl_store_index_fs_layout_build_shard_segments_path(root_path, + shard_id, + &segments_path)) { + return false; + } + if (!amduat_asl_store_index_fs_layout_format_id("segment-", + segment_id, + ".asl", + &name)) { + free(segments_path); + return false; + } + + ok = amduat_asl_store_index_fs_layout_join(segments_path, name, out_path); + free(name); + free(segments_path); + return ok; +} + +bool amduat_asl_store_index_fs_layout_build_shard_block_path( + const char *root_path, + uint16_t shard_id, + uint64_t block_id, + char **out_path) { + char *blocks_path; + char *name; + bool ok; + + if (!amduat_asl_store_index_fs_layout_build_shard_blocks_path(root_path, + shard_id, + &blocks_path)) { + return false; + } + if (!amduat_asl_store_index_fs_layout_format_id("block-", + block_id, + ".asl", + &name)) { + free(blocks_path); + return false; + } + + ok = amduat_asl_store_index_fs_layout_join(blocks_path, name, out_path); + free(name); + free(blocks_path); + return ok; +} + bool amduat_asl_store_index_fs_layout_build_snapshots_path( const char *root_path, char **out_path) { diff --git a/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.h b/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.h index c791ad5..4bddb7b 100644 --- a/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.h +++ b/src/adapters/asl_store_index_fs/asl_store_index_fs_layout.h @@ -21,6 +21,42 @@ bool amduat_asl_store_index_fs_layout_build_blocks_path(const char *root_path, bool amduat_asl_store_index_fs_layout_build_log_path(const char *root_path, char **out_path); +bool amduat_asl_store_index_fs_layout_build_shards_path( + const char *root_path, + char **out_path); + +bool amduat_asl_store_index_fs_layout_build_shard_path( + const char *root_path, + uint16_t shard_id, + char **out_path); + +bool amduat_asl_store_index_fs_layout_build_shard_segments_path( + const char *root_path, + uint16_t shard_id, + char **out_path); + +bool amduat_asl_store_index_fs_layout_build_shard_blocks_path( + const char *root_path, + uint16_t shard_id, + char **out_path); + +bool amduat_asl_store_index_fs_layout_build_shard_segment_meta_path( + const char *root_path, + uint16_t shard_id, + char **out_path); + +bool amduat_asl_store_index_fs_layout_build_shard_segment_path( + const char *root_path, + uint16_t shard_id, + uint64_t segment_id, + char **out_path); + +bool amduat_asl_store_index_fs_layout_build_shard_block_path( + const char *root_path, + uint16_t shard_id, + uint64_t block_id, + char **out_path); + bool amduat_asl_store_index_fs_layout_build_snapshots_path( const char *root_path, char **out_path); diff --git a/src/tools/amduat_asl_cli.c b/src/tools/amduat_asl_cli.c index dde1311..4f059ee 100644 --- a/src/tools/amduat_asl_cli.c +++ b/src/tools/amduat_asl_cli.c @@ -42,6 +42,8 @@ typedef struct { const char *store_id; const char *profile; const char *hash; + uint16_t shard_count; + bool has_shard_count; bool force; bool quiet; } amduat_asl_cli_init_opts_t; @@ -97,7 +99,8 @@ static void amduat_asl_cli_print_usage(FILE *stream) { " amduat-asl log inspect [--root PATH]\n" " amduat-asl index init [--root PATH] [--store-id ID]\n" " [--profile PROFILE_ID|name]\n" - " [--hash HASH_ID|name] [--force] [--quiet]\n" + " [--hash HASH_ID|name] [--shards N]\n" + " [--force] [--quiet]\n" " amduat-asl index state [--root PATH]\n" " amduat-asl segment verify [--root PATH] [--segment ID]\n" "\n" @@ -274,6 +277,19 @@ static bool amduat_asl_cli_build_blocks_path(const char *root_path, return ok; } +static bool amduat_asl_cli_build_index_config_path(const char *root_path, + char **out_path) { + char *index_path; + bool ok; + + if (!amduat_asl_cli_build_index_path(root_path, &index_path)) { + return false; + } + ok = amduat_asl_cli_join_path(index_path, "config", out_path); + free(index_path); + return ok; +} + static bool amduat_asl_cli_build_segment_path(const char *root_path, uint64_t segment_id, char **out_path) { @@ -326,9 +342,134 @@ static bool amduat_asl_cli_is_index_store_root(const char *root_path) { return ok; } +static bool amduat_asl_cli_parse_u16(const char *value, uint16_t *out) { + char *endptr; + unsigned long parsed; + + if (value == NULL || out == NULL) { + return false; + } + parsed = strtoul(value, &endptr, 10); + if (endptr == value || *endptr != '\0') { + return false; + } + if (parsed == 0 || parsed > UINT16_MAX) { + return false; + } + *out = (uint16_t)parsed; + return true; +} + +static bool amduat_asl_cli_read_index_shards(const char *root_path, + uint16_t *out_shards) { + char *config_path; + uint8_t *bytes; + size_t len; + char *text; + char *token; + bool ok; + uint16_t shards; + + if (out_shards == NULL) { + return false; + } + *out_shards = 1u; + + if (!amduat_asl_cli_build_index_config_path(root_path, &config_path)) { + return false; + } + if (!amduat_asl_cli_path_is_file(config_path)) { + free(config_path); + return true; + } + bytes = NULL; + len = 0u; + if (!amduat_asl_read_path(config_path, &bytes, &len)) { + free(config_path); + return false; + } + free(config_path); + + text = (char *)malloc(len + 1u); + if (text == NULL) { + free(bytes); + return false; + } + memcpy(text, bytes, len); + text[len] = '\0'; + free(bytes); + + token = strtok(text, " \t\r\n"); + if (token == NULL || strcmp(token, "amduat-asl-index-v1") != 0) { + free(text); + return false; + } + + ok = false; + shards = 1u; + while ((token = strtok(NULL, " \t\r\n")) != NULL) { + if (strncmp(token, "shards=", 7u) == 0) { + if (!amduat_asl_cli_parse_u16(token + 7u, &shards)) { + free(text); + return false; + } + ok = true; + } + } + + free(text); + if (!ok) { + return false; + } + *out_shards = shards; + return true; +} + +static bool amduat_asl_cli_write_index_config(const char *root_path, + uint16_t shard_count, + bool force) { + char *config_path; + char buffer[128]; + int len; + uint16_t existing_shards; + + if (root_path == NULL || shard_count == 0u) { + return false; + } + if (!amduat_asl_cli_build_index_config_path(root_path, &config_path)) { + return false; + } + + if (!force && amduat_asl_cli_path_is_file(config_path)) { + if (!amduat_asl_cli_read_index_shards(root_path, &existing_shards)) { + free(config_path); + return false; + } + free(config_path); + return existing_shards == shard_count; + } + + len = snprintf(buffer, + sizeof(buffer), + "amduat-asl-index-v1\nshards=%" PRIu16 "\n", + shard_count); + if (len <= 0 || (size_t)len >= sizeof(buffer)) { + free(config_path); + return false; + } + if (!amduat_asl_write_path(config_path, (const uint8_t *)buffer, + (size_t)len)) { + free(config_path); + return false; + } + free(config_path); + return true; +} + static bool amduat_asl_cli_open_store(const char *root_path, amduat_asl_cli_store_ctx_t *out_ctx) { bool use_index; + uint16_t shard_count; if (root_path == NULL || out_ctx == NULL) { return false; @@ -342,11 +483,15 @@ static bool amduat_asl_cli_open_store(const char *root_path, use_index = amduat_asl_cli_is_index_store_root(root_path); out_ctx->is_index = use_index; if (use_index) { + if (!amduat_asl_cli_read_index_shards(root_path, &shard_count)) { + return false; + } if (!amduat_asl_store_index_fs_init(&out_ctx->index_fs, out_ctx->cfg.config, root_path)) { return false; } + amduat_asl_store_index_fs_set_shard_count(&out_ctx->index_fs, shard_count); amduat_asl_store_init(&out_ctx->store, out_ctx->cfg.config, amduat_asl_store_index_fs_ops(), @@ -1169,6 +1314,7 @@ static int amduat_asl_cli_cmd_index(int argc, char **argv) { if (strcmp(argv[0], "init") == 0) { memset(&opts, 0, sizeof(opts)); opts.root = AMDUAT_ASL_CLI_DEFAULT_ROOT; + opts.shard_count = 1u; for (i = 1; i < (size_t)argc; ++i) { if (strcmp(argv[i], "--root") == 0) { @@ -1195,6 +1341,16 @@ static int amduat_asl_cli_cmd_index(int argc, char **argv) { return AMDUAT_ASL_CLI_EXIT_USAGE; } opts.hash = argv[++i]; + } else if (strcmp(argv[i], "--shards") == 0) { + if (i + 1 >= (size_t)argc) { + fprintf(stderr, "error: --shards requires a value\n"); + return AMDUAT_ASL_CLI_EXIT_USAGE; + } + if (!amduat_asl_cli_parse_u16(argv[++i], &opts.shard_count)) { + fprintf(stderr, "error: invalid shard count\n"); + return AMDUAT_ASL_CLI_EXIT_USAGE; + } + opts.has_shard_count = true; } else if (strcmp(argv[i], "--force") == 0) { opts.force = true; } else if (strcmp(argv[i], "--quiet") == 0) { @@ -1277,6 +1433,13 @@ static int amduat_asl_cli_cmd_index(int argc, char **argv) { } } + if (!amduat_asl_cli_write_index_config(root, + opts.shard_count, + opts.force)) { + fprintf(stderr, "error: failed to write index config\n"); + return AMDUAT_ASL_CLI_EXIT_IO; + } + if (!amduat_asl_cli_write_empty_log(root)) { fprintf(stderr, "error: failed to initialize log file\n"); return AMDUAT_ASL_CLI_EXIT_IO; @@ -1319,6 +1482,14 @@ static int amduat_asl_cli_cmd_index(int argc, char **argv) { fprintf(stderr, "error: failed to initialize index store\n"); return AMDUAT_ASL_CLI_EXIT_CONFIG; } + { + uint16_t shard_count = 1u; + if (!amduat_asl_cli_read_index_shards(root, &shard_count)) { + fprintf(stderr, "error: failed to load index config\n"); + return AMDUAT_ASL_CLI_EXIT_CONFIG; + } + amduat_asl_store_index_fs_set_shard_count(&index_fs, shard_count); + } amduat_asl_store_init(&store, cfg.config, amduat_asl_store_index_fs_ops(), &index_fs); diff --git a/tests/asl/test_asl_store_index_fs.c b/tests/asl/test_asl_store_index_fs.c index 23a5b05..7850fd4 100644 --- a/tests/asl/test_asl_store_index_fs.c +++ b/tests/asl/test_asl_store_index_fs.c @@ -1,5 +1,6 @@ #include "amduat/asl/asl_store_index_fs.h" #include "amduat/asl/index_bloom.h" +#include "amduat/asl/ref_derive.h" #include "amduat/asl/store.h" #include "amduat/enc/asl1_core.h" #include "amduat/enc/asl_core_index.h" @@ -148,6 +149,22 @@ static bool read_file(const char *path, uint8_t **out_bytes, size_t *out_len) { return true; } +static bool build_log_path(const char *root, char **out_path) { + char *index_path = NULL; + bool ok; + + if (root == NULL || out_path == NULL) { + return false; + } + + if (!join_path(root, "index", &index_path)) { + return false; + } + ok = join_path(index_path, "log.asl", out_path); + free(index_path); + return ok; +} + static bool build_segment_path(const char *root, uint64_t segment_id, char **out_path) { @@ -180,6 +197,71 @@ static bool build_segment_path(const char *root, return true; } +static bool build_shard_segment_path(const char *root, + uint16_t shard_id, + uint64_t segment_id, + char **out_path) { + int needed; + char *buffer; + + if (root == NULL || out_path == NULL) { + return false; + } + + needed = snprintf(NULL, + 0, + "%s/index/shards/shard-%016" PRIx64 + "/segments/segment-%016" PRIx64 ".asl", + root, + (uint64_t)shard_id, + segment_id); + if (needed <= 0) { + return false; + } + + buffer = (char *)malloc((size_t)needed + 1u); + if (buffer == NULL) { + return false; + } + snprintf(buffer, + (size_t)needed + 1u, + "%s/index/shards/shard-%016" PRIx64 + "/segments/segment-%016" PRIx64 ".asl", + root, + (uint64_t)shard_id, + segment_id); + *out_path = buffer; + return true; +} + +static uint64_t fnv1a64(const uint8_t *data, size_t len, uint64_t seed) { + size_t i; + uint64_t hash = seed; + + for (i = 0; i < len; ++i) { + hash ^= data[i]; + hash *= 1099511628211ull; + } + return hash; +} + +static uint16_t ref_shard(amduat_reference_t ref, uint16_t shard_count) { + uint64_t hash; + uint8_t hash_id_bytes[2]; + + if (shard_count == 0u) { + return 0u; + } + hash_id_bytes[0] = (uint8_t)(ref.hash_id & 0xffu); + hash_id_bytes[1] = (uint8_t)((ref.hash_id >> 8) & 0xffu); + hash = fnv1a64(hash_id_bytes, sizeof(hash_id_bytes), + 14695981039346656037ull); + if (ref.digest.len != 0u && ref.digest.data != NULL) { + hash = fnv1a64(ref.digest.data, ref.digest.len, hash); + } + return (uint16_t)(hash % shard_count); +} + static char *make_temp_root(void) { char *templ; const char template_prefix[] = "/tmp/amduat_test_asl_store_index_fs_XXXXXX"; @@ -334,6 +416,264 @@ cleanup: return exit_code; } -int main(void) { - return test_round_trip(); +static int test_shard_routing(void) { + amduat_asl_store_config_t config; + amduat_asl_store_index_fs_t fs; + amduat_asl_store_t store; + char *root; + uint8_t payload_a[2] = {0x10, 0x11}; + uint8_t payload_b[2] = {0x20, 0x21}; + amduat_reference_t ref_a; + amduat_reference_t ref_b; + amduat_octets_t bytes_a; + amduat_octets_t bytes_b; + amduat_hash_id_t hash_id; + uint16_t shard_a; + uint16_t shard_b; + uint64_t local_counts[2] = {0u, 0u}; + int exit_code = 1; + size_t i; + bool found = false; + + root = make_temp_root(); + if (root == NULL) { + fprintf(stderr, "temp root failed\n"); + return 1; + } + + memset(&config, 0, sizeof(config)); + config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; + config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; + if (!amduat_asl_store_index_fs_init(&fs, config, root)) { + fprintf(stderr, "index fs init failed\n"); + goto cleanup; + } + amduat_asl_store_index_fs_set_shard_count(&fs, 2); + amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs); + + hash_id = config.hash_id; + ref_a = amduat_reference(0u, amduat_octets(NULL, 0u)); + ref_b = amduat_reference(0u, amduat_octets(NULL, 0u)); + bytes_a = amduat_octets(NULL, 0u); + bytes_b = amduat_octets(NULL, 0u); + + for (i = 0; i < 256; ++i) { + payload_a[0] = (uint8_t)i; + payload_b[0] = (uint8_t)(255u - i); + amduat_reference_free(&ref_a); + amduat_reference_free(&ref_b); + amduat_octets_free(&bytes_a); + amduat_octets_free(&bytes_b); + if (!amduat_asl_ref_derive(amduat_artifact(amduat_octets(payload_a, + sizeof(payload_a))), + AMDUAT_ENC_ASL1_CORE_V1, + hash_id, + &ref_a, + &bytes_a) || + !amduat_asl_ref_derive(amduat_artifact(amduat_octets(payload_b, + sizeof(payload_b))), + AMDUAT_ENC_ASL1_CORE_V1, + hash_id, + &ref_b, + &bytes_b)) { + fprintf(stderr, "ref derive failed\n"); + goto cleanup; + } + shard_a = ref_shard(ref_a, 2); + shard_b = ref_shard(ref_b, 2); + if (shard_a != shard_b) { + found = true; + break; + } + } + + if (!found) { + fprintf(stderr, "shard selection failed\n"); + goto cleanup; + } + + { + amduat_reference_t stored_ref; + amduat_asl_index_state_t state; + uint64_t segment_id; + char *segment_path = NULL; + amduat_asl_store_error_t err; + + stored_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); + err = amduat_asl_store_put_indexed(&store, + amduat_artifact(amduat_octets(payload_a, + sizeof(payload_a))), + &stored_ref, + &state); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "put shard a failed\n"); + amduat_reference_free(&stored_ref); + goto cleanup; + } + amduat_reference_free(&stored_ref); + local_counts[shard_a] += 1u; + segment_id = ((uint64_t)shard_a << 48) | local_counts[shard_a]; + if (!build_shard_segment_path(root, shard_a, segment_id, &segment_path)) { + fprintf(stderr, "segment path failed\n"); + goto cleanup; + } + if (access(segment_path, F_OK) != 0) { + fprintf(stderr, "segment missing in shard a\n"); + free(segment_path); + goto cleanup; + } + free(segment_path); + } + + { + amduat_reference_t stored_ref; + amduat_asl_index_state_t state; + uint64_t segment_id; + char *segment_path = NULL; + amduat_asl_store_error_t err; + + stored_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); + err = amduat_asl_store_put_indexed(&store, + amduat_artifact(amduat_octets(payload_b, + sizeof(payload_b))), + &stored_ref, + &state); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "put shard b failed\n"); + amduat_reference_free(&stored_ref); + goto cleanup; + } + amduat_reference_free(&stored_ref); + local_counts[shard_b] += 1u; + segment_id = ((uint64_t)shard_b << 48) | local_counts[shard_b]; + if (!build_shard_segment_path(root, shard_b, segment_id, &segment_path)) { + fprintf(stderr, "segment path failed\n"); + goto cleanup; + } + if (access(segment_path, F_OK) != 0) { + fprintf(stderr, "segment missing in shard b\n"); + free(segment_path); + goto cleanup; + } + free(segment_path); + } + + exit_code = 0; + +cleanup: + amduat_reference_free(&ref_a); + amduat_reference_free(&ref_b); + amduat_octets_free(&bytes_a); + amduat_octets_free(&bytes_b); + if (!remove_tree(root)) { + fprintf(stderr, "cleanup failed\n"); + exit_code = 1; + } + free(root); + return exit_code; +} + +static int test_snapshot_truncation(void) { + amduat_asl_store_config_t config; + amduat_asl_store_index_fs_t fs; + amduat_asl_store_t store; + amduat_asl_store_error_t err; + amduat_asl_index_state_t state; + amduat_artifact_t artifact; + amduat_artifact_t loaded; + amduat_reference_t ref; + uint8_t payload[8]; + char *root; + char *log_path = NULL; + uint8_t *log_before = NULL; + uint8_t *log_after = NULL; + size_t log_before_len = 0u; + size_t log_after_len = 0u; + int exit_code = 1; + + root = make_temp_root(); + if (root == NULL) { + fprintf(stderr, "temp root failed\n"); + return 1; + } + + memset(&config, 0, sizeof(config)); + config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; + config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; + if (!amduat_asl_store_index_fs_init(&fs, config, root)) { + fprintf(stderr, "index fs init failed\n"); + goto cleanup; + } + amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs); + + memset(payload, 0x33, sizeof(payload)); + artifact = amduat_artifact(amduat_octets(payload, sizeof(payload))); + ref = amduat_reference(0u, amduat_octets(NULL, 0u)); + err = amduat_asl_store_put_indexed(&store, artifact, &ref, &state); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "put_indexed failed: %d\n", err); + goto cleanup; + } + + if (!build_log_path(root, &log_path) || + !read_file(log_path, &log_before, &log_before_len)) { + fprintf(stderr, "log read before snapshot failed\n"); + goto cleanup; + } + + err = amduat_asl_store_index_fs_snapshot_create(&fs, 1u, NULL, NULL); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "snapshot create failed: %d\n", err); + goto cleanup; + } + + if (!read_file(log_path, &log_after, &log_after_len)) { + fprintf(stderr, "log read after snapshot failed\n"); + goto cleanup; + } + if (log_after_len >= log_before_len) { + fprintf(stderr, "log did not shrink after snapshot\n"); + goto cleanup; + } + + if (!amduat_asl_index_current_state(&store, &state)) { + fprintf(stderr, "current_state failed\n"); + goto cleanup; + } + loaded = amduat_artifact(amduat_octets(NULL, 0u)); + err = amduat_asl_store_get_indexed(&store, ref, state, &loaded); + if (err != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "get_indexed after snapshot failed: %d\n", err); + goto cleanup; + } + if (!amduat_artifact_eq(artifact, loaded)) { + fprintf(stderr, "artifact mismatch after snapshot\n"); + amduat_artifact_free(&loaded); + goto cleanup; + } + amduat_artifact_free(&loaded); + + exit_code = 0; + +cleanup: + free(log_path); + free(log_before); + free(log_after); + amduat_reference_free(&ref); + if (!remove_tree(root)) { + fprintf(stderr, "cleanup failed\n"); + exit_code = 1; + } + free(root); + return exit_code; +} + +int main(void) { + if (test_round_trip() != 0) { + return 1; + } + if (test_snapshot_truncation() != 0) { + return 1; + } + return test_shard_routing(); }