From a433f92f13be5193642cd514272ae0faba4aed08 Mon Sep 17 00:00:00 2001 From: Carl Niklas Rydberg Date: Sun, 8 Feb 2026 09:54:37 +0100 Subject: [PATCH] Fixed but not --- .../asl_store_index_fs/asl_store_index_fs.c | 11 +- src/core/asl_log_store.c | 111 +++++++++++------- tests/asl/test_asl_log_store_index_fs.c | 47 ++++++++ 3 files changed, 123 insertions(+), 46 deletions(-) diff --git a/src/adapters/asl_store_index_fs/asl_store_index_fs.c b/src/adapters/asl_store_index_fs/asl_store_index_fs.c index c46450c..155ebc8 100644 --- a/src/adapters/asl_store_index_fs/asl_store_index_fs.c +++ b/src/adapters/asl_store_index_fs/asl_store_index_fs.c @@ -3197,11 +3197,15 @@ static void amduat_asl_store_index_fs_maybe_snapshot_idle( if (fs->next_snapshot_id == 0u) { return; } + if (amduat_asl_store_index_fs_begin_write(fs) != AMDUAT_ASL_STORE_OK) { + return; + } if (amduat_asl_store_index_fs_snapshot_create( fs, fs->next_snapshot_id, NULL, NULL) == AMDUAT_ASL_STORE_OK) { fs->next_snapshot_id += 1u; fs->pending_snapshot_bytes = 0u; } + amduat_asl_store_index_fs_end_write(fs); } static void amduat_asl_store_index_fs_maybe_snapshot_size( @@ -3226,11 +3230,15 @@ static void amduat_asl_store_index_fs_maybe_snapshot_size( if (fs->next_snapshot_id == 0u) { return; } + if (amduat_asl_store_index_fs_begin_write(fs) != AMDUAT_ASL_STORE_OK) { + return; + } if (amduat_asl_store_index_fs_snapshot_create( fs, fs->next_snapshot_id, NULL, NULL) == AMDUAT_ASL_STORE_OK) { fs->next_snapshot_id += 1u; fs->pending_snapshot_bytes = 0u; } + amduat_asl_store_index_fs_end_write(fs); } static amduat_asl_store_error_t @@ -3794,7 +3802,6 @@ static bool amduat_asl_store_index_fs_current_state_impl( return false; } fs = (amduat_asl_store_index_fs_t *)ctx; - amduat_asl_store_index_fs_maybe_snapshot_idle(fs); if (!amduat_asl_store_index_fs_layout_build_log_path(fs->root_path, &log_path)) { @@ -3879,7 +3886,6 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_get_indexed_impl( return AMDUAT_ASL_STORE_ERR_IO; } fs = (amduat_asl_store_index_fs_t *)ctx; - amduat_asl_store_index_fs_maybe_snapshot_idle(fs); if (fs->config.encoding_profile_id != AMDUAT_ENC_ASL1_CORE_V1) { return AMDUAT_ASL_STORE_ERR_UNSUPPORTED; @@ -4090,7 +4096,6 @@ static amduat_asl_store_error_t amduat_asl_store_index_fs_log_scan_impl( return AMDUAT_ASL_STORE_ERR_IO; } fs = (amduat_asl_store_index_fs_t *)ctx; - amduat_asl_store_index_fs_maybe_snapshot_idle(fs); if (!amduat_asl_store_index_fs_layout_build_log_path(fs->root_path, &log_path)) { diff --git a/src/core/asl_log_store.c b/src/core/asl_log_store.c index 5a44112..31c8001 100644 --- a/src/core/asl_log_store.c +++ b/src/core/asl_log_store.c @@ -13,7 +13,7 @@ enum { AMDUAT_ASL_LOG_MAGIC_LEN = 8, AMDUAT_ASL_LOG_VERSION = 1, AMDUAT_ASL_LOG_CHUNK_MAX_ENTRIES = 1024u, - AMDUAT_ASL_LOG_MAX_RETRIES = 8u, + AMDUAT_ASL_LOG_MAX_RETRIES = 32u, AMDUAT_ASL_LOG_VERIFY_MAX_RETRIES = 6u }; @@ -107,6 +107,13 @@ static bool amduat_asl_log_add_size(size_t *acc, size_t add) { return true; } +static void amduat_asl_log_retry_pause(uint32_t attempt) { + volatile uint32_t spins = (attempt < 15u ? (attempt + 1u) : 16u) * 50000u; + while (spins > 0u) { + spins--; + } +} + void amduat_asl_log_chunk_free(amduat_asl_log_chunk_t *chunk) { if (chunk == NULL) { return; @@ -536,7 +543,6 @@ amduat_asl_store_error_t amduat_asl_log_append( for (uint32_t attempt = 0u; attempt < AMDUAT_ASL_LOG_MAX_RETRIES; ++attempt) { bool head_exists = false; - bool head_ref_missing = false; amduat_reference_t head_ref; amduat_artifact_t head_artifact; amduat_asl_log_chunk_t head_chunk; @@ -562,8 +568,18 @@ amduat_asl_store_error_t amduat_asl_log_append( &head_artifact); if (store_err != AMDUAT_ASL_STORE_OK) { if (store_err == AMDUAT_ASL_STORE_ERR_NOT_FOUND) { - /* Recover from a stale head pointer by rebasing to a fresh chunk. */ - head_ref_missing = true; + char *head_ref_hex = NULL; + (void)amduat_asl_ref_encode_hex(head_ref, &head_ref_hex); + amduat_log(AMDUAT_LOG_WARN, + "asl_log_append: head chunk missing; retrying append from fresh head read (log=%s pointer=%s attempt=%u head_ref=%s)", + log_name != NULL ? log_name : "(null)", + pointer_name != NULL ? pointer_name : "(null)", + (unsigned)attempt, + head_ref_hex != NULL ? head_ref_hex : "(hex-encode-failed)"); + free(head_ref_hex); + amduat_reference_free(&head_ref); + amduat_asl_log_retry_pause(attempt); + continue; } else { char *head_ref_hex = NULL; (void)amduat_asl_ref_encode_hex(head_ref, &head_ref_hex); @@ -580,48 +596,36 @@ amduat_asl_store_error_t amduat_asl_log_append( return store_err; } } - if (!head_ref_missing) { - if (!head_artifact.has_type_tag || - head_artifact.type_tag.tag_id != AMDUAT_TYPE_TAG_ASL_LOG_CHUNK_1) { - amduat_reference_free(&head_ref); - amduat_artifact_free(&head_artifact); - free(pointer_name); - return AMDUAT_ASL_STORE_ERR_INTEGRITY; - } - store_err = amduat_asl_log_chunk_decode_v1(head_artifact.bytes, - &head_chunk); + if (!head_artifact.has_type_tag || + head_artifact.type_tag.tag_id != AMDUAT_TYPE_TAG_ASL_LOG_CHUNK_1) { + amduat_reference_free(&head_ref); amduat_artifact_free(&head_artifact); - if (store_err != AMDUAT_ASL_STORE_OK) { - amduat_log(AMDUAT_LOG_ERROR, - "asl_log_append: decode head chunk failed (log=%s pointer=%s attempt=%u err=%d)", - log_name != NULL ? log_name : "(null)", - pointer_name != NULL ? pointer_name : "(null)", - (unsigned)attempt, - (int)store_err); - amduat_reference_free(&head_ref); - free(pointer_name); - return store_err; - } - if (head_chunk.base_offset > - UINT64_MAX - (uint64_t)head_chunk.entry_count) { - amduat_asl_log_chunk_free(&head_chunk); - amduat_reference_free(&head_ref); - free(pointer_name); - return AMDUAT_ASL_STORE_ERR_INTEGRITY; - } - base_offset = head_chunk.base_offset + head_chunk.entry_count; - amduat_asl_log_chunk_free(&head_chunk); - } else { - char *head_ref_hex = NULL; - (void)amduat_asl_ref_encode_hex(head_ref, &head_ref_hex); - amduat_log(AMDUAT_LOG_WARN, - "asl_log_append: stale head pointer missing chunk; rebasing log head (log=%s pointer=%s attempt=%u head_ref=%s)", + free(pointer_name); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + store_err = amduat_asl_log_chunk_decode_v1(head_artifact.bytes, + &head_chunk); + amduat_artifact_free(&head_artifact); + if (store_err != AMDUAT_ASL_STORE_OK) { + amduat_log(AMDUAT_LOG_ERROR, + "asl_log_append: decode head chunk failed (log=%s pointer=%s attempt=%u err=%d)", log_name != NULL ? log_name : "(null)", pointer_name != NULL ? pointer_name : "(null)", (unsigned)attempt, - head_ref_hex != NULL ? head_ref_hex : "(hex-encode-failed)"); - free(head_ref_hex); + (int)store_err); + amduat_reference_free(&head_ref); + free(pointer_name); + return store_err; } + if (head_chunk.base_offset > + UINT64_MAX - (uint64_t)head_chunk.entry_count) { + amduat_asl_log_chunk_free(&head_chunk); + amduat_reference_free(&head_ref); + free(pointer_name); + return AMDUAT_ASL_STORE_ERR_INTEGRITY; + } + base_offset = head_chunk.base_offset + head_chunk.entry_count; + amduat_asl_log_chunk_free(&head_chunk); } { @@ -636,7 +640,7 @@ amduat_asl_store_error_t amduat_asl_log_append( new_chunk.has_timestamp = has_timestamp; new_chunk.has_actor = has_actor; new_chunk.entries = (amduat_asl_log_entry_t *)entries; - if (head_exists && !head_ref_missing) { + if (head_exists) { new_chunk.has_prev = true; new_chunk.prev_ref = head_ref; } @@ -691,6 +695,25 @@ amduat_asl_store_error_t amduat_asl_log_append( verify_attempt++; } if (verify_err != AMDUAT_ASL_STORE_OK) { + if (verify_err == AMDUAT_ASL_STORE_ERR_NOT_FOUND) { + /* Retry from a fresh head read instead of failing hard; this avoids + * surfacing transient visibility windows as append failures. */ + amduat_log(AMDUAT_LOG_WARN, + "asl_log_append: store_put verify get missing; retrying append (log=%s pointer=%s attempt=%u verify_attempts=%u err=%d new_ref=%s)", + log_name != NULL ? log_name : "(null)", + pointer_name != NULL ? pointer_name : "(null)", + (unsigned)attempt, + (unsigned)(verify_attempt + 1u), + (int)verify_err, + new_ref_hex != NULL ? new_ref_hex : "(hex-encode-failed)"); + free(new_ref_hex); + amduat_reference_free(&new_ref); + if (head_exists) { + amduat_reference_free(&head_ref); + } + amduat_asl_log_retry_pause(attempt); + continue; + } amduat_log(AMDUAT_LOG_ERROR, "asl_log_append: store_put verify get failed (log=%s pointer=%s attempt=%u verify_attempts=%u err=%d new_ref=%s)", log_name != NULL ? log_name : "(null)", @@ -705,8 +728,9 @@ amduat_asl_store_error_t amduat_asl_log_append( } free(pointer_name); return AMDUAT_ASL_STORE_ERR_IO; + } else { + amduat_artifact_free(&verify_artifact); } - amduat_artifact_free(&verify_artifact); free(new_ref_hex); } @@ -741,6 +765,7 @@ amduat_asl_store_error_t amduat_asl_log_append( return AMDUAT_ASL_STORE_OK; } amduat_log(AMDUAT_LOG_DEBUG, "log append CAS retry"); + amduat_asl_log_retry_pause(attempt); } } } diff --git a/tests/asl/test_asl_log_store_index_fs.c b/tests/asl/test_asl_log_store_index_fs.c index 41c64ea..4ec7430 100644 --- a/tests/asl/test_asl_log_store_index_fs.c +++ b/tests/asl/test_asl_log_store_index_fs.c @@ -115,6 +115,7 @@ typedef struct { uint16_t kind; size_t iterations; amduat_asl_store_error_t first_err; + uint64_t *observed_offsets; } append_worker_t; typedef struct { @@ -172,6 +173,9 @@ static void *append_worker_main(void *opaque) { worker->first_err = err; return NULL; } + if (worker->observed_offsets != NULL && i < worker->iterations) { + worker->observed_offsets[i] = first_offset; + } } return NULL; @@ -247,6 +251,8 @@ int main(void) { bool started_b = false; bool started_mixed_append = false; bool started_mixed_put = false; + uint64_t worker_a_offsets[WORKER_ITERS]; + uint64_t worker_b_offsets[WORKER_ITERS]; char *root = NULL; int rc = 1; @@ -355,6 +361,7 @@ int main(void) { worker_a.log_name = log_name; worker_a.kind = 10u; worker_a.iterations = WORKER_ITERS; + worker_a.observed_offsets = worker_a_offsets; memset(&worker_b, 0, sizeof(worker_b)); worker_b.store = &store_b; @@ -362,6 +369,7 @@ int main(void) { worker_b.log_name = log_name; worker_b.kind = 11u; worker_b.iterations = WORKER_ITERS; + worker_b.observed_offsets = worker_b_offsets; if (pthread_create(&thread_a, NULL, append_worker_main, &worker_a) != 0) { fprintf(stderr, "pthread_create failed\n"); @@ -386,6 +394,45 @@ int main(void) { fprintf(stderr, "worker_b failed err=%d\n", (int)worker_b.first_err); goto cleanup; } + { + bool seen[2u + 2u * WORKER_ITERS]; + size_t i; + memset(seen, 0, sizeof(seen)); + for (i = 0u; i < WORKER_ITERS; ++i) { + if (worker_a_offsets[i] < 2u || + worker_a_offsets[i] >= (uint64_t)(2u + 2u * WORKER_ITERS)) { + fprintf(stderr, "worker_a offset out of range: %" PRIu64 "\n", + worker_a_offsets[i]); + goto cleanup; + } + if (seen[worker_a_offsets[i]]) { + fprintf(stderr, "duplicate offset from worker_a: %" PRIu64 "\n", + worker_a_offsets[i]); + goto cleanup; + } + seen[worker_a_offsets[i]] = true; + } + for (i = 0u; i < WORKER_ITERS; ++i) { + if (worker_b_offsets[i] < 2u || + worker_b_offsets[i] >= (uint64_t)(2u + 2u * WORKER_ITERS)) { + fprintf(stderr, "worker_b offset out of range: %" PRIu64 "\n", + worker_b_offsets[i]); + goto cleanup; + } + if (seen[worker_b_offsets[i]]) { + fprintf(stderr, "duplicate offset from worker_b: %" PRIu64 "\n", + worker_b_offsets[i]); + goto cleanup; + } + seen[worker_b_offsets[i]] = true; + } + for (i = 2u; i < (2u + 2u * WORKER_ITERS); ++i) { + if (!seen[i]) { + fprintf(stderr, "missing worker offset: %zu\n", i); + goto cleanup; + } + } + } if (amduat_asl_log_read(&log_a, log_name,