diff --git a/include/amduat/tgk/store.h b/include/amduat/tgk/store.h index 5ba6278..a45df93 100644 --- a/include/amduat/tgk/store.h +++ b/include/amduat/tgk/store.h @@ -86,6 +86,10 @@ typedef struct { typedef struct { bool (*get_config)(void *ctx, amduat_tgk_store_config_t *out_config); bool (*snapshot_id)(void *ctx, amduat_tgk_snapshot_id_t *out_id); + bool (*ingest_artifact)(void *ctx, + amduat_reference_t ref, + amduat_artifact_t artifact); + bool (*remove_artifact)(void *ctx, amduat_reference_t ref); amduat_tgk_graph_error_t (*resolve_edge)(void *ctx, amduat_reference_t ref, amduat_tgk_edge_body_t *out_body); @@ -130,6 +134,13 @@ bool amduat_tgk_store_get_config(amduat_tgk_store_t *store, bool amduat_tgk_store_snapshot_id(amduat_tgk_store_t *store, amduat_tgk_snapshot_id_t *out_id); +bool amduat_tgk_store_ingest_artifact(amduat_tgk_store_t *store, + amduat_reference_t ref, + amduat_artifact_t artifact); + +bool amduat_tgk_store_remove_artifact(amduat_tgk_store_t *store, + amduat_reference_t ref); + amduat_tgk_graph_error_t amduat_tgk_store_resolve_edge( amduat_tgk_store_t *store, amduat_reference_t ref, diff --git a/include/amduat/tgk/tgk_store_mem.h b/include/amduat/tgk/tgk_store_mem.h index b1d3590..a41a708 100644 --- a/include/amduat/tgk/tgk_store_mem.h +++ b/include/amduat/tgk/tgk_store_mem.h @@ -21,13 +21,14 @@ typedef struct { /* Monotonic snapshot/epoch; increments when ingest/remove mutates state. */ amduat_tgk_snapshot_id_t snapshot_id; amduat_tgk_store_config_t config; - const amduat_tgk_store_mem_artifact_t *artifacts; + amduat_tgk_store_mem_artifact_t *artifacts; size_t artifacts_len; + size_t artifacts_cap; amduat_tgk_graph_edge_view_t *edges; size_t edges_len; } amduat_tgk_store_mem_t; -/* Artifacts are borrowed; decoded edges are owned by the store. */ +/* Artifacts and decoded edges are owned by the store. */ bool amduat_tgk_store_mem_init(amduat_tgk_store_mem_t *mem, amduat_tgk_store_config_t config, const amduat_tgk_store_mem_artifact_t *artifacts, diff --git a/src/adapters/tgk_store_fs/tgk_store_fs.c b/src/adapters/tgk_store_fs/tgk_store_fs.c index 16ad7ec..abab4b3 100644 --- a/src/adapters/tgk_store_fs/tgk_store_fs.c +++ b/src/adapters/tgk_store_fs/tgk_store_fs.c @@ -133,6 +133,29 @@ static bool amduat_tgk_store_fs_snapshot_id(void *ctx, return mem_ops.snapshot_id(&fs->mem, out_id); } +static bool amduat_tgk_store_fs_ingest_artifact(void *ctx, + amduat_reference_t ref, + amduat_artifact_t artifact) { + amduat_tgk_store_fs_t *fs = (amduat_tgk_store_fs_t *)ctx; + amduat_tgk_store_ops_t mem_ops = amduat_tgk_store_mem_ops(); + + if (fs == NULL || mem_ops.ingest_artifact == NULL) { + return false; + } + return mem_ops.ingest_artifact(&fs->mem, ref, artifact); +} + +static bool amduat_tgk_store_fs_remove_artifact(void *ctx, + amduat_reference_t ref) { + amduat_tgk_store_fs_t *fs = (amduat_tgk_store_fs_t *)ctx; + amduat_tgk_store_ops_t mem_ops = amduat_tgk_store_mem_ops(); + + if (fs == NULL || mem_ops.remove_artifact == NULL) { + return false; + } + return mem_ops.remove_artifact(&fs->mem, ref); +} + static amduat_tgk_graph_error_t amduat_tgk_store_fs_resolve_edge( void *ctx, amduat_reference_t ref, @@ -361,6 +384,8 @@ amduat_tgk_store_ops_t amduat_tgk_store_fs_ops(void) { memset(&ops, 0, sizeof(ops)); ops.get_config = amduat_tgk_store_fs_get_config; ops.snapshot_id = amduat_tgk_store_fs_snapshot_id; + ops.ingest_artifact = amduat_tgk_store_fs_ingest_artifact; + ops.remove_artifact = amduat_tgk_store_fs_remove_artifact; ops.resolve_edge = amduat_tgk_store_fs_resolve_edge; ops.edges_from = amduat_tgk_store_fs_edges_from; ops.edges_to = amduat_tgk_store_fs_edges_to; diff --git a/src/adapters/tgk_store_mem/tgk_store_mem.c b/src/adapters/tgk_store_mem/tgk_store_mem.c index a95dbc0..aab2793 100644 --- a/src/adapters/tgk_store_mem/tgk_store_mem.c +++ b/src/adapters/tgk_store_mem/tgk_store_mem.c @@ -9,6 +9,10 @@ enum { AMDUAT_TGK_STORE_MEM_SCAN_PAGE_SIZE = 256u }; static void amduat_tgk_store_mem_reference_free(amduat_reference_t *ref); +static amduat_tgk_graph_error_t amduat_tgk_store_mem_decode_edge( + const amduat_tgk_store_mem_t *mem, + const amduat_tgk_store_mem_artifact_t *entry, + amduat_tgk_edge_body_t *out_body); static bool amduat_tgk_store_mem_id_space_valid( amduat_tgk_id_space_config_t id_space) { @@ -252,6 +256,59 @@ static void amduat_tgk_store_mem_reference_free(amduat_reference_t *ref) { ref->digest.len = 0; } +static bool amduat_tgk_store_mem_artifact_clone( + amduat_reference_t ref, + amduat_artifact_t artifact, + amduat_tgk_store_mem_artifact_t *out) { + uint8_t *bytes_copy = NULL; + + if (out == NULL) { + return false; + } + memset(out, 0, sizeof(*out)); + if (!amduat_tgk_store_mem_reference_clone(ref, &out->ref)) { + return false; + } + if (artifact.bytes.len != 0) { + if (artifact.bytes.data == NULL) { + amduat_tgk_store_mem_reference_free(&out->ref); + return false; + } + bytes_copy = (uint8_t *)malloc(artifact.bytes.len); + if (bytes_copy == NULL) { + amduat_tgk_store_mem_reference_free(&out->ref); + return false; + } + memcpy(bytes_copy, artifact.bytes.data, artifact.bytes.len); + } + out->artifact.bytes = amduat_octets(bytes_copy, artifact.bytes.len); + out->artifact.has_type_tag = artifact.has_type_tag; + out->artifact.type_tag = artifact.type_tag; + return true; +} + +static void amduat_tgk_store_mem_artifact_free( + amduat_tgk_store_mem_artifact_t *artifact) { + if (artifact == NULL) { + return; + } + free((void *)artifact->artifact.bytes.data); + artifact->artifact.bytes.data = NULL; + artifact->artifact.bytes.len = 0; + artifact->artifact.has_type_tag = false; + artifact->artifact.type_tag.tag_id = 0; + amduat_tgk_store_mem_reference_free(&artifact->ref); +} + +static bool amduat_tgk_store_mem_artifact_value_eq( + const amduat_tgk_store_mem_artifact_t *entry, + amduat_artifact_t artifact) { + if (entry == NULL) { + return false; + } + return amduat_tgk_store_mem_artifact_eq(entry->artifact, artifact); +} + static bool amduat_tgk_store_mem_edge_body_clone( const amduat_tgk_edge_body_t *src, amduat_tgk_edge_body_t *dst) { @@ -322,6 +379,76 @@ static bool amduat_tgk_store_mem_edge_view_clone( return true; } +static void amduat_tgk_store_mem_edge_view_list_free( + amduat_tgk_graph_edge_view_t *edges, + size_t edges_len) { + size_t i; + + if (edges == NULL) { + return; + } + for (i = 0; i < edges_len; ++i) { + amduat_tgk_store_mem_reference_free(&edges[i].edge_ref); + amduat_tgk_edge_body_free(&edges[i].body); + } + free(edges); +} + +static bool amduat_tgk_store_mem_edges_build( + const amduat_tgk_store_mem_t *mem, + amduat_tgk_graph_edge_view_t **out_edges, + size_t *out_len) { + amduat_tgk_graph_edge_view_t *edges; + size_t count = 0; + size_t i; + + if (out_edges == NULL || out_len == NULL) { + return false; + } + *out_edges = NULL; + *out_len = 0; + + if (mem == NULL || mem->artifacts_len == 0) { + return true; + } + + edges = (amduat_tgk_graph_edge_view_t *)calloc( + mem->artifacts_len, sizeof(*edges)); + if (edges == NULL) { + return false; + } + + for (i = 0; i < mem->artifacts_len; ++i) { + amduat_tgk_edge_body_t body; + amduat_tgk_graph_error_t err; + const amduat_tgk_store_mem_artifact_t *entry = &mem->artifacts[i]; + + if (!amduat_tgk_store_mem_hash_id_supported(mem, entry->ref.hash_id)) { + continue; + } + err = amduat_tgk_store_mem_decode_edge(mem, entry, &body); + if (err != 0) { + continue; + } + if (!amduat_tgk_store_mem_reference_clone(entry->ref, + &edges[count].edge_ref)) { + amduat_tgk_edge_body_free(&body); + amduat_tgk_store_mem_edge_view_list_free(edges, count); + return false; + } + edges[count].body = body; + count++; + } + + if (count > 1) { + qsort(edges, count, sizeof(*edges), amduat_tgk_store_mem_edge_cmp); + } + + *out_edges = edges; + *out_len = count; + return true; +} + static const amduat_tgk_store_mem_artifact_t * amduat_tgk_store_mem_lookup_artifact(const amduat_tgk_store_mem_t *mem, amduat_reference_t ref, @@ -567,6 +694,123 @@ static bool amduat_tgk_store_mem_snapshot_id( return true; } +static bool amduat_tgk_store_mem_ingest_artifact( + void *ctx, + amduat_reference_t ref, + amduat_artifact_t artifact) { + amduat_tgk_store_mem_t *mem = (amduat_tgk_store_mem_t *)ctx; + size_t i; + size_t insert_at; + amduat_tgk_store_mem_artifact_t entry; + amduat_tgk_graph_edge_view_t *edges = NULL; + size_t edges_len = 0; + + if (mem == NULL) { + return false; + } + if (!amduat_tgk_store_mem_hash_id_supported(mem, ref.hash_id)) { + return false; + } + if (artifact.bytes.len != 0 && artifact.bytes.data == NULL) { + return false; + } + + for (i = 0; i < mem->artifacts_len; ++i) { + if (!amduat_reference_eq(mem->artifacts[i].ref, ref)) { + continue; + } + if (amduat_tgk_store_mem_artifact_value_eq(&mem->artifacts[i], artifact)) { + return true; + } + return false; + } + + if (mem->artifacts_len == mem->artifacts_cap) { + size_t new_cap = (mem->artifacts_cap == 0) ? 8u : mem->artifacts_cap * 2u; + amduat_tgk_store_mem_artifact_t *next = + (amduat_tgk_store_mem_artifact_t *)realloc( + mem->artifacts, new_cap * sizeof(*mem->artifacts)); + if (next == NULL) { + return false; + } + mem->artifacts = next; + mem->artifacts_cap = new_cap; + } + + insert_at = mem->artifacts_len; + if (!amduat_tgk_store_mem_artifact_clone(ref, artifact, &entry)) { + return false; + } + mem->artifacts[insert_at] = entry; + mem->artifacts_len++; + + if (!amduat_tgk_store_mem_edges_build(mem, &edges, &edges_len)) { + mem->artifacts_len--; + amduat_tgk_store_mem_artifact_free(&mem->artifacts[insert_at]); + return false; + } + + amduat_tgk_store_mem_edge_view_list_free(mem->edges, mem->edges_len); + mem->edges = edges; + mem->edges_len = edges_len; + mem->snapshot_id++; + return true; +} + +static bool amduat_tgk_store_mem_remove_artifact(void *ctx, + amduat_reference_t ref) { + amduat_tgk_store_mem_t *mem = (amduat_tgk_store_mem_t *)ctx; + amduat_tgk_store_mem_artifact_t removed; + amduat_tgk_graph_edge_view_t *edges = NULL; + size_t edges_len = 0; + size_t i; + size_t index = mem ? mem->artifacts_len : 0; + + if (mem == NULL) { + return false; + } + if (!amduat_tgk_store_mem_hash_id_supported(mem, ref.hash_id)) { + return false; + } + + for (i = 0; i < mem->artifacts_len; ++i) { + if (amduat_reference_eq(mem->artifacts[i].ref, ref)) { + index = i; + break; + } + } + if (index >= mem->artifacts_len) { + return true; + } + + if (!amduat_tgk_store_mem_artifact_clone(mem->artifacts[index].ref, + mem->artifacts[index].artifact, + &removed)) { + return false; + } + + amduat_tgk_store_mem_artifact_free(&mem->artifacts[index]); + memmove(&mem->artifacts[index], &mem->artifacts[index + 1], + (mem->artifacts_len - index - 1) * + sizeof(*mem->artifacts)); + mem->artifacts_len--; + + if (!amduat_tgk_store_mem_edges_build(mem, &edges, &edges_len)) { + memmove(&mem->artifacts[index + 1], &mem->artifacts[index], + (mem->artifacts_len - index) * sizeof(*mem->artifacts)); + mem->artifacts[index] = removed; + mem->artifacts_len++; + return false; + } + + amduat_tgk_store_mem_artifact_free(&removed); + amduat_tgk_store_mem_edge_view_list_free(mem->edges, mem->edges_len); + mem->edges = edges; + mem->edges_len = edges_len; + mem->snapshot_id++; + return true; +} + static amduat_tgk_graph_error_t amduat_tgk_store_mem_resolve_edge( void *ctx, amduat_reference_t ref, @@ -835,16 +1079,18 @@ bool amduat_tgk_store_mem_init(amduat_tgk_store_mem_t *mem, const amduat_tgk_store_mem_artifact_t *artifacts, size_t artifacts_len) { size_t i; - size_t count = 0; - amduat_tgk_graph_edge_view_t *edges; + size_t stored = 0; + amduat_tgk_graph_edge_view_t *edges = NULL; + size_t edges_len = 0; if (mem == NULL) { return false; } mem->snapshot_id = 0; mem->config = config; - mem->artifacts = artifacts; - mem->artifacts_len = artifacts_len; + mem->artifacts = NULL; + mem->artifacts_len = 0; + mem->artifacts_cap = 0; mem->edges = NULL; mem->edges_len = 0; @@ -869,58 +1115,49 @@ bool amduat_tgk_store_mem_init(amduat_tgk_store_mem_t *mem, return false; } - edges = (amduat_tgk_graph_edge_view_t *)calloc( - artifacts_len, sizeof(amduat_tgk_graph_edge_view_t)); - if (edges == NULL) { + mem->artifacts = (amduat_tgk_store_mem_artifact_t *)calloc( + artifacts_len, sizeof(*mem->artifacts)); + if (mem->artifacts == NULL) { return false; } + mem->artifacts_cap = artifacts_len; for (i = 0; i < artifacts_len; ++i) { - amduat_tgk_edge_body_t body; - amduat_tgk_graph_error_t err; const amduat_tgk_store_mem_artifact_t *entry = &artifacts[i]; bool duplicate = false; if (!amduat_tgk_store_mem_check_duplicate_artifact(artifacts, i, &duplicate)) { - goto cleanup; + goto cleanup_artifacts; } if (duplicate) { continue; } - if (!amduat_tgk_store_mem_hash_id_supported(mem, entry->ref.hash_id)) { - continue; + if (!amduat_tgk_store_mem_artifact_clone(entry->ref, entry->artifact, + &mem->artifacts[stored])) { + goto cleanup_artifacts; } - err = amduat_tgk_store_mem_decode_edge(mem, entry, &body); - if (err != 0) { - continue; - } - if (!amduat_tgk_store_mem_reference_clone(entry->ref, - &edges[count].edge_ref)) { - amduat_tgk_edge_body_free(&body); - goto cleanup; - } - edges[count].body = body; - count++; + stored++; } - if (count > 1) { - qsort(edges, count, sizeof(amduat_tgk_graph_edge_view_t), - amduat_tgk_store_mem_edge_cmp); + mem->artifacts_len = stored; + + if (!amduat_tgk_store_mem_edges_build(mem, &edges, &edges_len)) { + goto cleanup_artifacts; } mem->edges = edges; - mem->edges_len = count; + mem->edges_len = edges_len; return true; -cleanup: - for (i = 0; i < count; ++i) { - amduat_tgk_store_mem_reference_free(&edges[i].edge_ref); - amduat_tgk_edge_body_free(&edges[i].body); +cleanup_artifacts: + for (i = 0; i < stored; ++i) { + amduat_tgk_store_mem_artifact_free(&mem->artifacts[i]); } - free(edges); - mem->edges = NULL; - mem->edges_len = 0; + free(mem->artifacts); + mem->artifacts = NULL; + mem->artifacts_len = 0; + mem->artifacts_cap = 0; return false; } @@ -930,15 +1167,16 @@ void amduat_tgk_store_mem_free(amduat_tgk_store_mem_t *mem) { if (mem == NULL) { return; } - for (i = 0; i < mem->edges_len; ++i) { - amduat_tgk_store_mem_reference_free(&mem->edges[i].edge_ref); - amduat_tgk_edge_body_free(&mem->edges[i].body); - } - free(mem->edges); + amduat_tgk_store_mem_edge_view_list_free(mem->edges, mem->edges_len); mem->edges = NULL; mem->edges_len = 0; + for (i = 0; i < mem->artifacts_len; ++i) { + amduat_tgk_store_mem_artifact_free(&mem->artifacts[i]); + } + free(mem->artifacts); mem->artifacts = NULL; mem->artifacts_len = 0; + mem->artifacts_cap = 0; mem->snapshot_id = 0; } @@ -948,6 +1186,8 @@ amduat_tgk_store_ops_t amduat_tgk_store_mem_ops(void) { memset(&ops, 0, sizeof(ops)); ops.get_config = amduat_tgk_store_mem_get_config; ops.snapshot_id = amduat_tgk_store_mem_snapshot_id; + ops.ingest_artifact = amduat_tgk_store_mem_ingest_artifact; + ops.remove_artifact = amduat_tgk_store_mem_remove_artifact; ops.resolve_edge = amduat_tgk_store_mem_resolve_edge; ops.edges_from = amduat_tgk_store_mem_edges_from; ops.edges_to = amduat_tgk_store_mem_edges_to; diff --git a/src/tgk_stack/store/store.c b/src/tgk_stack/store/store.c index 8bbf659..ea3df75 100644 --- a/src/tgk_stack/store/store.c +++ b/src/tgk_stack/store/store.c @@ -39,6 +39,23 @@ bool amduat_tgk_store_snapshot_id(amduat_tgk_store_t *store, return store->ops.snapshot_id(store->ctx, out_id); } +bool amduat_tgk_store_ingest_artifact(amduat_tgk_store_t *store, + amduat_reference_t ref, + amduat_artifact_t artifact) { + if (store == NULL || store->ops.ingest_artifact == NULL) { + return false; + } + return store->ops.ingest_artifact(store->ctx, ref, artifact); +} + +bool amduat_tgk_store_remove_artifact(amduat_tgk_store_t *store, + amduat_reference_t ref) { + if (store == NULL || store->ops.remove_artifact == NULL) { + return false; + } + return store->ops.remove_artifact(store->ctx, ref); +} + amduat_tgk_graph_error_t amduat_tgk_store_resolve_edge( amduat_tgk_store_t *store, amduat_reference_t ref,