From a299b6c463463c02d2c5b5a59852893f1c77fa55 Mon Sep 17 00:00:00 2001 From: Carl Niklas Rydberg Date: Sat, 24 Jan 2026 07:22:51 +0100 Subject: [PATCH] amduatd: add pointer-rooted edge index and refresh loop --- src/amduatd.c | 32 ++ src/amduatd_caps.h | 2 + src/amduatd_concepts.c | 929 ++++++++++++++++++++++++++++++++++++++++- src/amduatd_concepts.h | 3 + src/amduatd_space.c | 6 + src/amduatd_space.h | 3 + src/asl_gc_fs.c | 73 ++++ 7 files changed, 1030 insertions(+), 18 deletions(-) diff --git a/src/amduatd.c b/src/amduatd.c index 2e9a1cb..6b79ad2 100644 --- a/src/amduatd.c +++ b/src/amduatd.c @@ -4019,6 +4019,7 @@ static void amduatd_print_usage(FILE *stream) { "usage:\n" " amduatd [--root PATH] [--sock PATH]\n" " [--space SPACE_ID] [--migrate-unscoped-edges]\n" + " [--edges-refresh-ms MS]\n" " [--allow-uid UID] [--allow-user NAME]\n" " [--enable-cap-reads]\n" "\n" @@ -4050,6 +4051,7 @@ int main(int argc, char **argv) { int i; int sfd = -1; uint64_t last_tick_ms = 0; + uint64_t last_edges_refresh_ms = 0; memset(&api_contract_ref, 0, sizeof(api_contract_ref)); memset(&ui_ref, 0, sizeof(ui_ref)); @@ -4083,6 +4085,19 @@ int main(int argc, char **argv) { } } else if (strcmp(argv[i], "--migrate-unscoped-edges") == 0) { migrate_unscoped_edges = true; + } else if (strcmp(argv[i], "--edges-refresh-ms") == 0) { + char *endp = NULL; + unsigned long long refresh_val; + if (i + 1 >= argc) { + fprintf(stderr, "error: --edges-refresh-ms requires a value\n"); + return 2; + } + refresh_val = strtoull(argv[++i], &endp, 10); + if (endp == argv[i] || *endp != '\0') { + fprintf(stderr, "error: invalid --edges-refresh-ms\n"); + return 2; + } + dcfg.edges_refresh_ms = (uint64_t)refresh_val; } else if (strcmp(argv[i], "--allow-uid") == 0) { char *endp = NULL; unsigned long uid_val; @@ -4283,6 +4298,23 @@ int main(int argc, char **argv) { &caps, root); (void)close(cfd); + + if (dcfg.edges_refresh_ms != 0u) { + uint64_t now_ms = amduatd_now_ms(); + if (last_edges_refresh_ms == 0u || + now_ms - last_edges_refresh_ms >= dcfg.edges_refresh_ms) { + amduatd_ctx_t refresh_ctx; + refresh_ctx.store = &store; + refresh_ctx.ui_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); + refresh_ctx.store_cfg = &cfg; + refresh_ctx.concepts = &concepts; + refresh_ctx.daemon_cfg = &dcfg; + refresh_ctx.root_path = root; + refresh_ctx.caps = ∩︀ + (void)amduatd_concepts_refresh_edges(&refresh_ctx, 0u); + last_edges_refresh_ms = now_ms; + } + } } } diff --git a/src/amduatd_caps.h b/src/amduatd_caps.h index 3cd1eb5..688ae4f 100644 --- a/src/amduatd_caps.h +++ b/src/amduatd_caps.h @@ -6,6 +6,7 @@ #include #include +#include #ifdef __cplusplus extern "C" { @@ -13,6 +14,7 @@ extern "C" { typedef struct amduatd_cfg_t { amduatd_space_t space; + uint64_t edges_refresh_ms; } amduatd_cfg_t; typedef enum { diff --git a/src/amduatd_concepts.c b/src/amduatd_concepts.c index 227a0c7..f66f621 100644 --- a/src/amduatd_concepts.c +++ b/src/amduatd_concepts.c @@ -10,6 +10,7 @@ #include "amduat/asl/artifact_io.h" #include "amduat/asl/collection_view.h" +#include "amduat/asl/log_store.h" #include "amduat/asl/none.h" #include "amduat/asl/record.h" #include "amduat/asl/asl_pointer_fs.h" @@ -44,6 +45,7 @@ typedef struct amduatd_strbuf { static const char *const AMDUATD_EDGES_FILE = ".amduatd.edges"; static const char *const AMDUATD_EDGE_COLLECTION = "daemon/edges"; static const uint32_t AMDUATD_EDGE_VIEW_BATCH = 1024u; +static const uint32_t AMDUATD_EDGE_REFRESH_BATCH = 512u; static const uint16_t AMDUATD_EDGE_COLLECTION_KIND = 1u; static void amduatd_edge_entry_free(amduatd_edge_entry_t *entry) { @@ -133,6 +135,27 @@ static const uint8_t k_amduatd_edge_magic[AMDUATD_EDGE_MAGIC_LEN] = { }; static const char *const AMDUATD_EDGE_SCHEMA = "tgk/edge"; +static const char *const AMDUATD_EDGE_INDEX_SCHEMA = "tgk/edge_index_state"; + +enum { + AMDUATD_EDGE_INDEX_MAGIC_LEN = 8, + AMDUATD_EDGE_INDEX_VERSION = 1 +}; + +static const uint8_t k_amduatd_edge_index_magic[AMDUATD_EDGE_INDEX_MAGIC_LEN] = { + 'A', 'S', 'L', 'E', 'I', 'X', '1', '\0' +}; + +enum { + AMDUATD_EDGE_GRAPH_MAGIC_LEN = 8, + AMDUATD_EDGE_GRAPH_VERSION = 1 +}; + +static const uint8_t k_amduatd_edge_graph_magic[AMDUATD_EDGE_GRAPH_MAGIC_LEN] = { + 'A', 'S', 'L', 'E', 'G', 'R', '1', '\0' +}; + +enum { AMDUAT_TYPE_TAG_TGK_EDGE_GRAPH_1 = 0x00000202u }; static const char *const AMDUATD_REL_ALIAS = "alias"; static const char *const AMDUATD_REL_MATERIALIZES = "materializes"; @@ -156,6 +179,11 @@ static bool amduatd_concepts_lookup_alias(amduat_asl_store_t *store, const amduatd_concepts_t *c, const char *name, amduat_reference_t *out_concept_ref); +static bool amduatd_concepts_refresh_edges_internal( + amduat_asl_store_t *store, + amduatd_concepts_t *c, + const amduatd_space_t *space, + size_t max_new_entries); static void amduatd_store_u32_le(uint8_t *out, uint32_t value) { out[0] = (uint8_t)(value & 0xffu); @@ -217,6 +245,120 @@ static bool amduatd_add_size(size_t *acc, size_t add) { return true; } +typedef struct { + uint64_t indexed_up_to_offset; + bool has_graph_ref; + amduat_reference_t graph_ref; +} amduatd_edge_index_state_t; + +static void amduatd_edge_index_state_free(amduatd_edge_index_state_t *state) { + if (state == NULL) { + return; + } + if (state->has_graph_ref) { + amduat_reference_free(&state->graph_ref); + } + memset(state, 0, sizeof(*state)); +} + +static bool amduatd_edge_index_state_encode( + const amduatd_edge_index_state_t *state, + amduat_octets_t *out_payload) { + amduat_octets_t graph_bytes = amduat_octets(NULL, 0u); + uint8_t *payload = NULL; + size_t payload_len = 0u; + size_t offset = 0u; + uint32_t ref_len = 0u; + + if (out_payload == NULL) { + return false; + } + *out_payload = amduat_octets(NULL, 0u); + if (state == NULL) { + return false; + } + if (state->has_graph_ref) { + if (!amduat_enc_asl1_core_encode_reference_v1(state->graph_ref, + &graph_bytes)) { + return false; + } + if (graph_bytes.len > UINT32_MAX) { + free((void *)graph_bytes.data); + return false; + } + ref_len = (uint32_t)graph_bytes.len; + } + + payload_len = AMDUATD_EDGE_INDEX_MAGIC_LEN + 4u + 8u + 4u + ref_len; + payload = (uint8_t *)malloc(payload_len); + if (payload == NULL) { + free((void *)graph_bytes.data); + return false; + } + memcpy(payload + offset, k_amduatd_edge_index_magic, + AMDUATD_EDGE_INDEX_MAGIC_LEN); + offset += AMDUATD_EDGE_INDEX_MAGIC_LEN; + amduatd_store_u32_le(payload + offset, AMDUATD_EDGE_INDEX_VERSION); + offset += 4u; + amduatd_store_u64_le(payload + offset, state->indexed_up_to_offset); + offset += 8u; + amduatd_store_u32_le(payload + offset, ref_len); + offset += 4u; + if (ref_len != 0u) { + memcpy(payload + offset, graph_bytes.data, ref_len); + offset += ref_len; + } + free((void *)graph_bytes.data); + *out_payload = amduat_octets(payload, payload_len); + return offset == payload_len; +} + +static bool amduatd_edge_index_state_decode( + amduat_octets_t payload, + amduatd_edge_index_state_t *out_state) { + size_t offset = 0u; + uint32_t version = 0u; + uint32_t ref_len = 0u; + amduat_octets_t ref_bytes; + + if (out_state == NULL) { + return false; + } + memset(out_state, 0, sizeof(*out_state)); + if (payload.data == NULL || + payload.len < AMDUATD_EDGE_INDEX_MAGIC_LEN + 4u + 8u + 4u) { + return false; + } + if (memcmp(payload.data, k_amduatd_edge_index_magic, + AMDUATD_EDGE_INDEX_MAGIC_LEN) != 0) { + return false; + } + offset += AMDUATD_EDGE_INDEX_MAGIC_LEN; + if (!amduatd_read_u32_le(payload.data, payload.len, &offset, &version) || + version != AMDUATD_EDGE_INDEX_VERSION) { + return false; + } + if (!amduatd_read_u64_le(payload.data, payload.len, &offset, + &out_state->indexed_up_to_offset) || + !amduatd_read_u32_le(payload.data, payload.len, &offset, &ref_len)) { + return false; + } + if (payload.len - offset < ref_len) { + return false; + } + if (ref_len != 0u) { + ref_bytes = amduat_octets(payload.data + offset, ref_len); + if (!amduat_enc_asl1_core_decode_reference_v1(ref_bytes, + &out_state->graph_ref)) { + amduatd_edge_index_state_free(out_state); + return false; + } + out_state->has_graph_ref = true; + offset += ref_len; + } + return offset == payload.len; +} + static bool amduatd_is_ascii(amduat_octets_t bytes) { if (bytes.len == 0u || bytes.data == NULL) { return false; @@ -409,6 +551,255 @@ static bool amduatd_edge_payload_decode(amduat_octets_t payload, return true; } +typedef struct { + amduat_octets_t record_bytes; + amduat_octets_t src_bytes; + amduat_octets_t dst_bytes; +} amduatd_edge_graph_encoded_t; + +static void amduatd_edge_graph_encoded_free( + amduatd_edge_graph_encoded_t *encoded, + size_t len) { + if (encoded == NULL) { + return; + } + for (size_t i = 0u; i < len; ++i) { + amduat_octets_free(&encoded[i].record_bytes); + amduat_octets_free(&encoded[i].src_bytes); + amduat_octets_free(&encoded[i].dst_bytes); + } + free(encoded); +} + +static bool amduatd_edge_graph_encode(const amduatd_edge_list_t *edges, + amduat_octets_t *out_bytes) { + amduatd_edge_graph_encoded_t *encoded = NULL; + size_t total_len = 0u; + uint8_t *buffer = NULL; + size_t offset = 0u; + uint32_t edge_count = 0u; + + if (out_bytes == NULL) { + return false; + } + *out_bytes = amduat_octets(NULL, 0u); + if (edges == NULL) { + return false; + } + if (edges->len > UINT32_MAX) { + return false; + } + edge_count = (uint32_t)edges->len; + if (edge_count != 0u) { + encoded = (amduatd_edge_graph_encoded_t *)calloc(edge_count, + sizeof(*encoded)); + if (encoded == NULL) { + return false; + } + } + + if (!amduatd_add_size(&total_len, + AMDUATD_EDGE_GRAPH_MAGIC_LEN + 4u + 4u)) { + amduatd_edge_graph_encoded_free(encoded, edge_count); + return false; + } + + for (uint32_t i = 0u; i < edge_count; ++i) { + const amduatd_edge_entry_t *entry = &edges->items[i]; + size_t rel_len = 0u; + if (entry->rel == NULL) { + amduatd_edge_graph_encoded_free(encoded, edge_count); + return false; + } + rel_len = strlen(entry->rel); + if (rel_len > UINT32_MAX) { + amduatd_edge_graph_encoded_free(encoded, edge_count); + return false; + } + if (!amduat_enc_asl1_core_encode_reference_v1(entry->record_ref, + &encoded[i].record_bytes) || + !amduat_enc_asl1_core_encode_reference_v1(entry->src_ref, + &encoded[i].src_bytes) || + !amduat_enc_asl1_core_encode_reference_v1(entry->dst_ref, + &encoded[i].dst_bytes)) { + amduatd_edge_graph_encoded_free(encoded, edge_count); + return false; + } + if (encoded[i].record_bytes.len > UINT32_MAX || + encoded[i].src_bytes.len > UINT32_MAX || + encoded[i].dst_bytes.len > UINT32_MAX) { + amduatd_edge_graph_encoded_free(encoded, edge_count); + return false; + } + if (!amduatd_add_size(&total_len, 4u + rel_len) || + !amduatd_add_size(&total_len, 4u + encoded[i].record_bytes.len) || + !amduatd_add_size(&total_len, 4u + encoded[i].src_bytes.len) || + !amduatd_add_size(&total_len, 4u + encoded[i].dst_bytes.len)) { + amduatd_edge_graph_encoded_free(encoded, edge_count); + return false; + } + } + + buffer = (uint8_t *)malloc(total_len); + if (buffer == NULL) { + amduatd_edge_graph_encoded_free(encoded, edge_count); + return false; + } + + memcpy(buffer + offset, k_amduatd_edge_graph_magic, + AMDUATD_EDGE_GRAPH_MAGIC_LEN); + offset += AMDUATD_EDGE_GRAPH_MAGIC_LEN; + amduatd_store_u32_le(buffer + offset, AMDUATD_EDGE_GRAPH_VERSION); + offset += 4u; + amduatd_store_u32_le(buffer + offset, edge_count); + offset += 4u; + + for (uint32_t i = 0u; i < edge_count; ++i) { + const amduatd_edge_entry_t *entry = &edges->items[i]; + uint32_t rel_len = (uint32_t)strlen(entry->rel); + amduatd_store_u32_le(buffer + offset, rel_len); + offset += 4u; + memcpy(buffer + offset, entry->rel, rel_len); + offset += rel_len; + amduatd_store_u32_le(buffer + offset, + (uint32_t)encoded[i].record_bytes.len); + offset += 4u; + memcpy(buffer + offset, encoded[i].record_bytes.data, + encoded[i].record_bytes.len); + offset += encoded[i].record_bytes.len; + amduatd_store_u32_le(buffer + offset, + (uint32_t)encoded[i].src_bytes.len); + offset += 4u; + memcpy(buffer + offset, encoded[i].src_bytes.data, + encoded[i].src_bytes.len); + offset += encoded[i].src_bytes.len; + amduatd_store_u32_le(buffer + offset, + (uint32_t)encoded[i].dst_bytes.len); + offset += 4u; + memcpy(buffer + offset, encoded[i].dst_bytes.data, + encoded[i].dst_bytes.len); + offset += encoded[i].dst_bytes.len; + } + + amduatd_edge_graph_encoded_free(encoded, edge_count); + if (offset != total_len) { + free(buffer); + return false; + } + *out_bytes = amduat_octets(buffer, total_len); + return true; +} + +static bool amduatd_edge_graph_decode(amduat_octets_t bytes, + amduatd_edge_list_t *out_edges) { + size_t offset = 0u; + uint32_t version = 0u; + uint32_t edge_count = 0u; + amduatd_edge_list_t parsed; + + if (out_edges == NULL) { + return false; + } + memset(&parsed, 0, sizeof(parsed)); + if (bytes.data == NULL || + bytes.len < AMDUATD_EDGE_GRAPH_MAGIC_LEN + 4u + 4u) { + return false; + } + if (memcmp(bytes.data, k_amduatd_edge_graph_magic, + AMDUATD_EDGE_GRAPH_MAGIC_LEN) != 0) { + return false; + } + offset += AMDUATD_EDGE_GRAPH_MAGIC_LEN; + if (!amduatd_read_u32_le(bytes.data, bytes.len, &offset, &version) || + version != AMDUATD_EDGE_GRAPH_VERSION) { + return false; + } + if (!amduatd_read_u32_le(bytes.data, bytes.len, &offset, &edge_count)) { + return false; + } + + for (uint32_t i = 0u; i < edge_count; ++i) { + uint32_t rel_len = 0u; + uint32_t record_len = 0u; + uint32_t src_len = 0u; + uint32_t dst_len = 0u; + amduat_octets_t record_bytes; + amduat_octets_t src_bytes; + amduat_octets_t dst_bytes; + amduatd_edge_entry_t entry; + char *rel = NULL; + + if (!amduatd_read_u32_le(bytes.data, bytes.len, &offset, &rel_len) || + bytes.len - offset < rel_len) { + goto decode_fail; + } + if (rel_len == 0u || + !amduatd_is_ascii(amduat_octets(bytes.data + offset, rel_len))) { + goto decode_fail; + } + rel = (char *)malloc(rel_len + 1u); + if (rel == NULL) { + goto decode_fail; + } + memcpy(rel, bytes.data + offset, rel_len); + rel[rel_len] = '\0'; + offset += rel_len; + + if (!amduatd_read_u32_le(bytes.data, bytes.len, &offset, &record_len) || + bytes.len - offset < record_len) { + free(rel); + goto decode_fail; + } + record_bytes = amduat_octets(bytes.data + offset, record_len); + offset += record_len; + + if (!amduatd_read_u32_le(bytes.data, bytes.len, &offset, &src_len) || + bytes.len - offset < src_len) { + free(rel); + goto decode_fail; + } + src_bytes = amduat_octets(bytes.data + offset, src_len); + offset += src_len; + + if (!amduatd_read_u32_le(bytes.data, bytes.len, &offset, &dst_len) || + bytes.len - offset < dst_len) { + free(rel); + goto decode_fail; + } + dst_bytes = amduat_octets(bytes.data + offset, dst_len); + offset += dst_len; + + memset(&entry, 0, sizeof(entry)); + if (!amduat_enc_asl1_core_decode_reference_v1(record_bytes, + &entry.record_ref) || + !amduat_enc_asl1_core_decode_reference_v1(src_bytes, + &entry.src_ref) || + !amduat_enc_asl1_core_decode_reference_v1(dst_bytes, + &entry.dst_ref)) { + free(rel); + amduatd_edge_entry_free(&entry); + goto decode_fail; + } + entry.rel = rel; + if (!amduatd_edge_list_push(&parsed, &entry)) { + amduatd_edge_entry_free(&entry); + goto decode_fail; + } + amduatd_edge_entry_free(&entry); + } + + if (offset != bytes.len) { + goto decode_fail; + } + amduatd_edge_list_clear(out_edges); + *out_edges = parsed; + return true; + +decode_fail: + amduatd_edge_list_clear(&parsed); + return false; +} + static const char *amduatd_relation_name_for_ref( const amduatd_concepts_t *c, amduat_reference_t ref) { @@ -469,6 +860,36 @@ static bool amduatd_build_collection_head_name(const char *name, return true; } +static bool amduatd_build_collection_log_name(const char *name, + char **out_name) { + size_t name_len; + size_t total_len; + char *buffer; + size_t offset = 0u; + + if (name == NULL || out_name == NULL) { + return false; + } + if (!amduat_asl_pointer_name_is_valid(name)) { + return false; + } + name_len = strlen(name); + total_len = 11u + name_len + 4u + 1u; + buffer = (char *)malloc(total_len); + if (buffer == NULL) { + return false; + } + memcpy(buffer + offset, "collection/", 11u); + offset += 11u; + memcpy(buffer + offset, name, name_len); + offset += name_len; + memcpy(buffer + offset, "/log", 4u); + offset += 4u; + buffer[offset] = '\0'; + *out_name = buffer; + return true; +} + static bool amduatd_build_collection_log_head_name(const char *name, char **out_name) { size_t name_len; @@ -1001,14 +1422,19 @@ alias_cleanup: } static bool amduatd_concepts_load_edges(amduatd_concepts_t *c, - amduat_asl_store_t *store) { + amduat_asl_store_t *store, + uint64_t *out_up_to_offset) { amduat_asl_collection_view_t view; uint64_t from = 0u; + uint64_t computed_up_to = 0u; bool ok = false; if (c == NULL || store == NULL) { return false; } + if (out_up_to_offset != NULL) { + *out_up_to_offset = 0u; + } amduatd_edge_list_clear(&c->edges); @@ -1071,9 +1497,11 @@ static bool amduatd_concepts_load_edges(amduatd_concepts_t *c, } if (view.refs_len == 0u || view.computed_up_to_offset <= from) { + computed_up_to = view.computed_up_to_offset; break; } from = view.computed_up_to_offset; + computed_up_to = view.computed_up_to_offset; amduat_asl_collection_view_free(&view); } @@ -1081,6 +1509,9 @@ static bool amduatd_concepts_load_edges(amduatd_concepts_t *c, load_cleanup: amduat_asl_collection_view_free(&view); + if (ok && out_up_to_offset != NULL) { + *out_up_to_offset = computed_up_to; + } return ok; } @@ -1380,17 +1811,404 @@ migrate_cleanup: return ok; } +static bool amduatd_concepts_edge_index_pointer_name( + const amduatd_space_t *space, + amduat_octets_t *out_pointer_name) { + return amduatd_space_edges_index_head_name(space, out_pointer_name); +} + +static bool amduatd_concepts_put_edge_graph(amduat_asl_store_t *store, + const amduatd_edge_list_t *edges, + amduat_reference_t *out_ref) { + amduat_octets_t graph_bytes = amduat_octets(NULL, 0u); + amduat_artifact_t artifact; + + if (out_ref != NULL) { + *out_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); + } + if (store == NULL || edges == NULL || out_ref == NULL) { + return false; + } + if (!amduatd_edge_graph_encode(edges, &graph_bytes)) { + return false; + } + artifact = amduat_artifact_with_type( + graph_bytes, amduat_type_tag(AMDUAT_TYPE_TAG_TGK_EDGE_GRAPH_1)); + if (amduat_asl_store_put(store, artifact, out_ref) != AMDUAT_ASL_STORE_OK) { + free((void *)graph_bytes.data); + return false; + } + free((void *)graph_bytes.data); + return true; +} + +static bool amduatd_concepts_load_edge_graph(amduat_asl_store_t *store, + amduat_reference_t graph_ref, + amduatd_edge_list_t *out_edges) { + amduat_artifact_t artifact; + + if (store == NULL || out_edges == NULL) { + return false; + } + memset(&artifact, 0, sizeof(artifact)); + if (amduat_asl_store_get(store, graph_ref, &artifact) != AMDUAT_ASL_STORE_OK) { + return false; + } + if (!artifact.has_type_tag || + artifact.type_tag.tag_id != AMDUAT_TYPE_TAG_TGK_EDGE_GRAPH_1) { + amduat_asl_artifact_free(&artifact); + return false; + } + if (!amduatd_edge_graph_decode(artifact.bytes, out_edges)) { + amduat_asl_artifact_free(&artifact); + return false; + } + amduat_asl_artifact_free(&artifact); + return true; +} + +static bool amduatd_concepts_put_edge_index_state( + amduat_asl_store_t *store, + const amduatd_edge_index_state_t *state, + amduat_reference_t *out_ref) { + amduat_octets_t payload = amduat_octets(NULL, 0u); + + if (out_ref != NULL) { + *out_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); + } + if (store == NULL || state == NULL || out_ref == NULL) { + return false; + } + if (!amduatd_edge_index_state_encode(state, &payload)) { + return false; + } + if (amduat_asl_record_store_put( + store, + amduat_octets(AMDUATD_EDGE_INDEX_SCHEMA, + strlen(AMDUATD_EDGE_INDEX_SCHEMA)), + payload, + out_ref) != AMDUAT_ASL_STORE_OK) { + free((void *)payload.data); + return false; + } + free((void *)payload.data); + return true; +} + +static bool amduatd_concepts_load_edge_index_state( + amduatd_concepts_t *c, + amduat_asl_store_t *store, + const amduatd_space_t *space, + amduatd_edge_index_state_t *out_state, + amduat_reference_t *out_state_ref, + bool *out_exists) { + amduat_octets_t pointer_name = amduat_octets(NULL, 0u); + bool exists = false; + amduat_reference_t state_ref; + amduat_asl_record_t record; + amduat_asl_store_error_t err; + bool ok = false; + + if (out_exists != NULL) { + *out_exists = false; + } + if (out_state_ref != NULL) { + *out_state_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); + } + if (c == NULL || store == NULL || out_state == NULL || out_exists == NULL) { + return false; + } + memset(out_state, 0, sizeof(*out_state)); + + if (!amduatd_concepts_edge_index_pointer_name(space, &pointer_name)) { + return false; + } + if (amduat_asl_pointer_get(&c->edge_collection.pointer_store, + (const char *)pointer_name.data, + &exists, + &state_ref) != AMDUAT_ASL_POINTER_OK) { + free((void *)pointer_name.data); + return false; + } + free((void *)pointer_name.data); + + if (!exists) { + *out_exists = false; + return true; + } + + memset(&record, 0, sizeof(record)); + err = amduat_asl_record_store_get(store, state_ref, &record); + if (err != AMDUAT_ASL_STORE_OK) { + amduat_reference_free(&state_ref); + return false; + } + if (record.schema.len != strlen(AMDUATD_EDGE_INDEX_SCHEMA) || + memcmp(record.schema.data, AMDUATD_EDGE_INDEX_SCHEMA, + record.schema.len) != 0) { + amduat_asl_record_free(&record); + amduat_reference_free(&state_ref); + return false; + } + ok = amduatd_edge_index_state_decode(record.payload, out_state); + amduat_asl_record_free(&record); + if (!ok) { + amduat_reference_free(&state_ref); + amduatd_edge_index_state_free(out_state); + return false; + } + *out_exists = true; + if (out_state_ref != NULL) { + *out_state_ref = state_ref; + } else { + amduat_reference_free(&state_ref); + } + return true; +} + +static bool amduatd_concepts_write_edge_index_state( + amduatd_concepts_t *c, + amduat_asl_store_t *store, + const amduatd_space_t *space, + uint64_t indexed_up_to_offset, + const amduatd_edge_list_t *edges, + bool expected_exists, + const amduat_reference_t *expected_ref, + bool *out_swapped) { + amduat_reference_t graph_ref; + amduat_reference_t state_ref; + amduatd_edge_index_state_t state; + amduat_octets_t pointer_name = amduat_octets(NULL, 0u); + bool swapped = false; + bool ok = false; + + if (out_swapped != NULL) { + *out_swapped = false; + } + if (c == NULL || store == NULL || edges == NULL) { + return false; + } + if (expected_exists && expected_ref == NULL) { + return false; + } + + memset(&graph_ref, 0, sizeof(graph_ref)); + memset(&state_ref, 0, sizeof(state_ref)); + memset(&state, 0, sizeof(state)); + state.indexed_up_to_offset = indexed_up_to_offset; + + if (!amduatd_concepts_put_edge_graph(store, edges, &graph_ref)) { + return false; + } + state.has_graph_ref = true; + state.graph_ref = graph_ref; + if (!amduatd_concepts_put_edge_index_state(store, &state, &state_ref)) { + amduat_reference_free(&graph_ref); + return false; + } + if (!amduatd_concepts_edge_index_pointer_name(space, &pointer_name)) { + amduat_reference_free(&graph_ref); + amduat_reference_free(&state_ref); + return false; + } + + if (amduat_asl_pointer_cas(&c->edge_collection.pointer_store, + (const char *)pointer_name.data, + expected_exists, + expected_ref, + &state_ref, + &swapped) != AMDUAT_ASL_POINTER_OK) { + goto cleanup; + } + ok = true; + +cleanup: + if (out_swapped != NULL) { + *out_swapped = swapped; + } + free((void *)pointer_name.data); + amduat_reference_free(&graph_ref); + amduat_reference_free(&state_ref); + return ok; +} + +static bool amduatd_concepts_refresh_edges_internal( + amduat_asl_store_t *store, + amduatd_concepts_t *c, + const amduatd_space_t *space, + size_t max_new_entries) { + amduatd_edge_index_state_t state; + amduat_reference_t state_ref; + bool state_exists = false; + bool ok = false; + + if (store == NULL || c == NULL) { + return false; + } + if (max_new_entries == 0u) { + max_new_entries = AMDUATD_EDGE_REFRESH_BATCH; + } + + for (int attempt = 0; attempt < 2; ++attempt) { + amduat_asl_log_entry_t *entries = NULL; + size_t entries_len = 0u; + uint64_t next_offset = 0u; + bool end = false; + char *log_name = NULL; + bool swapped = false; + size_t applied = 0u; + + memset(&state, 0, sizeof(state)); + memset(&state_ref, 0, sizeof(state_ref)); + if (!amduatd_concepts_load_edge_index_state( + c, store, space, &state, &state_ref, &state_exists)) { + return false; + } + + if (!amduatd_build_collection_log_name(c->edge_collection_name, &log_name)) { + amduatd_edge_index_state_free(&state); + amduat_reference_free(&state_ref); + return false; + } + if (amduat_asl_log_read(&c->edge_collection.log_store, + log_name, + state_exists ? state.indexed_up_to_offset : 0u, + max_new_entries, + &entries, + &entries_len, + &next_offset, + &end) != AMDUAT_ASL_STORE_OK) { + free(log_name); + amduatd_edge_index_state_free(&state); + amduat_reference_free(&state_ref); + return false; + } + free(log_name); + (void)end; + + if (entries_len == 0u) { + amduat_asl_log_entries_free(entries, entries_len); + amduatd_edge_index_state_free(&state); + amduat_reference_free(&state_ref); + return false; + } + + for (size_t i = 0u; i < entries_len; ++i) { + amduat_reference_t record_ref; + amduat_asl_record_t record; + amduat_reference_t src_ref; + amduat_reference_t dst_ref; + char *rel = NULL; + amduatd_edge_entry_t entry; + bool schema_ok = false; + + if (entries[i].kind != AMDUATD_EDGE_COLLECTION_KIND) { + continue; + } + memset(&record_ref, 0, sizeof(record_ref)); + if (!amduat_reference_clone(entries[i].payload_ref, &record_ref)) { + continue; + } + memset(&record, 0, sizeof(record)); + if (amduat_asl_record_store_get(store, record_ref, &record) != + AMDUAT_ASL_STORE_OK) { + amduat_reference_free(&record_ref); + continue; + } + if (record.schema.len == strlen(AMDUATD_EDGE_SCHEMA) && + memcmp(record.schema.data, AMDUATD_EDGE_SCHEMA, + record.schema.len) == 0) { + schema_ok = true; + } + if (!schema_ok || + !amduatd_edge_payload_decode(record.payload, + &src_ref, + &dst_ref, + &rel)) { + amduat_asl_record_free(&record); + amduat_reference_free(&record_ref); + continue; + } + amduat_asl_record_free(&record); + + memset(&entry, 0, sizeof(entry)); + entry.record_ref = record_ref; + entry.src_ref = src_ref; + entry.dst_ref = dst_ref; + entry.rel = rel; + if (amduatd_edge_list_push(&c->edges, &entry)) { + applied++; + } + amduatd_edge_entry_free(&entry); + } + + ok = amduatd_concepts_write_edge_index_state( + c, + store, + space, + next_offset, + &c->edges, + state_exists, + state_exists ? &state_ref : NULL, + &swapped); + + amduat_asl_log_entries_free(entries, entries_len); + amduatd_edge_index_state_free(&state); + amduat_reference_free(&state_ref); + + if (!ok) { + return false; + } + if (swapped) { + return applied != 0u; + } + + if (attempt == 0) { + amduatd_edge_list_t refreshed; + amduatd_edge_index_state_t latest; + amduat_reference_t latest_ref; + bool latest_exists = false; + + memset(&refreshed, 0, sizeof(refreshed)); + memset(&latest, 0, sizeof(latest)); + memset(&latest_ref, 0, sizeof(latest_ref)); + if (!amduatd_concepts_load_edge_index_state( + c, store, space, &latest, &latest_ref, &latest_exists)) { + return false; + } + if (latest_exists && latest.has_graph_ref && + amduatd_concepts_load_edge_graph(store, latest.graph_ref, + &refreshed)) { + amduatd_edge_list_clear(&c->edges); + c->edges = refreshed; + } else { + amduatd_edge_list_clear(&refreshed); + } + amduatd_edge_index_state_free(&latest); + amduat_reference_free(&latest_ref); + } + } + + return false; +} + bool amduatd_concepts_init(amduatd_concepts_t *c, amduat_asl_store_t *store, const amduatd_space_t *space, const char *root_path, bool enable_migrations) { amduat_octets_t scoped_collection = amduat_octets(NULL, 0u); + amduat_octets_t index_head_name = amduat_octets(NULL, 0u); + amduat_reference_t index_head_ref; + bool index_head_exists = false; + uint64_t indexed_up_to_offset = 0u; + bool used_index = false; if (c == NULL || store == NULL || root_path == NULL) { return false; } memset(c, 0, sizeof(*c)); + memset(&index_head_ref, 0, sizeof(index_head_ref)); c->root_path = root_path; (void)snprintf(c->edges_path, sizeof(c->edges_path), "%s/%s", root_path, AMDUATD_EDGES_FILE); @@ -1483,36 +2301,111 @@ bool amduatd_concepts_init(amduatd_concepts_t *c, "concepts init: ensure ms.has_provenance failed"); return false; } - if (!amduatd_concepts_load_edges(c, store)) { - amduat_log(AMDUAT_LOG_ERROR, "concepts init: load edges failed"); - return false; + + if (amduatd_concepts_edge_index_pointer_name(space, &index_head_name)) { + (void)amduat_asl_pointer_get(&c->edge_collection.pointer_store, + (const char *)index_head_name.data, + &index_head_exists, + &index_head_ref); + free((void *)index_head_name.data); } - if (enable_migrations && c->edges.len == 0u) { - if (space != NULL && space->enabled) { - if (amduatd_space_should_migrate_unscoped_edges(space)) { - if (!amduatd_concepts_migrate_unscoped_edges( - c, - store, - AMDUATD_EDGE_COLLECTION)) { + + { + amduatd_edge_index_state_t state; + amduat_reference_t state_ref; + bool state_exists = false; + bool have_index = false; + + memset(&state, 0, sizeof(state)); + memset(&state_ref, 0, sizeof(state_ref)); + if (amduatd_concepts_load_edge_index_state(c, store, space, + &state, &state_ref, + &state_exists) && + state_exists && state.has_graph_ref && + amduatd_concepts_load_edge_graph(store, state.graph_ref, &c->edges)) { + indexed_up_to_offset = state.indexed_up_to_offset; + have_index = true; + } + amduatd_edge_index_state_free(&state); + amduat_reference_free(&state_ref); + + if (have_index) { + used_index = true; + (void)amduatd_concepts_refresh_edges_internal( + store, c, space, AMDUATD_EDGE_REFRESH_BATCH); + } + } + + if (!used_index) { + if (!amduatd_concepts_load_edges(c, store, &indexed_up_to_offset)) { + amduat_log(AMDUAT_LOG_ERROR, "concepts init: load edges failed"); + amduat_reference_free(&index_head_ref); + return false; + } + if (enable_migrations && c->edges.len == 0u) { + if (space != NULL && space->enabled) { + if (amduatd_space_should_migrate_unscoped_edges(space)) { + if (!amduatd_concepts_migrate_unscoped_edges( + c, + store, + AMDUATD_EDGE_COLLECTION)) { + amduat_reference_free(&index_head_ref); + return false; + } + if (!amduatd_concepts_load_edges(c, store, &indexed_up_to_offset)) { + amduat_reference_free(&index_head_ref); + return false; + } + } + } else { + if (!amduatd_concepts_migrate_edges(c, store)) { + amduat_reference_free(&index_head_ref); return false; } - if (!amduatd_concepts_load_edges(c, store)) { + if (!amduatd_concepts_load_edges(c, store, &indexed_up_to_offset)) { + amduat_reference_free(&index_head_ref); return false; } } - } else { - if (!amduatd_concepts_migrate_edges(c, store)) { - return false; - } - if (!amduatd_concepts_load_edges(c, store)) { - return false; + } + { + bool swapped = false; + if (!amduatd_concepts_write_edge_index_state( + c, + store, + space, + indexed_up_to_offset, + &c->edges, + index_head_exists, + index_head_exists ? &index_head_ref : NULL, + &swapped)) { + amduat_log(AMDUAT_LOG_ERROR, + "concepts init: write edge index failed"); } } } + amduat_reference_free(&index_head_ref); + return true; } +bool amduatd_concepts_refresh_edges(amduatd_ctx_t *ctx, + size_t max_new_entries) { + const amduatd_space_t *space = NULL; + + if (ctx == NULL || ctx->store == NULL || ctx->concepts == NULL) { + return false; + } + if (ctx->daemon_cfg != NULL) { + space = &ctx->daemon_cfg->space; + } + return amduatd_concepts_refresh_edges_internal(ctx->store, + ctx->concepts, + space, + max_new_entries); +} + static bool amduatd_concepts_derive_name_ref( amduat_asl_store_t *store, const char *name, diff --git a/src/amduatd_concepts.h b/src/amduatd_concepts.h index bc0765d..5e8bbc8 100644 --- a/src/amduatd_concepts.h +++ b/src/amduatd_concepts.h @@ -48,6 +48,9 @@ bool amduatd_concepts_init(amduatd_concepts_t *c, void amduatd_concepts_free(amduatd_concepts_t *c); +bool amduatd_concepts_refresh_edges(amduatd_ctx_t *ctx, + size_t max_new_entries); + bool amduatd_concepts_can_handle(const amduatd_http_req_t *req); bool amduatd_concepts_handle(amduatd_ctx_t *ctx, diff --git a/src/amduatd_space.c b/src/amduatd_space.c index 3802862..c9b9e12 100644 --- a/src/amduatd_space.c +++ b/src/amduatd_space.c @@ -263,6 +263,12 @@ bool amduatd_space_edges_collection_name(const amduatd_space_t *sp, return amduatd_space_scope_name(sp, "daemon/edges", out_collection_name); } +bool amduatd_space_edges_index_head_name(const amduatd_space_t *sp, + amduat_octets_t *out_pointer_name) { + return amduatd_space_scope_name(sp, "daemon/edges/index/head", + out_pointer_name); +} + bool amduatd_space_should_migrate_unscoped_edges(const amduatd_space_t *sp) { return sp != NULL && sp->enabled && sp->migrate_unscoped_edges; } diff --git a/src/amduatd_space.h b/src/amduatd_space.h index 949cecc..c0238b5 100644 --- a/src/amduatd_space.h +++ b/src/amduatd_space.h @@ -43,6 +43,9 @@ void amduatd_space_log_mapping(const amduatd_space_t *sp, bool amduatd_space_edges_collection_name(const amduatd_space_t *sp, amduat_octets_t *out_collection_name); +bool amduatd_space_edges_index_head_name(const amduatd_space_t *sp, + amduat_octets_t *out_pointer_name); + bool amduatd_space_should_migrate_unscoped_edges(const amduatd_space_t *sp); #ifdef __cplusplus diff --git a/src/asl_gc_fs.c b/src/asl_gc_fs.c index d49a460..f092a2f 100644 --- a/src/asl_gc_fs.c +++ b/src/asl_gc_fs.c @@ -60,6 +60,16 @@ static const uint8_t k_amduat_gc_collection_magic[ 'A', 'S', 'L', 'C', 'O', 'L', '1', '\0' }; +enum { + AMDUAT_GC_EDGE_INDEX_MAGIC_LEN = 8, + AMDUAT_GC_EDGE_INDEX_VERSION = 1 +}; + +static const uint8_t k_amduat_gc_edge_index_magic[ + AMDUAT_GC_EDGE_INDEX_MAGIC_LEN] = { + 'A', 'S', 'L', 'E', 'I', 'X', '1', '\0' +}; + typedef struct { size_t digest_len; size_t capacity; @@ -747,6 +757,56 @@ decode_error: return false; } +static bool amduat_gc_decode_edge_index_state(amduat_octets_t payload, + bool *out_has_graph, + amduat_reference_t *out_graph) { + size_t offset = 0u; + uint32_t version = 0u; + uint32_t ref_len = 0u; + amduat_octets_t ref_bytes; + + if (out_has_graph == NULL || out_graph == NULL) { + return false; + } + *out_has_graph = false; + *out_graph = amduat_reference(0u, amduat_octets(NULL, 0u)); + + if (payload.len < AMDUAT_GC_EDGE_INDEX_MAGIC_LEN + 4u + 8u + 4u) { + return false; + } + if (memcmp(payload.data, k_amduat_gc_edge_index_magic, + AMDUAT_GC_EDGE_INDEX_MAGIC_LEN) != 0) { + return false; + } + offset += AMDUAT_GC_EDGE_INDEX_MAGIC_LEN; + if (!amduat_gc_read_u32_le(payload.data, payload.len, &offset, &version) || + version != AMDUAT_GC_EDGE_INDEX_VERSION) { + return false; + } + { + uint64_t ignored; + if (!amduat_gc_read_u64_le(payload.data, payload.len, &offset, &ignored)) { + return false; + } + } + if (!amduat_gc_read_u32_le(payload.data, payload.len, &offset, &ref_len)) { + return false; + } + if (payload.len - offset < ref_len) { + return false; + } + if (ref_len != 0u) { + ref_bytes = amduat_octets(payload.data + offset, ref_len); + if (!amduat_enc_asl1_core_decode_reference_v1(ref_bytes, out_graph)) { + amduat_reference_free(out_graph); + return false; + } + offset += ref_len; + *out_has_graph = true; + } + return offset == payload.len; +} + static bool amduat_gc_mark_refs(amduat_gc_ctx_t *ctx, amduat_reference_t *refs, size_t refs_len) { @@ -999,6 +1059,19 @@ static bool amduat_gc_mark_artifact(amduat_gc_ctx_t *ctx, amduat_reference_free(&refs[i]); } free(refs); + } else if (record.schema.len == strlen("tgk/edge_index_state") && + memcmp(record.schema.data, "tgk/edge_index_state", + record.schema.len) == 0) { + bool has_graph = false; + amduat_reference_t graph_ref; + if (amduat_gc_decode_edge_index_state(record.payload, + &has_graph, + &graph_ref)) { + if (has_graph) { + amduat_gc_mark_ref(ctx, &graph_ref); + amduat_reference_free(&graph_ref); + } + } } amduat_asl_record_free(&record); return true;