Added ingest/remove APIs and wired epoch bumps on mutation, plus made the mem store truly mutable/owning

This commit is contained in:
Carl Niklas Rydberg 2025-12-21 21:20:12 +01:00
parent 456e899b50
commit 8bbadd602c
5 changed files with 335 additions and 41 deletions

View file

@ -86,6 +86,10 @@ typedef struct {
typedef struct {
bool (*get_config)(void *ctx, amduat_tgk_store_config_t *out_config);
bool (*snapshot_id)(void *ctx, amduat_tgk_snapshot_id_t *out_id);
bool (*ingest_artifact)(void *ctx,
amduat_reference_t ref,
amduat_artifact_t artifact);
bool (*remove_artifact)(void *ctx, amduat_reference_t ref);
amduat_tgk_graph_error_t (*resolve_edge)(void *ctx,
amduat_reference_t ref,
amduat_tgk_edge_body_t *out_body);
@ -130,6 +134,13 @@ bool amduat_tgk_store_get_config(amduat_tgk_store_t *store,
bool amduat_tgk_store_snapshot_id(amduat_tgk_store_t *store,
amduat_tgk_snapshot_id_t *out_id);
bool amduat_tgk_store_ingest_artifact(amduat_tgk_store_t *store,
amduat_reference_t ref,
amduat_artifact_t artifact);
bool amduat_tgk_store_remove_artifact(amduat_tgk_store_t *store,
amduat_reference_t ref);
amduat_tgk_graph_error_t amduat_tgk_store_resolve_edge(
amduat_tgk_store_t *store,
amduat_reference_t ref,

View file

@ -21,13 +21,14 @@ typedef struct {
/* Monotonic snapshot/epoch; increments when ingest/remove mutates state. */
amduat_tgk_snapshot_id_t snapshot_id;
amduat_tgk_store_config_t config;
const amduat_tgk_store_mem_artifact_t *artifacts;
amduat_tgk_store_mem_artifact_t *artifacts;
size_t artifacts_len;
size_t artifacts_cap;
amduat_tgk_graph_edge_view_t *edges;
size_t edges_len;
} amduat_tgk_store_mem_t;
/* Artifacts are borrowed; decoded edges are owned by the store. */
/* Artifacts and decoded edges are owned by the store. */
bool amduat_tgk_store_mem_init(amduat_tgk_store_mem_t *mem,
amduat_tgk_store_config_t config,
const amduat_tgk_store_mem_artifact_t *artifacts,

View file

@ -133,6 +133,29 @@ static bool amduat_tgk_store_fs_snapshot_id(void *ctx,
return mem_ops.snapshot_id(&fs->mem, out_id);
}
static bool amduat_tgk_store_fs_ingest_artifact(void *ctx,
amduat_reference_t ref,
amduat_artifact_t artifact) {
amduat_tgk_store_fs_t *fs = (amduat_tgk_store_fs_t *)ctx;
amduat_tgk_store_ops_t mem_ops = amduat_tgk_store_mem_ops();
if (fs == NULL || mem_ops.ingest_artifact == NULL) {
return false;
}
return mem_ops.ingest_artifact(&fs->mem, ref, artifact);
}
static bool amduat_tgk_store_fs_remove_artifact(void *ctx,
amduat_reference_t ref) {
amduat_tgk_store_fs_t *fs = (amduat_tgk_store_fs_t *)ctx;
amduat_tgk_store_ops_t mem_ops = amduat_tgk_store_mem_ops();
if (fs == NULL || mem_ops.remove_artifact == NULL) {
return false;
}
return mem_ops.remove_artifact(&fs->mem, ref);
}
static amduat_tgk_graph_error_t amduat_tgk_store_fs_resolve_edge(
void *ctx,
amduat_reference_t ref,
@ -361,6 +384,8 @@ amduat_tgk_store_ops_t amduat_tgk_store_fs_ops(void) {
memset(&ops, 0, sizeof(ops));
ops.get_config = amduat_tgk_store_fs_get_config;
ops.snapshot_id = amduat_tgk_store_fs_snapshot_id;
ops.ingest_artifact = amduat_tgk_store_fs_ingest_artifact;
ops.remove_artifact = amduat_tgk_store_fs_remove_artifact;
ops.resolve_edge = amduat_tgk_store_fs_resolve_edge;
ops.edges_from = amduat_tgk_store_fs_edges_from;
ops.edges_to = amduat_tgk_store_fs_edges_to;

View file

@ -9,6 +9,10 @@
enum { AMDUAT_TGK_STORE_MEM_SCAN_PAGE_SIZE = 256u };
static void amduat_tgk_store_mem_reference_free(amduat_reference_t *ref);
static amduat_tgk_graph_error_t amduat_tgk_store_mem_decode_edge(
const amduat_tgk_store_mem_t *mem,
const amduat_tgk_store_mem_artifact_t *entry,
amduat_tgk_edge_body_t *out_body);
static bool amduat_tgk_store_mem_id_space_valid(
amduat_tgk_id_space_config_t id_space) {
@ -252,6 +256,59 @@ static void amduat_tgk_store_mem_reference_free(amduat_reference_t *ref) {
ref->digest.len = 0;
}
static bool amduat_tgk_store_mem_artifact_clone(
amduat_reference_t ref,
amduat_artifact_t artifact,
amduat_tgk_store_mem_artifact_t *out) {
uint8_t *bytes_copy = NULL;
if (out == NULL) {
return false;
}
memset(out, 0, sizeof(*out));
if (!amduat_tgk_store_mem_reference_clone(ref, &out->ref)) {
return false;
}
if (artifact.bytes.len != 0) {
if (artifact.bytes.data == NULL) {
amduat_tgk_store_mem_reference_free(&out->ref);
return false;
}
bytes_copy = (uint8_t *)malloc(artifact.bytes.len);
if (bytes_copy == NULL) {
amduat_tgk_store_mem_reference_free(&out->ref);
return false;
}
memcpy(bytes_copy, artifact.bytes.data, artifact.bytes.len);
}
out->artifact.bytes = amduat_octets(bytes_copy, artifact.bytes.len);
out->artifact.has_type_tag = artifact.has_type_tag;
out->artifact.type_tag = artifact.type_tag;
return true;
}
static void amduat_tgk_store_mem_artifact_free(
amduat_tgk_store_mem_artifact_t *artifact) {
if (artifact == NULL) {
return;
}
free((void *)artifact->artifact.bytes.data);
artifact->artifact.bytes.data = NULL;
artifact->artifact.bytes.len = 0;
artifact->artifact.has_type_tag = false;
artifact->artifact.type_tag.tag_id = 0;
amduat_tgk_store_mem_reference_free(&artifact->ref);
}
static bool amduat_tgk_store_mem_artifact_value_eq(
const amduat_tgk_store_mem_artifact_t *entry,
amduat_artifact_t artifact) {
if (entry == NULL) {
return false;
}
return amduat_tgk_store_mem_artifact_eq(entry->artifact, artifact);
}
static bool amduat_tgk_store_mem_edge_body_clone(
const amduat_tgk_edge_body_t *src,
amduat_tgk_edge_body_t *dst) {
@ -322,6 +379,76 @@ static bool amduat_tgk_store_mem_edge_view_clone(
return true;
}
static void amduat_tgk_store_mem_edge_view_list_free(
amduat_tgk_graph_edge_view_t *edges,
size_t edges_len) {
size_t i;
if (edges == NULL) {
return;
}
for (i = 0; i < edges_len; ++i) {
amduat_tgk_store_mem_reference_free(&edges[i].edge_ref);
amduat_tgk_edge_body_free(&edges[i].body);
}
free(edges);
}
static bool amduat_tgk_store_mem_edges_build(
const amduat_tgk_store_mem_t *mem,
amduat_tgk_graph_edge_view_t **out_edges,
size_t *out_len) {
amduat_tgk_graph_edge_view_t *edges;
size_t count = 0;
size_t i;
if (out_edges == NULL || out_len == NULL) {
return false;
}
*out_edges = NULL;
*out_len = 0;
if (mem == NULL || mem->artifacts_len == 0) {
return true;
}
edges = (amduat_tgk_graph_edge_view_t *)calloc(
mem->artifacts_len, sizeof(*edges));
if (edges == NULL) {
return false;
}
for (i = 0; i < mem->artifacts_len; ++i) {
amduat_tgk_edge_body_t body;
amduat_tgk_graph_error_t err;
const amduat_tgk_store_mem_artifact_t *entry = &mem->artifacts[i];
if (!amduat_tgk_store_mem_hash_id_supported(mem, entry->ref.hash_id)) {
continue;
}
err = amduat_tgk_store_mem_decode_edge(mem, entry, &body);
if (err != 0) {
continue;
}
if (!amduat_tgk_store_mem_reference_clone(entry->ref,
&edges[count].edge_ref)) {
amduat_tgk_edge_body_free(&body);
amduat_tgk_store_mem_edge_view_list_free(edges, count);
return false;
}
edges[count].body = body;
count++;
}
if (count > 1) {
qsort(edges, count, sizeof(*edges), amduat_tgk_store_mem_edge_cmp);
}
*out_edges = edges;
*out_len = count;
return true;
}
static const amduat_tgk_store_mem_artifact_t *
amduat_tgk_store_mem_lookup_artifact(const amduat_tgk_store_mem_t *mem,
amduat_reference_t ref,
@ -567,6 +694,123 @@ static bool amduat_tgk_store_mem_snapshot_id(
return true;
}
static bool amduat_tgk_store_mem_ingest_artifact(
void *ctx,
amduat_reference_t ref,
amduat_artifact_t artifact) {
amduat_tgk_store_mem_t *mem = (amduat_tgk_store_mem_t *)ctx;
size_t i;
size_t insert_at;
amduat_tgk_store_mem_artifact_t entry;
amduat_tgk_graph_edge_view_t *edges = NULL;
size_t edges_len = 0;
if (mem == NULL) {
return false;
}
if (!amduat_tgk_store_mem_hash_id_supported(mem, ref.hash_id)) {
return false;
}
if (artifact.bytes.len != 0 && artifact.bytes.data == NULL) {
return false;
}
for (i = 0; i < mem->artifacts_len; ++i) {
if (!amduat_reference_eq(mem->artifacts[i].ref, ref)) {
continue;
}
if (amduat_tgk_store_mem_artifact_value_eq(&mem->artifacts[i], artifact)) {
return true;
}
return false;
}
if (mem->artifacts_len == mem->artifacts_cap) {
size_t new_cap = (mem->artifacts_cap == 0) ? 8u : mem->artifacts_cap * 2u;
amduat_tgk_store_mem_artifact_t *next =
(amduat_tgk_store_mem_artifact_t *)realloc(
mem->artifacts, new_cap * sizeof(*mem->artifacts));
if (next == NULL) {
return false;
}
mem->artifacts = next;
mem->artifacts_cap = new_cap;
}
insert_at = mem->artifacts_len;
if (!amduat_tgk_store_mem_artifact_clone(ref, artifact, &entry)) {
return false;
}
mem->artifacts[insert_at] = entry;
mem->artifacts_len++;
if (!amduat_tgk_store_mem_edges_build(mem, &edges, &edges_len)) {
mem->artifacts_len--;
amduat_tgk_store_mem_artifact_free(&mem->artifacts[insert_at]);
return false;
}
amduat_tgk_store_mem_edge_view_list_free(mem->edges, mem->edges_len);
mem->edges = edges;
mem->edges_len = edges_len;
mem->snapshot_id++;
return true;
}
static bool amduat_tgk_store_mem_remove_artifact(void *ctx,
amduat_reference_t ref) {
amduat_tgk_store_mem_t *mem = (amduat_tgk_store_mem_t *)ctx;
amduat_tgk_store_mem_artifact_t removed;
amduat_tgk_graph_edge_view_t *edges = NULL;
size_t edges_len = 0;
size_t i;
size_t index = mem ? mem->artifacts_len : 0;
if (mem == NULL) {
return false;
}
if (!amduat_tgk_store_mem_hash_id_supported(mem, ref.hash_id)) {
return false;
}
for (i = 0; i < mem->artifacts_len; ++i) {
if (amduat_reference_eq(mem->artifacts[i].ref, ref)) {
index = i;
break;
}
}
if (index >= mem->artifacts_len) {
return true;
}
if (!amduat_tgk_store_mem_artifact_clone(mem->artifacts[index].ref,
mem->artifacts[index].artifact,
&removed)) {
return false;
}
amduat_tgk_store_mem_artifact_free(&mem->artifacts[index]);
memmove(&mem->artifacts[index], &mem->artifacts[index + 1],
(mem->artifacts_len - index - 1) *
sizeof(*mem->artifacts));
mem->artifacts_len--;
if (!amduat_tgk_store_mem_edges_build(mem, &edges, &edges_len)) {
memmove(&mem->artifacts[index + 1], &mem->artifacts[index],
(mem->artifacts_len - index) * sizeof(*mem->artifacts));
mem->artifacts[index] = removed;
mem->artifacts_len++;
return false;
}
amduat_tgk_store_mem_artifact_free(&removed);
amduat_tgk_store_mem_edge_view_list_free(mem->edges, mem->edges_len);
mem->edges = edges;
mem->edges_len = edges_len;
mem->snapshot_id++;
return true;
}
static amduat_tgk_graph_error_t amduat_tgk_store_mem_resolve_edge(
void *ctx,
amduat_reference_t ref,
@ -835,16 +1079,18 @@ bool amduat_tgk_store_mem_init(amduat_tgk_store_mem_t *mem,
const amduat_tgk_store_mem_artifact_t *artifacts,
size_t artifacts_len) {
size_t i;
size_t count = 0;
amduat_tgk_graph_edge_view_t *edges;
size_t stored = 0;
amduat_tgk_graph_edge_view_t *edges = NULL;
size_t edges_len = 0;
if (mem == NULL) {
return false;
}
mem->snapshot_id = 0;
mem->config = config;
mem->artifacts = artifacts;
mem->artifacts_len = artifacts_len;
mem->artifacts = NULL;
mem->artifacts_len = 0;
mem->artifacts_cap = 0;
mem->edges = NULL;
mem->edges_len = 0;
@ -869,58 +1115,49 @@ bool amduat_tgk_store_mem_init(amduat_tgk_store_mem_t *mem,
return false;
}
edges = (amduat_tgk_graph_edge_view_t *)calloc(
artifacts_len, sizeof(amduat_tgk_graph_edge_view_t));
if (edges == NULL) {
mem->artifacts = (amduat_tgk_store_mem_artifact_t *)calloc(
artifacts_len, sizeof(*mem->artifacts));
if (mem->artifacts == NULL) {
return false;
}
mem->artifacts_cap = artifacts_len;
for (i = 0; i < artifacts_len; ++i) {
amduat_tgk_edge_body_t body;
amduat_tgk_graph_error_t err;
const amduat_tgk_store_mem_artifact_t *entry = &artifacts[i];
bool duplicate = false;
if (!amduat_tgk_store_mem_check_duplicate_artifact(artifacts, i,
&duplicate)) {
goto cleanup;
goto cleanup_artifacts;
}
if (duplicate) {
continue;
}
if (!amduat_tgk_store_mem_hash_id_supported(mem, entry->ref.hash_id)) {
continue;
if (!amduat_tgk_store_mem_artifact_clone(entry->ref, entry->artifact,
&mem->artifacts[stored])) {
goto cleanup_artifacts;
}
err = amduat_tgk_store_mem_decode_edge(mem, entry, &body);
if (err != 0) {
continue;
}
if (!amduat_tgk_store_mem_reference_clone(entry->ref,
&edges[count].edge_ref)) {
amduat_tgk_edge_body_free(&body);
goto cleanup;
}
edges[count].body = body;
count++;
stored++;
}
if (count > 1) {
qsort(edges, count, sizeof(amduat_tgk_graph_edge_view_t),
amduat_tgk_store_mem_edge_cmp);
mem->artifacts_len = stored;
if (!amduat_tgk_store_mem_edges_build(mem, &edges, &edges_len)) {
goto cleanup_artifacts;
}
mem->edges = edges;
mem->edges_len = count;
mem->edges_len = edges_len;
return true;
cleanup:
for (i = 0; i < count; ++i) {
amduat_tgk_store_mem_reference_free(&edges[i].edge_ref);
amduat_tgk_edge_body_free(&edges[i].body);
cleanup_artifacts:
for (i = 0; i < stored; ++i) {
amduat_tgk_store_mem_artifact_free(&mem->artifacts[i]);
}
free(edges);
mem->edges = NULL;
mem->edges_len = 0;
free(mem->artifacts);
mem->artifacts = NULL;
mem->artifacts_len = 0;
mem->artifacts_cap = 0;
return false;
}
@ -930,15 +1167,16 @@ void amduat_tgk_store_mem_free(amduat_tgk_store_mem_t *mem) {
if (mem == NULL) {
return;
}
for (i = 0; i < mem->edges_len; ++i) {
amduat_tgk_store_mem_reference_free(&mem->edges[i].edge_ref);
amduat_tgk_edge_body_free(&mem->edges[i].body);
}
free(mem->edges);
amduat_tgk_store_mem_edge_view_list_free(mem->edges, mem->edges_len);
mem->edges = NULL;
mem->edges_len = 0;
for (i = 0; i < mem->artifacts_len; ++i) {
amduat_tgk_store_mem_artifact_free(&mem->artifacts[i]);
}
free(mem->artifacts);
mem->artifacts = NULL;
mem->artifacts_len = 0;
mem->artifacts_cap = 0;
mem->snapshot_id = 0;
}
@ -948,6 +1186,8 @@ amduat_tgk_store_ops_t amduat_tgk_store_mem_ops(void) {
memset(&ops, 0, sizeof(ops));
ops.get_config = amduat_tgk_store_mem_get_config;
ops.snapshot_id = amduat_tgk_store_mem_snapshot_id;
ops.ingest_artifact = amduat_tgk_store_mem_ingest_artifact;
ops.remove_artifact = amduat_tgk_store_mem_remove_artifact;
ops.resolve_edge = amduat_tgk_store_mem_resolve_edge;
ops.edges_from = amduat_tgk_store_mem_edges_from;
ops.edges_to = amduat_tgk_store_mem_edges_to;

View file

@ -39,6 +39,23 @@ bool amduat_tgk_store_snapshot_id(amduat_tgk_store_t *store,
return store->ops.snapshot_id(store->ctx, out_id);
}
bool amduat_tgk_store_ingest_artifact(amduat_tgk_store_t *store,
amduat_reference_t ref,
amduat_artifact_t artifact) {
if (store == NULL || store->ops.ingest_artifact == NULL) {
return false;
}
return store->ops.ingest_artifact(store->ctx, ref, artifact);
}
bool amduat_tgk_store_remove_artifact(amduat_tgk_store_t *store,
amduat_reference_t ref) {
if (store == NULL || store->ops.remove_artifact == NULL) {
return false;
}
return store->ops.remove_artifact(store->ctx, ref);
}
amduat_tgk_graph_error_t amduat_tgk_store_resolve_edge(
amduat_tgk_store_t *store,
amduat_reference_t ref,