Implemented batch ingest API and tests

This commit is contained in:
Carl Niklas Rydberg 2025-12-21 21:31:55 +01:00
parent d3224b26ac
commit f83dc9c2bd
5 changed files with 298 additions and 29 deletions

View file

@ -89,6 +89,10 @@ typedef struct {
bool (*ingest_artifact)(void *ctx,
amduat_reference_t ref,
amduat_artifact_t artifact);
bool (*ingest_batch)(void *ctx,
const amduat_reference_t *refs,
const amduat_artifact_t *artifacts,
size_t len);
bool (*remove_artifact)(void *ctx, amduat_reference_t ref);
amduat_tgk_graph_error_t (*resolve_edge)(void *ctx,
amduat_reference_t ref,
@ -138,6 +142,11 @@ bool amduat_tgk_store_ingest_artifact(amduat_tgk_store_t *store,
amduat_reference_t ref,
amduat_artifact_t artifact);
bool amduat_tgk_store_ingest_batch(amduat_tgk_store_t *store,
const amduat_reference_t *refs,
const amduat_artifact_t *artifacts,
size_t len);
bool amduat_tgk_store_remove_artifact(amduat_tgk_store_t *store,
amduat_reference_t ref);

View file

@ -145,6 +145,20 @@ static bool amduat_tgk_store_fs_ingest_artifact(void *ctx,
return mem_ops.ingest_artifact(&fs->mem, ref, artifact);
}
/* Batch-ingest entry point for the filesystem store.
 *
 * The fs store keeps its graph state in an embedded in-memory store
 * (`fs->mem`), so this simply forwards the whole batch to the memory
 * backend's ingest_batch op. Returns false when ctx is NULL or the
 * memory backend does not provide a batch op.
 */
static bool amduat_tgk_store_fs_ingest_batch(
    void *ctx,
    const amduat_reference_t *refs,
    const amduat_artifact_t *artifacts,
    size_t len) {
  amduat_tgk_store_ops_t backend = amduat_tgk_store_mem_ops();
  amduat_tgk_store_fs_t *store = (amduat_tgk_store_fs_t *)ctx;
  if (store == NULL) {
    return false;
  }
  if (backend.ingest_batch == NULL) {
    return false;
  }
  return backend.ingest_batch(&store->mem, refs, artifacts, len);
}
static bool amduat_tgk_store_fs_remove_artifact(void *ctx,
amduat_reference_t ref) {
amduat_tgk_store_fs_t *fs = (amduat_tgk_store_fs_t *)ctx;
@ -385,6 +399,7 @@ amduat_tgk_store_ops_t amduat_tgk_store_fs_ops(void) {
ops.get_config = amduat_tgk_store_fs_get_config;
ops.snapshot_id = amduat_tgk_store_fs_snapshot_id;
ops.ingest_artifact = amduat_tgk_store_fs_ingest_artifact;
ops.ingest_batch = amduat_tgk_store_fs_ingest_batch;
ops.remove_artifact = amduat_tgk_store_fs_remove_artifact;
ops.resolve_edge = amduat_tgk_store_fs_resolve_edge;
ops.edges_from = amduat_tgk_store_fs_edges_from;

View file

@ -694,60 +694,138 @@ static bool amduat_tgk_store_mem_snapshot_id(
return true;
}
static bool amduat_tgk_store_mem_ingest_artifact(
void *ctx,
amduat_reference_t ref,
amduat_artifact_t artifact) {
amduat_tgk_store_mem_t *mem = (amduat_tgk_store_mem_t *)ctx;
size_t i;
size_t insert_at;
amduat_tgk_store_mem_artifact_t entry;
static bool amduat_tgk_store_mem_ingest_batch_impl(
amduat_tgk_store_mem_t *mem,
const amduat_reference_t *refs,
const amduat_artifact_t *artifacts,
size_t len) {
amduat_tgk_store_mem_artifact_t *pending = NULL;
size_t pending_len = 0;
size_t pending_cap = 0;
amduat_tgk_graph_edge_view_t *edges = NULL;
size_t edges_len = 0;
size_t i;
size_t j;
if (mem == NULL) {
return false;
}
if (!amduat_tgk_store_mem_hash_id_supported(mem, ref.hash_id)) {
return false;
if (len == 0) {
return true;
}
if (artifact.bytes.len != 0 && artifact.bytes.data == NULL) {
if (refs == NULL || artifacts == NULL) {
return false;
}
for (i = 0; i < mem->artifacts_len; ++i) {
if (!amduat_reference_eq(mem->artifacts[i].ref, ref)) {
for (i = 0; i < len; ++i) {
amduat_reference_t ref = refs[i];
amduat_artifact_t artifact = artifacts[i];
bool seen_duplicate = false;
if (!amduat_tgk_store_mem_hash_id_supported(mem, ref.hash_id)) {
goto cleanup;
}
if (artifact.bytes.len != 0 && artifact.bytes.data == NULL) {
goto cleanup;
}
for (j = 0; j < mem->artifacts_len; ++j) {
if (!amduat_reference_eq(mem->artifacts[j].ref, ref)) {
continue;
}
if (!amduat_tgk_store_mem_artifact_value_eq(&mem->artifacts[j],
artifact)) {
goto cleanup;
}
seen_duplicate = true;
break;
}
if (seen_duplicate) {
continue;
}
if (amduat_tgk_store_mem_artifact_value_eq(&mem->artifacts[i], artifact)) {
return true;
for (j = 0; j < pending_len; ++j) {
if (!amduat_reference_eq(pending[j].ref, ref)) {
continue;
}
if (!amduat_tgk_store_mem_artifact_value_eq(&pending[j], artifact)) {
goto cleanup;
}
seen_duplicate = true;
break;
}
return false;
if (seen_duplicate) {
continue;
}
if (pending_len == pending_cap) {
size_t new_cap = (pending_cap == 0) ? 4u : pending_cap * 2u;
amduat_tgk_store_mem_artifact_t *next =
(amduat_tgk_store_mem_artifact_t *)realloc(
pending, new_cap * sizeof(*pending));
if (next == NULL) {
goto cleanup;
}
pending = next;
pending_cap = new_cap;
}
if (!amduat_tgk_store_mem_artifact_clone(ref, artifact,
&pending[pending_len])) {
goto cleanup;
}
pending_len++;
}
if (mem->artifacts_len == mem->artifacts_cap) {
size_t new_cap = (mem->artifacts_cap == 0) ? 8u : mem->artifacts_cap * 2u;
if (pending_len == 0) {
free(pending);
return true;
}
if (mem->artifacts_len > SIZE_MAX - pending_len) {
goto cleanup;
}
if (mem->artifacts_len + pending_len > mem->artifacts_cap) {
size_t new_cap = mem->artifacts_cap == 0 ? pending_len
: mem->artifacts_cap;
while (new_cap < mem->artifacts_len + pending_len) {
if (new_cap > SIZE_MAX / 2) {
goto cleanup;
}
new_cap *= 2u;
}
if (new_cap < mem->artifacts_len + pending_len) {
new_cap = mem->artifacts_len + pending_len;
}
amduat_tgk_store_mem_artifact_t *next =
(amduat_tgk_store_mem_artifact_t *)realloc(
mem->artifacts, new_cap * sizeof(*mem->artifacts));
if (next == NULL) {
return false;
goto cleanup;
}
mem->artifacts = next;
mem->artifacts_cap = new_cap;
}
insert_at = mem->artifacts_len;
if (!amduat_tgk_store_mem_artifact_clone(ref, artifact, &entry)) {
return false;
}
mem->artifacts[insert_at] = entry;
mem->artifacts_len++;
{
size_t appended_len = pending_len;
size_t appended_start = mem->artifacts_len;
if (!amduat_tgk_store_mem_edges_build(mem, &edges, &edges_len)) {
mem->artifacts_len--;
amduat_tgk_store_mem_artifact_free(&mem->artifacts[insert_at]);
return false;
memcpy(mem->artifacts + appended_start, pending,
appended_len * sizeof(*pending));
mem->artifacts_len += appended_len;
free(pending);
pending = NULL;
pending_len = 0;
if (!amduat_tgk_store_mem_edges_build(mem, &edges, &edges_len)) {
for (i = 0; i < appended_len; ++i) {
amduat_tgk_store_mem_artifact_free(
&mem->artifacts[appended_start + i]);
}
mem->artifacts_len = appended_start;
return false;
}
}
amduat_tgk_store_mem_edge_view_list_free(mem->edges, mem->edges_len);
@ -755,6 +833,32 @@ static bool amduat_tgk_store_mem_ingest_artifact(
mem->edges_len = edges_len;
mem->snapshot_id++;
return true;
cleanup:
for (i = 0; i < pending_len; ++i) {
amduat_tgk_store_mem_artifact_free(&pending[i]);
}
free(pending);
return false;
}
/* Single-artifact ingest for the memory store.
 *
 * Implemented as a batch of length one so that the validation,
 * duplicate-detection, and epoch semantics live in exactly one place
 * (the shared batch implementation).
 */
static bool amduat_tgk_store_mem_ingest_artifact(
    void *ctx,
    amduat_reference_t ref,
    amduat_artifact_t artifact) {
  return amduat_tgk_store_mem_ingest_batch_impl(
      (amduat_tgk_store_mem_t *)ctx, &ref, &artifact, 1u);
}
/* ops-table adapter: unwraps the opaque ctx and hands the batch to the
 * shared implementation. All argument validation (NULL refs/artifacts,
 * len == 0, duplicates) happens in the impl. */
static bool amduat_tgk_store_mem_ingest_batch(
    void *ctx,
    const amduat_reference_t *refs,
    const amduat_artifact_t *artifacts,
    size_t len) {
  return amduat_tgk_store_mem_ingest_batch_impl(
      (amduat_tgk_store_mem_t *)ctx, refs, artifacts, len);
}
static bool amduat_tgk_store_mem_remove_artifact(void *ctx,
@ -1187,6 +1291,7 @@ amduat_tgk_store_ops_t amduat_tgk_store_mem_ops(void) {
ops.get_config = amduat_tgk_store_mem_get_config;
ops.snapshot_id = amduat_tgk_store_mem_snapshot_id;
ops.ingest_artifact = amduat_tgk_store_mem_ingest_artifact;
ops.ingest_batch = amduat_tgk_store_mem_ingest_batch;
ops.remove_artifact = amduat_tgk_store_mem_remove_artifact;
ops.resolve_edge = amduat_tgk_store_mem_resolve_edge;
ops.edges_from = amduat_tgk_store_mem_edges_from;

View file

@ -48,6 +48,16 @@ bool amduat_tgk_store_ingest_artifact(amduat_tgk_store_t *store,
return store->ops.ingest_artifact(store->ctx, ref, artifact);
}
bool amduat_tgk_store_ingest_batch(amduat_tgk_store_t *store,
const amduat_reference_t *refs,
const amduat_artifact_t *artifacts,
size_t len) {
if (store == NULL || store->ops.ingest_batch == NULL) {
return false;
}
return store->ops.ingest_batch(store->ctx, refs, artifacts, len);
}
bool amduat_tgk_store_remove_artifact(amduat_tgk_store_t *store,
amduat_reference_t ref) {
if (store == NULL || store->ops.remove_artifact == NULL) {

View file

@ -918,6 +918,135 @@ cleanup:
return exit_code;
}
/* Verifies batch ingest epoch (snapshot-id) semantics on the memory store:
 *  - ingesting a 2-artifact batch bumps the snapshot id exactly once
 *    (not once per artifact),
 *  - both edges are visible to a scan afterwards,
 *  - re-ingesting the identical batch is a no-op that leaves the
 *    snapshot id unchanged.
 * Returns 0 on success, 1 on any failure (also printed to stderr). */
static int test_ingest_batch_epoch(void) {
amduat_tgk_store_mem_t mem;
amduat_tgk_store_t store;
amduat_tgk_store_config_t config;
amduat_tgk_identity_domain_t domains[1];
uint32_t edge_tags[1];
amduat_tgk_edge_type_id_t edge_types[1];
amduat_asl_encoding_profile_id_t encodings[1];
amduat_reference_t refs[2];
amduat_artifact_t artifacts[2];
amduat_octets_t edge_bytes;
amduat_reference_t node_a;
amduat_reference_t node_b;
amduat_reference_t payload;
/* Digest backing storage for the references built via make_ref below.
 * NOTE(review): assumes make_ref fills these buffers — confirm. */
uint8_t digest_ref1[32];
uint8_t digest_ref2[32];
uint8_t digest_a[32];
uint8_t digest_b[32];
uint8_t digest_payload[32];
amduat_tgk_edge_body_t edge;
amduat_reference_t edge_from[1];
amduat_reference_t edge_to[1];
amduat_tgk_snapshot_id_t epoch_before = 0;
amduat_tgk_snapshot_id_t epoch_after = 0;
amduat_tgk_graph_scan_result_t scan;
int exit_code = 1;
memset(&mem, 0, sizeof(mem));
memset(&config, 0, sizeof(config));
memset(&edge, 0, sizeof(edge));
/* edge_bytes must be a valid (empty) octets value before any goto
 * cleanup_mem path can reach the final free of edge_bytes.data. */
edge_bytes = amduat_octets(NULL, 0);
/* Minimal single-domain, single-edge-type store configuration. */
domains[0].encoding_profile = AMDUAT_ENC_ASL1_CORE_V1;
domains[0].hash_id = AMDUAT_HASH_ASL1_ID_SHA256;
edge_tags[0] = TYPE_TAG_TGK1_EDGE_V1;
edge_types[0] = 0x10;
encodings[0] = TGK1_EDGE_ENC_V1;
config.id_space.domains = domains;
config.id_space.domains_len = 1;
config.tgk_profiles.edge_tags = edge_tags;
config.tgk_profiles.edge_tags_len = 1;
config.tgk_profiles.edge_types = edge_types;
config.tgk_profiles.edge_types_len = 1;
config.tgk_profiles.encodings = encodings;
config.tgk_profiles.encodings_len = 1;
if (!amduat_tgk_store_mem_init(&mem, config, NULL, 0)) {
fprintf(stderr, "batch epoch init failed\n");
return 1;
}
amduat_tgk_store_init(&store, config, amduat_tgk_store_mem_ops(), &mem);
/* Capture the epoch before ingest so the +1 check below is meaningful. */
if (!amduat_tgk_store_snapshot_id(&store, &epoch_before)) {
fprintf(stderr, "batch epoch get failed\n");
goto cleanup_mem;
}
/* Two distinct references that will share one encoded edge payload. */
refs[0] = make_ref(0x81, digest_ref1);
refs[1] = make_ref(0x82, digest_ref2);
node_a = make_ref(0xa1, digest_a);
node_b = make_ref(0xb1, digest_b);
payload = make_ref(0xe1, digest_payload);
/* Build a single edge node_a -> node_b carrying the payload ref. */
edge.type = 0x10;
edge_from[0] = node_a;
edge.from = edge_from;
edge.from_len = 1;
edge_to[0] = node_b;
edge.to = edge_to;
edge.to_len = 1;
edge.payload = payload;
if (!amduat_enc_tgk1_edge_encode_v1(&edge, &edge_bytes)) {
fprintf(stderr, "batch epoch encode failed\n");
goto cleanup_mem;
}
/* Both batch entries reuse the same encoded bytes; only the refs differ. */
artifacts[0] = amduat_artifact_with_type(
edge_bytes, amduat_type_tag(TYPE_TAG_TGK1_EDGE_V1));
artifacts[1] = artifacts[0];
if (!amduat_tgk_store_ingest_batch(&store, refs, artifacts, 2)) {
fprintf(stderr, "batch ingest failed\n");
goto cleanup_mem;
}
if (!amduat_tgk_store_snapshot_id(&store, &epoch_after)) {
fprintf(stderr, "batch epoch after ingest failed\n");
goto cleanup_mem;
}
/* A batch of two artifacts must advance the epoch exactly once. */
if (epoch_after != epoch_before + 1u) {
fprintf(stderr, "batch epoch did not increment\n");
goto cleanup_mem;
}
/* Unfiltered scan: both ingested edges should be visible. */
if (!amduat_tgk_store_scan_edges(&store,
(amduat_tgk_edge_type_filter_t){0},
amduat_octets(NULL, 0), false, &scan)) {
fprintf(stderr, "batch scan failed\n");
goto cleanup_mem;
}
if (scan.edges.len != 2) {
fprintf(stderr, "batch scan count mismatch\n");
amduat_tgk_graph_scan_result_free(&scan);
goto cleanup_mem;
}
amduat_tgk_graph_scan_result_free(&scan);
/* Re-ingesting the identical batch must succeed (idempotent) ... */
if (!amduat_tgk_store_ingest_batch(&store, refs, artifacts, 2)) {
fprintf(stderr, "batch re-ingest failed\n");
goto cleanup_mem;
}
if (!amduat_tgk_store_snapshot_id(&store, &epoch_after)) {
fprintf(stderr, "batch epoch after re-ingest failed\n");
goto cleanup_mem;
}
/* ... and must NOT bump the epoch, since nothing actually changed. */
if (epoch_after != epoch_before + 1u) {
fprintf(stderr, "batch epoch changed on no-op\n");
goto cleanup_mem;
}
exit_code = 0;
cleanup_mem:
/* Free the store first, then the encoded edge buffer it referenced. */
amduat_tgk_store_mem_free(&mem);
free((void *)edge_bytes.data);
return exit_code;
}
static int test_resolve_edge_unsupported(const test_env_t *env) {
amduat_reference_t ref;
amduat_tgk_edge_body_t body;
@ -1134,6 +1263,7 @@ int main(void) {
test_duplicate_edge_ref_conflict() != 0 ||
test_scan_edges_pagination() != 0 ||
test_ingest_remove_epoch() != 0 ||
test_ingest_batch_epoch() != 0 ||
test_type_filter(&env) != 0 ||
test_ordering(&env) != 0 ||
test_adjacency(&env) != 0 ||