From 489738c5baa9ed6635ec0964a1683cb57eb003c6 Mon Sep 17 00:00:00 2001 From: Carl Niklas Rydberg Date: Sun, 18 Jan 2026 11:17:35 +0100 Subject: [PATCH] Add federation view cache and resolve --- CMakeLists.txt | 11 ++ include/amduat/fed/view.h | 63 +++++++ src/near_core/fed/view.c | 361 ++++++++++++++++++++++++++++++++++++++ tests/fed/test_fed_view.c | 186 ++++++++++++++++++++ 4 files changed, 621 insertions(+) create mode 100644 include/amduat/fed/view.h create mode 100644 src/near_core/fed/view.c create mode 100644 tests/fed/test_fed_view.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 3edc306..53c11cc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -126,6 +126,7 @@ set(AMDUAT_FED_SRCS src/near_core/fed/registry.c src/near_core/fed/replay.c src/near_core/fed/ingest.c + src/near_core/fed/view.c ) set(AMDUAT_ASL_STORE_FS_SRCS @@ -572,3 +573,13 @@ target_link_libraries(amduat_test_fed_ingest PRIVATE amduat_fed ) add_test(NAME fed_ingest COMMAND amduat_test_fed_ingest) + +add_executable(amduat_test_fed_view tests/fed/test_fed_view.c) +target_include_directories(amduat_test_fed_view + PRIVATE ${AMDUAT_INTERNAL_DIR} + PRIVATE ${AMDUAT_INCLUDE_DIR} +) +target_link_libraries(amduat_test_fed_view + PRIVATE amduat_fed +) +add_test(NAME fed_view COMMAND amduat_test_fed_view) diff --git a/include/amduat/fed/view.h b/include/amduat/fed/view.h new file mode 100644 index 0000000..7cb8a17 --- /dev/null +++ b/include/amduat/fed/view.h @@ -0,0 +1,63 @@ +#ifndef AMDUAT_FED_VIEW_H +#define AMDUAT_FED_VIEW_H + +#include "amduat/asl/store.h" +#include "amduat/fed/replay.h" + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef enum { + AMDUAT_FED_RESOLVE_OK = 0, + AMDUAT_FED_RESOLVE_NOT_FOUND = 1, + AMDUAT_FED_RESOLVE_FOUND_REMOTE_NO_BYTES = 2, + AMDUAT_FED_RESOLVE_INTEGRITY_ERROR = 3, + AMDUAT_FED_RESOLVE_POLICY_DENIED = 4, + AMDUAT_FED_RESOLVE_STORE_ERROR = 5 +} amduat_fed_resolve_error_t; + +typedef struct { + uint32_t domain_id; + uint64_t snapshot_id; + uint64_t log_prefix; +} amduat_fed_view_bounds_t; + +typedef struct { + amduat_fed_record_id_t id; + uint32_t reason_code; +} amduat_fed_policy_deny_t; + +typedef struct { + uint32_t local_domain_id; + amduat_fed_record_t *records; + size_t len; + amduat_fed_policy_deny_t *denies; + size_t denies_len; +} amduat_fed_view_t; + +bool amduat_fed_view_build(const amduat_fed_record_t *records, + size_t count, + uint32_t local_domain_id, + const amduat_fed_view_bounds_t *bounds, + size_t bounds_len, + const amduat_fed_policy_deny_t *denies, + size_t denies_len, + amduat_fed_view_t *out_view); + +void amduat_fed_view_free(amduat_fed_view_t *view); + +amduat_fed_resolve_error_t amduat_fed_resolve( + const amduat_fed_view_t *view, + amduat_asl_store_t *local_store, + amduat_reference_t ref, + amduat_artifact_t *out_artifact); + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* AMDUAT_FED_VIEW_H */ diff --git a/src/near_core/fed/view.c b/src/near_core/fed/view.c new file mode 100644 index 0000000..c7ec511 --- /dev/null +++ b/src/near_core/fed/view.c @@ -0,0 +1,361 @@ +#include "amduat/fed/view.h" + +#include +#include + +static bool amduat_fed_record_clone(const amduat_fed_record_t *src, + amduat_fed_record_t *out) { + if (src == NULL || out == NULL) { + return false; + } + *out = *src; + if (!amduat_reference_clone(src->id.ref, &out->id.ref)) { + return false; + } + return true; +} + +static void amduat_fed_record_free(amduat_fed_record_t *record) { + if (record == NULL) { + return; + } + amduat_reference_free(&record->id.ref); + memset(record, 0, sizeof(*record)); +} + +static bool amduat_fed_record_equivalent(const amduat_fed_record_t *a, + const amduat_fed_record_t *b) { + return a->meta.domain_id == b->meta.domain_id && + a->meta.visibility == b->meta.visibility && + a->meta.has_source == b->meta.has_source && + a->meta.source_domain == b->meta.source_domain && + a->id.type == b->id.type && + amduat_reference_eq(a->id.ref, b->id.ref) && + a->logseq == b->logseq && + a->snapshot_id == b->snapshot_id && + a->log_prefix == b->log_prefix; +} + +static const amduat_fed_record_t *amduat_fed_view_find( + const amduat_fed_view_t *view, + const amduat_fed_record_t *record) { + size_t i; + + for (i = 0; i < view->len; ++i) { + const amduat_fed_record_t *existing = &view->records[i]; + + if (existing->id.type != record->id.type) { + continue; + } + if (!amduat_reference_eq(existing->id.ref, record->id.ref)) { + continue; + } + return existing; + } + return NULL; +} + +static bool amduat_fed_policy_deny_clone(const amduat_fed_policy_deny_t *src, + amduat_fed_policy_deny_t *out) { + if (src == NULL || out == NULL) { + return false; + } + *out = *src; + if (!amduat_reference_clone(src->id.ref, &out->id.ref)) { + return false; + } + return true; +} + +static void amduat_fed_policy_deny_free(amduat_fed_policy_deny_t *deny) { + if (deny == NULL) { + return; + } + amduat_reference_free(&deny->id.ref); + memset(deny, 0, sizeof(*deny)); +} + +static bool amduat_fed_view_append(amduat_fed_view_t *view, + const amduat_fed_record_t *records, + size_t count) { + size_t i; + size_t base_len; + + if (count == 0u) { + return true; + } + if (view->len > SIZE_MAX - count) { + return false; + } + if (view->records == NULL) { + view->records = (amduat_fed_record_t *)calloc(count, sizeof(*view->records)); + if (view->records == NULL) { + return false; + } + } else { + amduat_fed_record_t *next = + (amduat_fed_record_t *)realloc(view->records, + (view->len + count) * + sizeof(*view->records)); + if (next == NULL) { + return false; + } + view->records = next; + } + + base_len = view->len; + for (i = 0; i < count; ++i) { + const amduat_fed_record_t *existing = amduat_fed_view_find(view, + &records[i]); + + if (existing != NULL) { + if (!amduat_fed_record_equivalent(existing, &records[i])) { + size_t j; + for (j = base_len; j < view->len; ++j) { + amduat_fed_record_free(&view->records[j]); + } + view->len = base_len; + return false; + } + continue; + } + + if (!amduat_fed_record_clone(&records[i], + &view->records[view->len])) { + size_t j; + for (j = base_len; j < view->len; ++j) { + amduat_fed_record_free(&view->records[j]); + } + view->len = base_len; + return false; + } + view->len++; + } + return true; +} + +bool amduat_fed_view_build(const amduat_fed_record_t *records, + size_t count, + uint32_t local_domain_id, + const amduat_fed_view_bounds_t *bounds, + size_t bounds_len, + const amduat_fed_policy_deny_t *denies, + size_t denies_len, + amduat_fed_view_t *out_view) { + size_t i; + amduat_fed_view_t view; + + if (out_view == NULL) { + return false; + } + out_view->records = NULL; + out_view->len = 0; + out_view->local_domain_id = local_domain_id; + out_view->denies = NULL; + out_view->denies_len = 0; + + if (records == NULL && count != 0u) { + return false; + } + + view.records = NULL; + view.len = 0; + view.local_domain_id = local_domain_id; + view.denies = NULL; + view.denies_len = 0; + + if (denies_len != 0u && denies == NULL) { + return false; + } + if (denies_len != 0u) { + view.denies = (amduat_fed_policy_deny_t *)calloc(denies_len, + sizeof(*view.denies)); + if (view.denies == NULL) { + return false; + } + for (i = 0; i < denies_len; ++i) { + if (!amduat_fed_policy_deny_clone(&denies[i], &view.denies[i])) { + size_t j; + for (j = 0; j < i; ++j) { + amduat_fed_policy_deny_free(&view.denies[j]); + } + free(view.denies); + return false; + } + } + view.denies_len = denies_len; + } + + for (i = 0; i < bounds_len; ++i) { + amduat_fed_replay_view_t domain_view; + amduat_fed_view_bounds_t bound = bounds[i]; + size_t j; + size_t domain_count = 0; + amduat_fed_record_t *domain_records = NULL; + bool ok; + + for (j = 0; j < count; ++j) { + const amduat_fed_record_t *record = &records[j]; + + if (record->meta.domain_id != bound.domain_id) { + continue; + } + if (bound.domain_id != local_domain_id && record->meta.visibility != 1u) { + continue; + } + domain_count++; + } + + if (domain_count == 0u) { + continue; + } + + domain_records = + (amduat_fed_record_t *)calloc(domain_count, sizeof(*domain_records)); + if (domain_records == NULL) { + amduat_fed_view_free(&view); + return false; + } + + domain_count = 0; + for (j = 0; j < count; ++j) { + const amduat_fed_record_t *record = &records[j]; + + if (record->meta.domain_id != bound.domain_id) { + continue; + } + if (bound.domain_id != local_domain_id && record->meta.visibility != 1u) { + continue; + } + if (!amduat_fed_record_clone(record, &domain_records[domain_count])) { + size_t k; + for (k = 0; k < domain_count; ++k) { + amduat_fed_record_free(&domain_records[k]); + } + free(domain_records); + amduat_fed_view_free(&view); + return false; + } + domain_count++; + } + + ok = amduat_fed_replay_domain(domain_records, + domain_count, + bound.domain_id, + bound.snapshot_id, + bound.log_prefix, + &domain_view); + for (j = 0; j < domain_count; ++j) { + amduat_fed_record_free(&domain_records[j]); + } + free(domain_records); + if (!ok) { + amduat_fed_view_free(&view); + return false; + } + if (!amduat_fed_view_append(&view, + domain_view.records, + domain_view.len)) { + amduat_fed_replay_view_free(&domain_view); + amduat_fed_view_free(&view); + return false; + } + amduat_fed_replay_view_free(&domain_view); + } + + *out_view = view; + return true; +} + +void amduat_fed_view_free(amduat_fed_view_t *view) { + size_t i; + + if (view == NULL) { + return; + } + if (view->records != NULL) { + for (i = 0; i < view->len; ++i) { + amduat_fed_record_free(&view->records[i]); + } + free(view->records); + } + if (view->denies != NULL) { + for (i = 0; i < view->denies_len; ++i) { + amduat_fed_policy_deny_free(&view->denies[i]); + } + free(view->denies); + } + view->records = NULL; + view->len = 0; + view->local_domain_id = 0; + view->denies = NULL; + view->denies_len = 0; +} + +static const amduat_fed_record_t *amduat_fed_view_lookup( + const amduat_fed_view_t *view, + amduat_reference_t ref) { + size_t i; + + if (view == NULL) { + return NULL; + } + for (i = 0; i < view->len; ++i) { + if (amduat_reference_eq(view->records[i].id.ref, ref)) { + return &view->records[i]; + } + } + return NULL; +} + +static bool amduat_fed_view_policy_denied(const amduat_fed_view_t *view, + amduat_reference_t ref) { + size_t i; + + if (view == NULL) { + return false; + } + for (i = 0; i < view->denies_len; ++i) { + if (amduat_reference_eq(view->denies[i].id.ref, ref)) { + return true; + } + } + return false; +} + +amduat_fed_resolve_error_t amduat_fed_resolve( + const amduat_fed_view_t *view, + amduat_asl_store_t *local_store, + amduat_reference_t ref, + amduat_artifact_t *out_artifact) { + amduat_asl_store_error_t store_err; + const amduat_fed_record_t *record; + + if (local_store == NULL || out_artifact == NULL) { + return AMDUAT_FED_RESOLVE_STORE_ERROR; + } + + store_err = amduat_asl_store_get(local_store, ref, out_artifact); + if (store_err == AMDUAT_ASL_STORE_OK) { + return AMDUAT_FED_RESOLVE_OK; + } + if (store_err == AMDUAT_ASL_STORE_ERR_INTEGRITY) { + return AMDUAT_FED_RESOLVE_INTEGRITY_ERROR; + } + if (store_err != AMDUAT_ASL_STORE_ERR_NOT_FOUND) { + return AMDUAT_FED_RESOLVE_STORE_ERROR; + } + + if (amduat_fed_view_policy_denied(view, ref)) { + return AMDUAT_FED_RESOLVE_POLICY_DENIED; + } + + record = amduat_fed_view_lookup(view, ref); + if (record == NULL) { + return AMDUAT_FED_RESOLVE_NOT_FOUND; + } + if (record->meta.domain_id != view->local_domain_id) { + return AMDUAT_FED_RESOLVE_FOUND_REMOTE_NO_BYTES; + } + return AMDUAT_FED_RESOLVE_NOT_FOUND; +} diff --git a/tests/fed/test_fed_view.c b/tests/fed/test_fed_view.c new file mode 100644 index 0000000..e4d504a --- /dev/null +++ b/tests/fed/test_fed_view.c @@ -0,0 +1,186 @@ +#include "amduat/fed/view.h" + +#include +#include +#include + +typedef struct { + amduat_reference_t ref; + amduat_artifact_t artifact; +} stub_store_entry_t; + +typedef struct { + amduat_asl_store_config_t config; + stub_store_entry_t entries[4]; + size_t entries_len; +} stub_store_t; + +static void stub_store_init(stub_store_t *store) { + memset(store, 0, sizeof(*store)); + store->config.hash_id = 1; +} + +static amduat_asl_store_error_t stub_store_get(void *ctx, + amduat_reference_t ref, + amduat_artifact_t *out) { + stub_store_t *store = (stub_store_t *)ctx; + size_t i; + + for (i = 0; i < store->entries_len; ++i) { + if (amduat_reference_eq(store->entries[i].ref, ref)) { + amduat_artifact_t src = store->entries[i].artifact; + uint8_t *payload = NULL; + if (src.bytes.len != 0u) { + payload = (uint8_t *)malloc(src.bytes.len); + if (payload == NULL) { + return AMDUAT_ASL_STORE_ERR_IO; + } + memcpy(payload, src.bytes.data, src.bytes.len); + } + out->bytes = amduat_octets(payload, src.bytes.len); + out->has_type_tag = src.has_type_tag; + out->type_tag = src.type_tag; + return AMDUAT_ASL_STORE_OK; + } + } + + return AMDUAT_ASL_STORE_ERR_NOT_FOUND; +} + +static amduat_reference_t make_ref(amduat_hash_id_t hash_id, + const uint8_t *bytes, + size_t len) { + return amduat_reference(hash_id, amduat_octets(bytes, len)); +} + +static amduat_fed_record_t make_record(uint32_t domain_id, + uint8_t visibility, + uint64_t logseq, + amduat_reference_t ref) { + amduat_fed_record_t record; + + memset(&record, 0, sizeof(record)); + record.meta.domain_id = domain_id; + record.meta.visibility = visibility; + record.meta.has_source = 0; + record.id.type = AMDUAT_FED_REC_ARTIFACT; + record.id.ref = ref; + record.logseq = logseq; + record.snapshot_id = 1; + record.log_prefix = 10; + return record; +} + +static int test_view_and_resolve(void) { + uint8_t a_bytes[] = {0x01}; + uint8_t b_bytes[] = {0x02}; + uint8_t c_bytes[] = {0x03}; + uint8_t d_bytes[] = {0x04}; + amduat_reference_t ref_a = make_ref(1, a_bytes, sizeof(a_bytes)); + amduat_reference_t ref_b = make_ref(1, b_bytes, sizeof(b_bytes)); + amduat_reference_t ref_c = make_ref(1, c_bytes, sizeof(c_bytes)); + amduat_reference_t ref_d = make_ref(1, d_bytes, sizeof(d_bytes)); + amduat_fed_record_t records[4]; + amduat_fed_view_bounds_t bounds[2]; + amduat_fed_view_t view; + amduat_fed_policy_deny_t denies[1]; + stub_store_t stub; + amduat_asl_store_ops_t ops; + amduat_asl_store_t store; + amduat_artifact_t artifact; + + records[0] = make_record(1, 0, 1, ref_a); + records[1] = make_record(1, 1, 2, ref_b); + records[2] = make_record(2, 1, 1, ref_c); + records[3] = make_record(2, 0, 1, ref_d); + + bounds[0].domain_id = 1; + bounds[0].snapshot_id = 1; + bounds[0].log_prefix = 10; + bounds[1].domain_id = 2; + bounds[1].snapshot_id = 1; + bounds[1].log_prefix = 10; + + denies[0].id.type = AMDUAT_FED_REC_ARTIFACT; + denies[0].id.ref = ref_d; + denies[0].reason_code = 0; + + if (!amduat_fed_view_build(records, 4, 1, bounds, 2, denies, 1, &view)) { + fprintf(stderr, "view build failed\n"); + return 1; + } + if (view.len != 3) { + fprintf(stderr, "view size mismatch\n"); + amduat_fed_view_free(&view); + return 1; + } + + stub_store_init(&stub); + stub.entries[0].ref = ref_a; + stub.entries[0].artifact = amduat_artifact(amduat_octets(a_bytes, + sizeof(a_bytes))); + stub.entries_len = 1; + amduat_asl_store_ops_init(&ops); + ops.get = stub_store_get; + amduat_asl_store_init(&store, stub.config, ops, &stub); + + if (amduat_fed_resolve(&view, &store, ref_a, &artifact) != + AMDUAT_FED_RESOLVE_OK) { + fprintf(stderr, "resolve local failed\n"); + amduat_fed_view_free(&view); + return 1; + } + amduat_artifact_free(&artifact); + + if (amduat_fed_resolve(&view, &store, ref_c, &artifact) != + AMDUAT_FED_RESOLVE_FOUND_REMOTE_NO_BYTES) { + fprintf(stderr, "resolve remote mismatch\n"); + amduat_fed_view_free(&view); + return 1; + } + + if (amduat_fed_resolve(&view, &store, ref_d, &artifact) != + AMDUAT_FED_RESOLVE_POLICY_DENIED) { + fprintf(stderr, "resolve policy denied mismatch\n"); + amduat_fed_view_free(&view); + return 1; + } + + amduat_fed_view_free(&view); + return 0; +} + +static int test_view_conflict(void) { + uint8_t key[] = {0x09}; + amduat_reference_t ref = make_ref(1, key, sizeof(key)); + amduat_fed_record_t records[2]; + amduat_fed_view_bounds_t bounds[2]; + amduat_fed_view_t view; + + records[0] = make_record(1, 1, 1, ref); + records[1] = make_record(2, 1, 1, ref); + + bounds[0].domain_id = 1; + bounds[0].snapshot_id = 1; + bounds[0].log_prefix = 10; + bounds[1].domain_id = 2; + bounds[1].snapshot_id = 1; + bounds[1].log_prefix = 10; + + if (amduat_fed_view_build(records, 2, 1, bounds, 2, NULL, 0, &view)) { + fprintf(stderr, "expected conflict\n"); + amduat_fed_view_free(&view); + return 1; + } + return 0; +} + +int main(void) { + if (test_view_and_resolve() != 0) { + return 1; + } + if (test_view_conflict() != 0) { + return 1; + } + return 0; +}