diff --git a/include/amduat/tgk/store.h b/include/amduat/tgk/store.h index f5022e8..1a16f41 100644 --- a/include/amduat/tgk/store.h +++ b/include/amduat/tgk/store.h @@ -22,6 +22,8 @@ typedef enum { typedef uint64_t amduat_tgk_snapshot_id_t; +typedef struct amduat_tgk_store_snapshot_t amduat_tgk_store_snapshot_t; + typedef struct { amduat_tgk_edge_type_id_t *types; size_t types_len; @@ -86,6 +88,7 @@ typedef struct { typedef struct { bool (*get_config)(void *ctx, amduat_tgk_store_config_t *out_config); bool (*snapshot_id)(void *ctx, amduat_tgk_snapshot_id_t *out_id); + bool (*snapshot)(void *ctx, amduat_tgk_store_snapshot_t *out_snapshot); bool (*ingest_artifact)(void *ctx, amduat_reference_t ref, amduat_artifact_t artifact); @@ -127,6 +130,11 @@ typedef struct { void *ctx; } amduat_tgk_store_t; +typedef struct amduat_tgk_store_snapshot_t { + amduat_tgk_store_t store; + void (*release)(void *ctx); +} amduat_tgk_store_snapshot_t; + void amduat_tgk_store_init(amduat_tgk_store_t *store, amduat_tgk_store_config_t config, amduat_tgk_store_ops_t ops, @@ -138,6 +146,11 @@ bool amduat_tgk_store_get_config(amduat_tgk_store_t *store, bool amduat_tgk_store_snapshot_id(amduat_tgk_store_t *store, amduat_tgk_snapshot_id_t *out_id); +/* Snapshot views are read-only; release with amduat_tgk_store_snapshot_free. */ +bool amduat_tgk_store_snapshot(amduat_tgk_store_t *store, + amduat_tgk_store_snapshot_t *out_snapshot); +void amduat_tgk_store_snapshot_free(amduat_tgk_store_snapshot_t *snapshot); + bool amduat_tgk_store_ingest_artifact(amduat_tgk_store_t *store, amduat_reference_t ref, amduat_artifact_t artifact); diff --git a/src/adapters/tgk_store_fs/tgk_store_fs.c b/src/adapters/tgk_store_fs/tgk_store_fs.c index c0b1070..4cc89eb 100644 --- a/src/adapters/tgk_store_fs/tgk_store_fs.c +++ b/src/adapters/tgk_store_fs/tgk_store_fs.c @@ -133,6 +133,18 @@ static bool amduat_tgk_store_fs_snapshot_id(void *ctx, return mem_ops.snapshot_id(&fs->mem, out_id); } +static bool amduat_tgk_store_fs_snapshot( + void *ctx, + amduat_tgk_store_snapshot_t *out_snapshot) { + amduat_tgk_store_fs_t *fs = (amduat_tgk_store_fs_t *)ctx; + amduat_tgk_store_ops_t mem_ops = amduat_tgk_store_mem_ops(); + + if (fs == NULL || mem_ops.snapshot == NULL) { + return false; + } + return mem_ops.snapshot(&fs->mem, out_snapshot); +} + static bool amduat_tgk_store_fs_ingest_artifact(void *ctx, amduat_reference_t ref, amduat_artifact_t artifact) { @@ -398,6 +410,7 @@ amduat_tgk_store_ops_t amduat_tgk_store_fs_ops(void) { memset(&ops, 0, sizeof(ops)); ops.get_config = amduat_tgk_store_fs_get_config; ops.snapshot_id = amduat_tgk_store_fs_snapshot_id; + ops.snapshot = amduat_tgk_store_fs_snapshot; ops.ingest_artifact = amduat_tgk_store_fs_ingest_artifact; ops.ingest_batch = amduat_tgk_store_fs_ingest_batch; ops.remove_artifact = amduat_tgk_store_fs_remove_artifact; diff --git a/src/adapters/tgk_store_mem/tgk_store_mem.c b/src/adapters/tgk_store_mem/tgk_store_mem.c index 9faf40b..2ef4cb0 100644 --- a/src/adapters/tgk_store_mem/tgk_store_mem.c +++ b/src/adapters/tgk_store_mem/tgk_store_mem.c @@ -694,6 +694,49 @@ static bool amduat_tgk_store_mem_snapshot_id( return true; } +static void amduat_tgk_store_mem_snapshot_release(void *ctx) { + amduat_tgk_store_mem_t *mem = (amduat_tgk_store_mem_t *)ctx; + + if (mem == NULL) { + return; + } + amduat_tgk_store_mem_free(mem); + free(mem); +} + +static bool amduat_tgk_store_mem_snapshot( + void *ctx, + amduat_tgk_store_snapshot_t *out_snapshot) { + amduat_tgk_store_mem_t *mem = (amduat_tgk_store_mem_t *)ctx; + amduat_tgk_store_mem_t *snapshot_mem; + amduat_tgk_store_ops_t ops; + + if (mem == NULL || out_snapshot == NULL) { + return false; + } + memset(out_snapshot, 0, sizeof(*out_snapshot)); + + snapshot_mem = (amduat_tgk_store_mem_t *)malloc(sizeof(*snapshot_mem)); + if (snapshot_mem == NULL) { + return false; + } + if (!amduat_tgk_store_mem_init(snapshot_mem, mem->config, mem->artifacts, + mem->artifacts_len)) { + free(snapshot_mem); + return false; + } + snapshot_mem->snapshot_id = mem->snapshot_id; + + ops = amduat_tgk_store_mem_ops(); + ops.snapshot = NULL; + ops.ingest_artifact = NULL; + ops.ingest_batch = NULL; + ops.remove_artifact = NULL; + amduat_tgk_store_init(&out_snapshot->store, mem->config, ops, snapshot_mem); + out_snapshot->release = amduat_tgk_store_mem_snapshot_release; + return true; +} + static bool amduat_tgk_store_mem_ingest_batch_impl( amduat_tgk_store_mem_t *mem, const amduat_reference_t *refs, @@ -1290,6 +1333,7 @@ amduat_tgk_store_ops_t amduat_tgk_store_mem_ops(void) { memset(&ops, 0, sizeof(ops)); ops.get_config = amduat_tgk_store_mem_get_config; ops.snapshot_id = amduat_tgk_store_mem_snapshot_id; + ops.snapshot = amduat_tgk_store_mem_snapshot; ops.ingest_artifact = amduat_tgk_store_mem_ingest_artifact; ops.ingest_batch = amduat_tgk_store_mem_ingest_batch; ops.remove_artifact = amduat_tgk_store_mem_remove_artifact; diff --git a/src/tgk_stack/store/store.c b/src/tgk_stack/store/store.c index faa5473..6818f8b 100644 --- a/src/tgk_stack/store/store.c +++ b/src/tgk_stack/store/store.c @@ -1,6 +1,7 @@ #include "amduat/tgk/store.h" #include +#include static void amduat_tgk_store_reference_free(amduat_reference_t *ref) { if (ref == NULL) { @@ -39,6 +40,24 @@ bool amduat_tgk_store_snapshot_id(amduat_tgk_store_t *store, return store->ops.snapshot_id(store->ctx, out_id); } +bool amduat_tgk_store_snapshot(amduat_tgk_store_t *store, + amduat_tgk_store_snapshot_t *out_snapshot) { + if (store == NULL || store->ops.snapshot == NULL || out_snapshot == NULL) { + return false; + } + return store->ops.snapshot(store->ctx, out_snapshot); +} + +void amduat_tgk_store_snapshot_free(amduat_tgk_store_snapshot_t *snapshot) { + if (snapshot == NULL) { + return; + } + if (snapshot->release != NULL) { + snapshot->release(snapshot->store.ctx); + } + memset(snapshot, 0, sizeof(*snapshot)); +} + bool amduat_tgk_store_ingest_artifact(amduat_tgk_store_t *store, amduat_reference_t ref, amduat_artifact_t artifact) { diff --git a/tests/tgk/test_tgk_store_fs.c b/tests/tgk/test_tgk_store_fs.c index fa3f7ed..092d50c 100644 --- a/tests/tgk/test_tgk_store_fs.c +++ b/tests/tgk/test_tgk_store_fs.c @@ -780,11 +780,255 @@ cleanup: return exit_code; } +static int test_snapshot_consistency(void) { + amduat_asl_store_config_t asl_config; + amduat_asl_store_fs_t asl_fs; + amduat_asl_store_t asl_store; + amduat_tgk_store_config_t tgk_config; + amduat_tgk_store_t store; + amduat_tgk_store_fs_t fs; + amduat_tgk_identity_domain_t domains[1]; + uint32_t edge_tags[1]; + amduat_tgk_edge_type_id_t edge_types[1]; + amduat_asl_encoding_profile_id_t encodings[1]; + amduat_octets_t edge_bytes[2]; + amduat_reference_t ref_edge1; + amduat_reference_t ref_edge2; + amduat_reference_t node_a; + amduat_reference_t node_b; + amduat_reference_t node_c; + amduat_reference_t payload1; + amduat_reference_t payload2; + uint8_t digest_node_a[32]; + uint8_t digest_node_b[32]; + uint8_t digest_node_c[32]; + uint8_t digest_payload1[32]; + uint8_t digest_payload2[32]; + uint8_t digest_edge2[32]; + amduat_tgk_edge_body_t edge1; + amduat_tgk_edge_body_t edge2; + amduat_reference_t edge1_from[1]; + amduat_reference_t edge1_to[1]; + amduat_reference_t edge2_from[1]; + amduat_reference_t edge2_to[1]; + amduat_tgk_store_snapshot_t snapshot; + amduat_tgk_snapshot_id_t snap_before = 0; + amduat_tgk_snapshot_id_t snap_after = 0; + amduat_tgk_snapshot_id_t store_epoch = 0; + amduat_tgk_graph_scan_result_t scan; + char *root = NULL; + char *manifest_path = NULL; + int exit_code = 1; + bool fs_ready = false; + + memset(&tgk_config, 0, sizeof(tgk_config)); + memset(&fs, 0, sizeof(fs)); + memset(&edge1, 0, sizeof(edge1)); + memset(&edge2, 0, sizeof(edge2)); + memset(&ref_edge1, 0, sizeof(ref_edge1)); + memset(&ref_edge2, 0, sizeof(ref_edge2)); + memset(&snapshot, 0, sizeof(snapshot)); + edge_bytes[0] = amduat_octets(NULL, 0); + edge_bytes[1] = amduat_octets(NULL, 0); + + root = make_temp_root(); + if (root == NULL) { + fprintf(stderr, "fs snapshot temp root failed\n"); + goto cleanup; + } + if (!join_path(root, "manifest.txt", &manifest_path)) { + fprintf(stderr, "fs snapshot manifest path failed\n"); + goto cleanup; + } + + asl_config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1; + asl_config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256; + if (!amduat_asl_store_fs_init(&asl_fs, asl_config, root)) { + fprintf(stderr, "fs snapshot asl store fs init failed\n"); + goto cleanup; + } + amduat_asl_store_init(&asl_store, asl_config, amduat_asl_store_fs_ops(), + &asl_fs); + + domains[0].encoding_profile = AMDUAT_ENC_ASL1_CORE_V1; + domains[0].hash_id = AMDUAT_HASH_ASL1_ID_SHA256; + edge_tags[0] = TYPE_TAG_TGK1_EDGE_V1; + edge_types[0] = 0x10; + encodings[0] = TGK1_EDGE_ENC_V1; + + tgk_config.id_space.domains = domains; + tgk_config.id_space.domains_len = 1; + tgk_config.artifact_scope.description = amduat_octets(NULL, 0); + tgk_config.tgk_profiles.edge_tags = edge_tags; + tgk_config.tgk_profiles.edge_tags_len = 1; + tgk_config.tgk_profiles.edge_types = edge_types; + tgk_config.tgk_profiles.edge_types_len = 1; + tgk_config.tgk_profiles.encodings = encodings; + tgk_config.tgk_profiles.encodings_len = 1; + + node_a = make_ref(0xa1, digest_node_a); + node_b = make_ref(0xb1, digest_node_b); + node_c = make_ref(0xc1, digest_node_c); + payload1 = make_ref(0xe1, digest_payload1); + payload2 = make_ref(0xe2, digest_payload2); + ref_edge2 = make_ref(0x82, digest_edge2); + + edge1.type = 0x10; + edge1_from[0] = node_a; + edge1.from = edge1_from; + edge1.from_len = 1; + edge1_to[0] = node_b; + edge1.to = edge1_to; + edge1.to_len = 1; + edge1.payload = payload1; + + edge2.type = 0x10; + edge2_from[0] = node_a; + edge2.from = edge2_from; + edge2.from_len = 1; + edge2_to[0] = node_c; + edge2.to = edge2_to; + edge2.to_len = 1; + edge2.payload = payload2; + + if (!amduat_enc_tgk1_edge_encode_v1(&edge1, &edge_bytes[0]) || + !amduat_enc_tgk1_edge_encode_v1(&edge2, &edge_bytes[1])) { + fprintf(stderr, "fs snapshot edge encode failed\n"); + goto cleanup; + } + + if (amduat_asl_store_put( + &asl_store, + amduat_artifact_with_type(edge_bytes[0], + amduat_type_tag(TYPE_TAG_TGK1_EDGE_V1)), + &ref_edge1) != AMDUAT_ASL_STORE_OK) { + fprintf(stderr, "fs snapshot asl store put failed\n"); + goto cleanup; + } + + { + amduat_reference_t refs[1]; + refs[0] = ref_edge1; + if (!write_manifest(manifest_path, refs, 1)) { + fprintf(stderr, "fs snapshot manifest write failed\n"); + goto cleanup; + } + } + + if (!amduat_tgk_store_fs_init(&fs, tgk_config, manifest_path, + AMDUAT_FORMAT_REF_HEX, &asl_store)) { + fprintf(stderr, "fs snapshot tgk store fs init failed\n"); + goto cleanup; + } + fs_ready = true; + amduat_tgk_store_init(&store, tgk_config, amduat_tgk_store_fs_ops(), &fs); + + if (!amduat_tgk_store_snapshot(&store, &snapshot)) { + fprintf(stderr, "fs snapshot capture failed\n"); + goto cleanup; + } + if (!amduat_tgk_store_snapshot_id(&snapshot.store, &snap_before)) { + fprintf(stderr, "fs snapshot id failed\n"); + goto cleanup; + } + + if (!amduat_tgk_store_ingest_artifact( + &store, ref_edge2, + amduat_artifact_with_type(edge_bytes[1], + amduat_type_tag(TYPE_TAG_TGK1_EDGE_V1)))) { + fprintf(stderr, "fs snapshot ingest failed\n"); + goto cleanup; + } + if (!amduat_tgk_store_snapshot_id(&store, &store_epoch)) { + fprintf(stderr, "fs snapshot store epoch failed\n"); + goto cleanup; + } + if (!amduat_tgk_store_snapshot_id(&snapshot.store, &snap_after)) { + fprintf(stderr, "fs snapshot id recheck failed\n"); + goto cleanup; + } + if (snap_after != snap_before) { + fprintf(stderr, "fs snapshot epoch drifted\n"); + goto cleanup; + } + if (store_epoch == snap_before) { + fprintf(stderr, "fs snapshot store epoch did not advance\n"); + goto cleanup; + } + + if (!amduat_tgk_store_scan_edges(&snapshot.store, + (amduat_tgk_edge_type_filter_t){0}, + amduat_octets(NULL, 0), false, &scan)) { + fprintf(stderr, "fs snapshot scan failed\n"); + goto cleanup; + } + if (scan.edges.len != 1 || + !amduat_reference_eq(scan.edges.edges[0].edge_ref, ref_edge1)) { + fprintf(stderr, "fs snapshot scan mismatch\n"); + amduat_tgk_graph_scan_result_free(&scan); + goto cleanup; + } + amduat_tgk_graph_scan_result_free(&scan); + + if (!amduat_tgk_store_scan_edges(&store, + (amduat_tgk_edge_type_filter_t){0}, + amduat_octets(NULL, 0), false, &scan)) { + fprintf(stderr, "fs store scan failed\n"); + goto cleanup; + } + if (scan.edges.len != 2) { + fprintf(stderr, "fs store scan count mismatch\n"); + amduat_tgk_graph_scan_result_free(&scan); + goto cleanup; + } + { + size_t seen_edge1 = 0; + size_t seen_edge2 = 0; + size_t i; + for (i = 0; i < scan.edges.len; ++i) { + if (amduat_reference_eq(scan.edges.edges[i].edge_ref, ref_edge1)) { + seen_edge1++; + } else if (amduat_reference_eq(scan.edges.edges[i].edge_ref, + ref_edge2)) { + seen_edge2++; + } else { + fprintf(stderr, "fs store scan unexpected ref\n"); + amduat_tgk_graph_scan_result_free(&scan); + goto cleanup; + } + } + if (seen_edge1 != 1 || seen_edge2 != 1) { + fprintf(stderr, "fs store scan refs mismatch\n"); + amduat_tgk_graph_scan_result_free(&scan); + goto cleanup; + } + } + amduat_tgk_graph_scan_result_free(&scan); + + exit_code = 0; + +cleanup: + amduat_tgk_store_snapshot_free(&snapshot); + if (fs_ready) { + amduat_tgk_store_fs_free(&fs); + } + free((void *)edge_bytes[0].data); + free((void *)edge_bytes[1].data); + free_ref(&ref_edge1); + free(manifest_path); + if (root != NULL) { + remove_tree(root); + } + free(root); + return exit_code; +} + int main(void) { if (test_manifest_load() != 0 || test_init_rejects_duplicate_hash_id() != 0 || test_manifest_duplicate_refs() != 0 || - test_ingest_remove_epoch() != 0) { + test_ingest_remove_epoch() != 0 || + test_snapshot_consistency() != 0) { return 1; } return 0; diff --git a/tests/tgk/test_tgk_store_mem.c b/tests/tgk/test_tgk_store_mem.c index 0d74867..6fd7120 100644 --- a/tests/tgk/test_tgk_store_mem.c +++ b/tests/tgk/test_tgk_store_mem.c @@ -1047,6 +1047,91 @@ cleanup_mem: return exit_code; } +static int test_snapshot_consistency(void) { + test_env_t env; + amduat_tgk_store_snapshot_t snapshot; + amduat_tgk_snapshot_id_t snap_before = 0; + amduat_tgk_snapshot_id_t snap_after = 0; + amduat_tgk_snapshot_id_t store_epoch = 0; + amduat_tgk_graph_scan_result_t scan; + int exit_code = 1; + + memset(&snapshot, 0, sizeof(snapshot)); + if (!init_env(&env)) { + fprintf(stderr, "snapshot env init failed\n"); + return 1; + } + + if (!amduat_tgk_store_snapshot(&env.store, &snapshot)) { + fprintf(stderr, "snapshot capture failed\n"); + goto cleanup_env; + } + if (!amduat_tgk_store_snapshot_id(&snapshot.store, &snap_before)) { + fprintf(stderr, "snapshot id failed\n"); + goto cleanup_snapshot; + } + + if (!amduat_tgk_store_remove_artifact(&env.store, env.ref_edge3)) { + fprintf(stderr, "snapshot remove failed\n"); + goto cleanup_snapshot; + } + if (!amduat_tgk_store_snapshot_id(&env.store, &store_epoch)) { + fprintf(stderr, "snapshot store epoch failed\n"); + goto cleanup_snapshot; + } + if (!amduat_tgk_store_snapshot_id(&snapshot.store, &snap_after)) { + fprintf(stderr, "snapshot id recheck failed\n"); + goto cleanup_snapshot; + } + if (snap_after != snap_before) { + fprintf(stderr, "snapshot epoch drifted\n"); + goto cleanup_snapshot; + } + if (store_epoch == snap_before) { + fprintf(stderr, "store epoch did not advance\n"); + goto cleanup_snapshot; + } + + if (!amduat_tgk_store_scan_edges(&snapshot.store, + (amduat_tgk_edge_type_filter_t){0}, + amduat_octets(NULL, 0), false, &scan)) { + fprintf(stderr, "snapshot scan failed\n"); + goto cleanup_snapshot; + } + if (scan.edges.len != 3 || + !amduat_reference_eq(scan.edges.edges[0].edge_ref, env.ref_edge2) || + !amduat_reference_eq(scan.edges.edges[1].edge_ref, env.ref_edge1) || + !amduat_reference_eq(scan.edges.edges[2].edge_ref, env.ref_edge3)) { + fprintf(stderr, "snapshot scan mismatch\n"); + amduat_tgk_graph_scan_result_free(&scan); + goto cleanup_snapshot; + } + amduat_tgk_graph_scan_result_free(&scan); + + if (!amduat_tgk_store_scan_edges(&env.store, + (amduat_tgk_edge_type_filter_t){0}, + amduat_octets(NULL, 0), false, &scan)) { + fprintf(stderr, "store scan failed\n"); + goto cleanup_snapshot; + } + if (scan.edges.len != 2 || + !amduat_reference_eq(scan.edges.edges[0].edge_ref, env.ref_edge2) || + !amduat_reference_eq(scan.edges.edges[1].edge_ref, env.ref_edge1)) { + fprintf(stderr, "store scan mismatch\n"); + amduat_tgk_graph_scan_result_free(&scan); + goto cleanup_snapshot; + } + amduat_tgk_graph_scan_result_free(&scan); + + exit_code = 0; + +cleanup_snapshot: + amduat_tgk_store_snapshot_free(&snapshot); +cleanup_env: + free_env(&env); + return exit_code; +} + static int test_resolve_edge_unsupported(const test_env_t *env) { amduat_reference_t ref; amduat_tgk_edge_body_t body; @@ -1264,6 +1349,7 @@ int main(void) { test_scan_edges_pagination() != 0 || test_ingest_remove_epoch() != 0 || test_ingest_batch_epoch() != 0 || + test_snapshot_consistency() != 0 || test_type_filter(&env) != 0 || test_ordering(&env) != 0 || test_adjacency(&env) != 0 ||