amduatd: add pointer-rooted edge index and refresh loop

This commit is contained in:
Carl Niklas Rydberg 2026-01-24 07:22:51 +01:00
parent 8d7c7d93a5
commit a299b6c463
7 changed files with 1030 additions and 18 deletions

View file

@ -4019,6 +4019,7 @@ static void amduatd_print_usage(FILE *stream) {
"usage:\n" "usage:\n"
" amduatd [--root PATH] [--sock PATH]\n" " amduatd [--root PATH] [--sock PATH]\n"
" [--space SPACE_ID] [--migrate-unscoped-edges]\n" " [--space SPACE_ID] [--migrate-unscoped-edges]\n"
" [--edges-refresh-ms MS]\n"
" [--allow-uid UID] [--allow-user NAME]\n" " [--allow-uid UID] [--allow-user NAME]\n"
" [--enable-cap-reads]\n" " [--enable-cap-reads]\n"
"\n" "\n"
@ -4050,6 +4051,7 @@ int main(int argc, char **argv) {
int i; int i;
int sfd = -1; int sfd = -1;
uint64_t last_tick_ms = 0; uint64_t last_tick_ms = 0;
uint64_t last_edges_refresh_ms = 0;
memset(&api_contract_ref, 0, sizeof(api_contract_ref)); memset(&api_contract_ref, 0, sizeof(api_contract_ref));
memset(&ui_ref, 0, sizeof(ui_ref)); memset(&ui_ref, 0, sizeof(ui_ref));
@ -4083,6 +4085,19 @@ int main(int argc, char **argv) {
} }
} else if (strcmp(argv[i], "--migrate-unscoped-edges") == 0) { } else if (strcmp(argv[i], "--migrate-unscoped-edges") == 0) {
migrate_unscoped_edges = true; migrate_unscoped_edges = true;
} else if (strcmp(argv[i], "--edges-refresh-ms") == 0) {
char *endp = NULL;
unsigned long long refresh_val;
if (i + 1 >= argc) {
fprintf(stderr, "error: --edges-refresh-ms requires a value\n");
return 2;
}
refresh_val = strtoull(argv[++i], &endp, 10);
if (endp == argv[i] || *endp != '\0') {
fprintf(stderr, "error: invalid --edges-refresh-ms\n");
return 2;
}
dcfg.edges_refresh_ms = (uint64_t)refresh_val;
} else if (strcmp(argv[i], "--allow-uid") == 0) { } else if (strcmp(argv[i], "--allow-uid") == 0) {
char *endp = NULL; char *endp = NULL;
unsigned long uid_val; unsigned long uid_val;
@ -4283,6 +4298,23 @@ int main(int argc, char **argv) {
&caps, &caps,
root); root);
(void)close(cfd); (void)close(cfd);
if (dcfg.edges_refresh_ms != 0u) {
uint64_t now_ms = amduatd_now_ms();
if (last_edges_refresh_ms == 0u ||
now_ms - last_edges_refresh_ms >= dcfg.edges_refresh_ms) {
amduatd_ctx_t refresh_ctx;
refresh_ctx.store = &store;
refresh_ctx.ui_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
refresh_ctx.store_cfg = &cfg;
refresh_ctx.concepts = &concepts;
refresh_ctx.daemon_cfg = &dcfg;
refresh_ctx.root_path = root;
refresh_ctx.caps = ∩︀
(void)amduatd_concepts_refresh_edges(&refresh_ctx, 0u);
last_edges_refresh_ms = now_ms;
}
}
} }
} }

View file

