From f83dc9c2bdbffd72e93c097bc55f6cecb1bf3825 Mon Sep 17 00:00:00 2001 From: Carl Niklas Rydberg Date: Sun, 21 Dec 2025 21:31:55 +0100 Subject: [PATCH] Implemented batch ingest API and tests --- include/amduat/tgk/store.h | 9 ++ src/adapters/tgk_store_fs/tgk_store_fs.c | 15 ++ src/adapters/tgk_store_mem/tgk_store_mem.c | 163 +++++++++++++++++---- src/tgk_stack/store/store.c | 10 ++ tests/tgk/test_tgk_store_mem.c | 130 ++++++++++++++++ 5 files changed, 298 insertions(+), 29 deletions(-) diff --git a/include/amduat/tgk/store.h b/include/amduat/tgk/store.h index a45df93..f5022e8 100644 --- a/include/amduat/tgk/store.h +++ b/include/amduat/tgk/store.h @@ -89,6 +89,10 @@ typedef struct { bool (*ingest_artifact)(void *ctx, amduat_reference_t ref, amduat_artifact_t artifact); + bool (*ingest_batch)(void *ctx, + const amduat_reference_t *refs, + const amduat_artifact_t *artifacts, + size_t len); bool (*remove_artifact)(void *ctx, amduat_reference_t ref); amduat_tgk_graph_error_t (*resolve_edge)(void *ctx, amduat_reference_t ref, @@ -138,6 +142,11 @@ bool amduat_tgk_store_ingest_artifact(amduat_tgk_store_t *store, amduat_reference_t ref, amduat_artifact_t artifact); +bool amduat_tgk_store_ingest_batch(amduat_tgk_store_t *store, + const amduat_reference_t *refs, + const amduat_artifact_t *artifacts, + size_t len); + bool amduat_tgk_store_remove_artifact(amduat_tgk_store_t *store, amduat_reference_t ref); diff --git a/src/adapters/tgk_store_fs/tgk_store_fs.c b/src/adapters/tgk_store_fs/tgk_store_fs.c index abab4b3..c0b1070 100644 --- a/src/adapters/tgk_store_fs/tgk_store_fs.c +++ b/src/adapters/tgk_store_fs/tgk_store_fs.c @@ -145,6 +145,20 @@ static bool amduat_tgk_store_fs_ingest_artifact(void *ctx, return mem_ops.ingest_artifact(&fs->mem, ref, artifact); } +static bool amduat_tgk_store_fs_ingest_batch( + void *ctx, + const amduat_reference_t *refs, + const amduat_artifact_t *artifacts, + size_t len) { + amduat_tgk_store_fs_t *fs = (amduat_tgk_store_fs_t *)ctx; + amduat_tgk_store_ops_t mem_ops = amduat_tgk_store_mem_ops(); + + if (fs == NULL || mem_ops.ingest_batch == NULL) { + return false; + } + return mem_ops.ingest_batch(&fs->mem, refs, artifacts, len); +} + static bool amduat_tgk_store_fs_remove_artifact(void *ctx, amduat_reference_t ref) { amduat_tgk_store_fs_t *fs = (amduat_tgk_store_fs_t *)ctx; @@ -385,6 +399,7 @@ amduat_tgk_store_ops_t amduat_tgk_store_fs_ops(void) { ops.get_config = amduat_tgk_store_fs_get_config; ops.snapshot_id = amduat_tgk_store_fs_snapshot_id; ops.ingest_artifact = amduat_tgk_store_fs_ingest_artifact; + ops.ingest_batch = amduat_tgk_store_fs_ingest_batch; ops.remove_artifact = amduat_tgk_store_fs_remove_artifact; ops.resolve_edge = amduat_tgk_store_fs_resolve_edge; ops.edges_from = amduat_tgk_store_fs_edges_from; diff --git a/src/adapters/tgk_store_mem/tgk_store_mem.c b/src/adapters/tgk_store_mem/tgk_store_mem.c index aab2793..9faf40b 100644 --- a/src/adapters/tgk_store_mem/tgk_store_mem.c +++ b/src/adapters/tgk_store_mem/tgk_store_mem.c @@ -694,60 +694,138 @@ static bool amduat_tgk_store_mem_snapshot_id( return true; } -static bool amduat_tgk_store_mem_ingest_artifact( - void *ctx, - amduat_reference_t ref, - amduat_artifact_t artifact) { - amduat_tgk_store_mem_t *mem = (amduat_tgk_store_mem_t *)ctx; - size_t i; - size_t insert_at; - amduat_tgk_store_mem_artifact_t entry; +static bool amduat_tgk_store_mem_ingest_batch_impl( + amduat_tgk_store_mem_t *mem, + const amduat_reference_t *refs, + const amduat_artifact_t *artifacts, + size_t len) { + amduat_tgk_store_mem_artifact_t *pending = NULL; + size_t pending_len = 0; + size_t pending_cap = 0; amduat_tgk_graph_edge_view_t *edges = NULL; size_t edges_len = 0; + size_t i; + size_t j; if (mem == NULL) { return false; } - if (!amduat_tgk_store_mem_hash_id_supported(mem, ref.hash_id)) { - return false; + if (len == 0) { + return true; } - if (artifact.bytes.len != 0 && artifact.bytes.data == NULL) { + if (refs == NULL || artifacts == NULL) { return false; } - for (i = 0; i < mem->artifacts_len; ++i) { - if (!amduat_reference_eq(mem->artifacts[i].ref, ref)) { + for (i = 0; i < len; ++i) { + amduat_reference_t ref = refs[i]; + amduat_artifact_t artifact = artifacts[i]; + bool seen_duplicate = false; + + if (!amduat_tgk_store_mem_hash_id_supported(mem, ref.hash_id)) { + goto cleanup; + } + if (artifact.bytes.len != 0 && artifact.bytes.data == NULL) { + goto cleanup; + } + + for (j = 0; j < mem->artifacts_len; ++j) { + if (!amduat_reference_eq(mem->artifacts[j].ref, ref)) { + continue; + } + if (!amduat_tgk_store_mem_artifact_value_eq(&mem->artifacts[j], + artifact)) { + goto cleanup; + } + seen_duplicate = true; + break; + } + if (seen_duplicate) { continue; } - if (amduat_tgk_store_mem_artifact_value_eq(&mem->artifacts[i], artifact)) { - return true; + + for (j = 0; j < pending_len; ++j) { + if (!amduat_reference_eq(pending[j].ref, ref)) { + continue; + } + if (!amduat_tgk_store_mem_artifact_value_eq(&pending[j], artifact)) { + goto cleanup; + } + seen_duplicate = true; + break; } - return false; + if (seen_duplicate) { + continue; + } + + if (pending_len == pending_cap) { + size_t new_cap = (pending_cap == 0) ? 4u : pending_cap * 2u; + amduat_tgk_store_mem_artifact_t *next = + (amduat_tgk_store_mem_artifact_t *)realloc( + pending, new_cap * sizeof(*pending)); + if (next == NULL) { + goto cleanup; + } + pending = next; + pending_cap = new_cap; + } + + if (!amduat_tgk_store_mem_artifact_clone(ref, artifact, + &pending[pending_len])) { + goto cleanup; + } + pending_len++; } - if (mem->artifacts_len == mem->artifacts_cap) { - size_t new_cap = (mem->artifacts_cap == 0) ? 8u : mem->artifacts_cap * 2u; + if (pending_len == 0) { + free(pending); + return true; + } + + if (mem->artifacts_len > SIZE_MAX - pending_len) { + goto cleanup; + } + if (mem->artifacts_len + pending_len > mem->artifacts_cap) { + size_t new_cap = mem->artifacts_cap == 0 ? pending_len + : mem->artifacts_cap; + while (new_cap < mem->artifacts_len + pending_len) { + if (new_cap > SIZE_MAX / 2) { + goto cleanup; + } + new_cap *= 2u; + } + if (new_cap < mem->artifacts_len + pending_len) { + new_cap = mem->artifacts_len + pending_len; + } amduat_tgk_store_mem_artifact_t *next = (amduat_tgk_store_mem_artifact_t *)realloc( mem->artifacts, new_cap * sizeof(*mem->artifacts)); if (next == NULL) { - return false; + goto cleanup; } mem->artifacts = next; mem->artifacts_cap = new_cap; } - insert_at = mem->artifacts_len; - if (!amduat_tgk_store_mem_artifact_clone(ref, artifact, &entry)) { - return false; - } - mem->artifacts[insert_at] = entry; - mem->artifacts_len++; + { + size_t appended_len = pending_len; + size_t appended_start = mem->artifacts_len; - if (!amduat_tgk_store_mem_edges_build(mem, &edges, &edges_len)) { - mem->artifacts_len--; - amduat_tgk_store_mem_artifact_free(&mem->artifacts[insert_at]); - return false; + memcpy(mem->artifacts + appended_start, pending, + appended_len * sizeof(*pending)); + mem->artifacts_len += appended_len; + free(pending); + pending = NULL; + pending_len = 0; + + if (!amduat_tgk_store_mem_edges_build(mem, &edges, &edges_len)) { + for (i = 0; i < appended_len; ++i) { + amduat_tgk_store_mem_artifact_free( + &mem->artifacts[appended_start + i]); + } + mem->artifacts_len = appended_start; + return false; + } } amduat_tgk_store_mem_edge_view_list_free(mem->edges, mem->edges_len); @@ -755,6 +833,32 @@ static bool amduat_tgk_store_mem_ingest_artifact( mem->edges_len = edges_len; mem->snapshot_id++; return true; + +cleanup: + for (i = 0; i < pending_len; ++i) { + amduat_tgk_store_mem_artifact_free(&pending[i]); + } + free(pending); + return false; +} + +static bool amduat_tgk_store_mem_ingest_artifact( + void *ctx, + amduat_reference_t ref, + amduat_artifact_t artifact) { + amduat_tgk_store_mem_t *mem = (amduat_tgk_store_mem_t *)ctx; + + return amduat_tgk_store_mem_ingest_batch_impl(mem, &ref, &artifact, 1); +} + +static bool amduat_tgk_store_mem_ingest_batch( + void *ctx, + const amduat_reference_t *refs, + const amduat_artifact_t *artifacts, + size_t len) { + amduat_tgk_store_mem_t *mem = (amduat_tgk_store_mem_t *)ctx; + + return amduat_tgk_store_mem_ingest_batch_impl(mem, refs, artifacts, len); } static bool amduat_tgk_store_mem_remove_artifact(void *ctx, @@ -1187,6 +1291,7 @@ amduat_tgk_store_ops_t amduat_tgk_store_mem_ops(void) { ops.get_config = amduat_tgk_store_mem_get_config; ops.snapshot_id = amduat_tgk_store_mem_snapshot_id; ops.ingest_artifact = amduat_tgk_store_mem_ingest_artifact; + ops.ingest_batch = amduat_tgk_store_mem_ingest_batch; ops.remove_artifact = amduat_tgk_store_mem_remove_artifact; ops.resolve_edge = amduat_tgk_store_mem_resolve_edge; ops.edges_from = amduat_tgk_store_mem_edges_from; diff --git a/src/tgk_stack/store/store.c b/src/tgk_stack/store/store.c index ea3df75..faa5473 100644 --- a/src/tgk_stack/store/store.c +++ b/src/tgk_stack/store/store.c @@ -48,6 +48,16 @@ bool amduat_tgk_store_ingest_artifact(amduat_tgk_store_t *store, return store->ops.ingest_artifact(store->ctx, ref, artifact); } +bool amduat_tgk_store_ingest_batch(amduat_tgk_store_t *store, + const amduat_reference_t *refs, + const amduat_artifact_t *artifacts, + size_t len) { + if (store == NULL || store->ops.ingest_batch == NULL) { + return false; + } + return store->ops.ingest_batch(store->ctx, refs, artifacts, len); +} + bool amduat_tgk_store_remove_artifact(amduat_tgk_store_t *store, amduat_reference_t ref) { if (store == NULL || store->ops.remove_artifact == NULL) { diff --git a/tests/tgk/test_tgk_store_mem.c b/tests/tgk/test_tgk_store_mem.c index f3ae3ba..0d74867 100644 --- a/tests/tgk/test_tgk_store_mem.c +++ b/tests/tgk/test_tgk_store_mem.c @@ -918,6 +918,135 @@ cleanup: return exit_code; } +static int test_ingest_batch_epoch(void) { + amduat_tgk_store_mem_t mem; + amduat_tgk_store_t store; + amduat_tgk_store_config_t config; + amduat_tgk_identity_domain_t domains[1]; + uint32_t edge_tags[1]; + amduat_tgk_edge_type_id_t edge_types[1]; + amduat_asl_encoding_profile_id_t encodings[1]; + amduat_reference_t refs[2]; + amduat_artifact_t artifacts[2]; + amduat_octets_t edge_bytes; + amduat_reference_t node_a; + amduat_reference_t node_b; + amduat_reference_t payload; + uint8_t digest_ref1[32]; + uint8_t digest_ref2[32]; + uint8_t digest_a[32]; + uint8_t digest_b[32]; + uint8_t digest_payload[32]; + amduat_tgk_edge_body_t edge; + amduat_reference_t edge_from[1]; + amduat_reference_t edge_to[1]; + amduat_tgk_snapshot_id_t epoch_before = 0; + amduat_tgk_snapshot_id_t epoch_after = 0; + amduat_tgk_graph_scan_result_t scan; + int exit_code = 1; + + memset(&mem, 0, sizeof(mem)); + memset(&config, 0, sizeof(config)); + memset(&edge, 0, sizeof(edge)); + edge_bytes = amduat_octets(NULL, 0); + + domains[0].encoding_profile = AMDUAT_ENC_ASL1_CORE_V1; + domains[0].hash_id = AMDUAT_HASH_ASL1_ID_SHA256; + edge_tags[0] = TYPE_TAG_TGK1_EDGE_V1; + edge_types[0] = 0x10; + encodings[0] = TGK1_EDGE_ENC_V1; + + config.id_space.domains = domains; + config.id_space.domains_len = 1; + config.tgk_profiles.edge_tags = edge_tags; + config.tgk_profiles.edge_tags_len = 1; + config.tgk_profiles.edge_types = edge_types; + config.tgk_profiles.edge_types_len = 1; + config.tgk_profiles.encodings = encodings; + config.tgk_profiles.encodings_len = 1; + + if (!amduat_tgk_store_mem_init(&mem, config, NULL, 0)) { + fprintf(stderr, "batch epoch init failed\n"); + return 1; + } + amduat_tgk_store_init(&store, config, amduat_tgk_store_mem_ops(), &mem); + + if (!amduat_tgk_store_snapshot_id(&store, &epoch_before)) { + fprintf(stderr, "batch epoch get failed\n"); + goto cleanup_mem; + } + + refs[0] = make_ref(0x81, digest_ref1); + refs[1] = make_ref(0x82, digest_ref2); + node_a = make_ref(0xa1, digest_a); + node_b = make_ref(0xb1, digest_b); + payload = make_ref(0xe1, digest_payload); + + edge.type = 0x10; + edge_from[0] = node_a; + edge.from = edge_from; + edge.from_len = 1; + edge_to[0] = node_b; + edge.to = edge_to; + edge.to_len = 1; + edge.payload = payload; + + if (!amduat_enc_tgk1_edge_encode_v1(&edge, &edge_bytes)) { + fprintf(stderr, "batch epoch encode failed\n"); + goto cleanup_mem; + } + + artifacts[0] = amduat_artifact_with_type( + edge_bytes, amduat_type_tag(TYPE_TAG_TGK1_EDGE_V1)); + artifacts[1] = artifacts[0]; + + if (!amduat_tgk_store_ingest_batch(&store, refs, artifacts, 2)) { + fprintf(stderr, "batch ingest failed\n"); + goto cleanup_mem; + } + if (!amduat_tgk_store_snapshot_id(&store, &epoch_after)) { + fprintf(stderr, "batch epoch after ingest failed\n"); + goto cleanup_mem; + } + if (epoch_after != epoch_before + 1u) { + fprintf(stderr, "batch epoch did not increment\n"); + goto cleanup_mem; + } + + if (!amduat_tgk_store_scan_edges(&store, + (amduat_tgk_edge_type_filter_t){0}, + amduat_octets(NULL, 0), false, &scan)) { + fprintf(stderr, "batch scan failed\n"); + goto cleanup_mem; + } + if (scan.edges.len != 2) { + fprintf(stderr, "batch scan count mismatch\n"); + amduat_tgk_graph_scan_result_free(&scan); + goto cleanup_mem; + } + amduat_tgk_graph_scan_result_free(&scan); + + if (!amduat_tgk_store_ingest_batch(&store, refs, artifacts, 2)) { + fprintf(stderr, "batch re-ingest failed\n"); + goto cleanup_mem; + } + if (!amduat_tgk_store_snapshot_id(&store, &epoch_after)) { + fprintf(stderr, "batch epoch after re-ingest failed\n"); + goto cleanup_mem; + } + if (epoch_after != epoch_before + 1u) { + fprintf(stderr, "batch epoch changed on no-op\n"); + goto cleanup_mem; + } + + exit_code = 0; + +cleanup_mem: + amduat_tgk_store_mem_free(&mem); + free((void *)edge_bytes.data); + return exit_code; +} + static int test_resolve_edge_unsupported(const test_env_t *env) { amduat_reference_t ref; amduat_tgk_edge_body_t body; @@ -1134,6 +1263,7 @@ int main(void) { test_duplicate_edge_ref_conflict() != 0 || test_scan_edges_pagination() != 0 || test_ingest_remove_epoch() != 0 || + test_ingest_batch_epoch() != 0 || test_type_filter(&env) != 0 || test_ordering(&env) != 0 || test_adjacency(&env) != 0 ||