@ -6,6 +6,7 @@
#include <stdbool.h> #include <stdbool.h>
#include <stddef.h> #include <stddef.h>
#include <stdint.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -13,6 +14,7 @@ extern "C" {
typedef struct amduatd_cfg_t { typedef struct amduatd_cfg_t {
amduatd_space_t space; amduatd_space_t space;
uint64_t edges_refresh_ms;
} amduatd_cfg_t; } amduatd_cfg_t;
typedef enum { typedef enum {

File diff suppressed because it is too large Load diff

View file

@ -48,6 +48,9 @@ bool amduatd_concepts_init(amduatd_concepts_t *c,
void amduatd_concepts_free(amduatd_concepts_t *c); void amduatd_concepts_free(amduatd_concepts_t *c);
bool amduatd_concepts_refresh_edges(amduatd_ctx_t *ctx,
size_t max_new_entries);
bool amduatd_concepts_can_handle(const amduatd_http_req_t *req); bool amduatd_concepts_can_handle(const amduatd_http_req_t *req);
bool amduatd_concepts_handle(amduatd_ctx_t *ctx, bool amduatd_concepts_handle(amduatd_ctx_t *ctx,

View file

@ -263,6 +263,12 @@ bool amduatd_space_edges_collection_name(const amduatd_space_t *sp,
return amduatd_space_scope_name(sp, "daemon/edges", out_collection_name); return amduatd_space_scope_name(sp, "daemon/edges", out_collection_name);
} }
bool amduatd_space_edges_index_head_name(const amduatd_space_t *sp,
amduat_octets_t *out_pointer_name) {
return amduatd_space_scope_name(sp, "daemon/edges/index/head",
out_pointer_name);
}
bool amduatd_space_should_migrate_unscoped_edges(const amduatd_space_t *sp) { bool amduatd_space_should_migrate_unscoped_edges(const amduatd_space_t *sp) {
return sp != NULL && sp->enabled && sp->migrate_unscoped_edges; return sp != NULL && sp->enabled && sp->migrate_unscoped_edges;
} }

View file

@ -43,6 +43,9 @@ void amduatd_space_log_mapping(const amduatd_space_t *sp,
bool amduatd_space_edges_collection_name(const amduatd_space_t *sp, bool amduatd_space_edges_collection_name(const amduatd_space_t *sp,
amduat_octets_t *out_collection_name); amduat_octets_t *out_collection_name);
bool amduatd_space_edges_index_head_name(const amduatd_space_t *sp,
amduat_octets_t *out_pointer_name);
bool amduatd_space_should_migrate_unscoped_edges(const amduatd_space_t *sp); bool amduatd_space_should_migrate_unscoped_edges(const amduatd_space_t *sp);
#ifdef __cplusplus #ifdef __cplusplus

View file

@ -60,6 +60,16 @@ static const uint8_t k_amduat_gc_collection_magic[
'A', 'S', 'L', 'C', 'O', 'L', '1', '\0' 'A', 'S', 'L', 'C', 'O', 'L', '1', '\0'
}; };
enum {
AMDUAT_GC_EDGE_INDEX_MAGIC_LEN = 8,
AMDUAT_GC_EDGE_INDEX_VERSION = 1
};
static const uint8_t k_amduat_gc_edge_index_magic[
AMDUAT_GC_EDGE_INDEX_MAGIC_LEN] = {
'A', 'S', 'L', 'E', 'I', 'X', '1', '\0'
};
typedef struct { typedef struct {
size_t digest_len; size_t digest_len;
size_t capacity; size_t capacity;
@ -747,6 +757,56 @@ decode_error:
return false; return false;
} }
static bool amduat_gc_decode_edge_index_state(amduat_octets_t payload,
bool *out_has_graph,
amduat_reference_t *out_graph) {
size_t offset = 0u;
uint32_t version = 0u;
uint32_t ref_len = 0u;
amduat_octets_t ref_bytes;
if (out_has_graph == NULL || out_graph == NULL) {
return false;
}
*out_has_graph = false;
*out_graph = amduat_reference(0u, amduat_octets(NULL, 0u));
if (payload.len < AMDUAT_GC_EDGE_INDEX_MAGIC_LEN + 4u + 8u + 4u) {
return false;
}
if (memcmp(payload.data, k_amduat_gc_edge_index_magic,
AMDUAT_GC_EDGE_INDEX_MAGIC_LEN) != 0) {
return false;
}
offset += AMDUAT_GC_EDGE_INDEX_MAGIC_LEN;
if (!amduat_gc_read_u32_le(payload.data, payload.len, &offset, &version) ||
version != AMDUAT_GC_EDGE_INDEX_VERSION) {
return false;
}
{
uint64_t ignored;
if (!amduat_gc_read_u64_le(payload.data, payload.len, &offset, &ignored)) {
return false;
}
}
if (!amduat_gc_read_u32_le(payload.data, payload.len, &offset, &ref_len)) {
return false;
}
if (payload.len - offset < ref_len) {
return false;
}
if (ref_len != 0u) {
ref_bytes = amduat_octets(payload.data + offset, ref_len);
if (!amduat_enc_asl1_core_decode_reference_v1(ref_bytes, out_graph)) {
amduat_reference_free(out_graph);
return false;
}
offset += ref_len;
*out_has_graph = true;
}
return offset == payload.len;
}
static bool amduat_gc_mark_refs(amduat_gc_ctx_t *ctx, static bool amduat_gc_mark_refs(amduat_gc_ctx_t *ctx,
amduat_reference_t *refs, amduat_reference_t *refs,
size_t refs_len) { size_t refs_len) {
@ -999,6 +1059,19 @@ static bool amduat_gc_mark_artifact(amduat_gc_ctx_t *ctx,
amduat_reference_free(&refs[i]); amduat_reference_free(&refs[i]);
} }
free(refs); free(refs);
} else if (record.schema.len == strlen("tgk/edge_index_state") &&
memcmp(record.schema.data, "tgk/edge_index_state",
record.schema.len) == 0) {
bool has_graph = false;
amduat_reference_t graph_ref;
if (amduat_gc_decode_edge_index_state(record.payload,
&has_graph,
&graph_ref)) {
if (has_graph) {
amduat_gc_mark_ref(ctx, &graph_ref);
amduat_reference_free(&graph_ref);
}
}
} }
amduat_asl_record_free(&record); amduat_asl_record_free(&record);
return true; return true;