federation

This commit is contained in:
Carl Niklas Rydberg 2026-01-21 19:51:26 +01:00
parent 275c0b8345
commit a4b501e48d
14 changed files with 3229 additions and 30 deletions

View file

@ -7,6 +7,21 @@ set(CMAKE_C_EXTENSIONS OFF)
add_subdirectory(vendor/amduat)
add_library(amduat_federation
federation/coord.c
federation/transport_stub.c
federation/transport_unix.c
)
target_include_directories(amduat_federation
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/vendor/amduat/include
)
target_link_libraries(amduat_federation
PRIVATE amduat_asl amduat_enc amduat_util amduat_fed
)
add_executable(amduatd src/amduatd.c)
target_include_directories(amduatd
@ -16,5 +31,5 @@ target_include_directories(amduatd
target_link_libraries(amduatd
PRIVATE amduat_tgk amduat_pel amduat_format amduat_asl_store_fs amduat_asl
amduat_enc amduat_hash_asl1 amduat_util
amduat_enc amduat_hash_asl1 amduat_util amduat_federation
)

View file

@ -141,6 +141,9 @@ curl --unix-socket amduatd.sock 'http://localhost/v1/artifacts/<ref>?format=info
- `GET /v1/contract` → contract bytes (JSON) (+ `X-Amduat-Contract-Ref` header)
- `GET /v1/contract?format=ref``{ref}`
- `GET /v1/ui` → browser UI for authoring/running programs
- `GET /v1/fed/records?domain_id=...&from_logseq=...&limit=...``{domain_id, snapshot_id, log_prefix, next_logseq, records[]}` (published artifacts + tombstones + PER + TGK edges)
- `GET /v1/fed/artifacts/{ref}` → raw bytes for federation resolve
- `GET /v1/fed/status``{status, domain_id, registry_ref, last_tick_ms}`
- `POST /v1/artifacts`
- raw bytes: `Content-Type: application/octet-stream` (+ optional `X-Amduat-Type-Tag: 0x...`)
- artifact framing: `Content-Type: application/vnd.amduat.asl.artifact+v1`
@ -156,10 +159,85 @@ curl --unix-socket amduatd.sock 'http://localhost/v1/artifacts/<ref>?format=info
- `GET /v1/resolve/{name}``{ref}` (latest published)
- `POST /v1/pel/run`
- request: `{program_ref, input_refs[], params_ref?, scheme_ref?}` (`program_ref`/`input_refs`/`params_ref` accept hex refs or concept names; omit `scheme_ref` to use `dag`)
- response: `{result_ref, trace_ref?, output_refs[], status}`
- request receipt (optional): `{receipt:{input_manifest_ref, environment_ref, evaluator_id, executor_ref, started_at, completed_at, sbom_ref?, parity_digest_hex?, executor_fingerprint_ref?, run_id_hex?, limits?, logs?, determinism_level?, rng_seed_hex?, signature_hex?}}`
- response: `{result_ref, trace_ref?, receipt_ref?, output_refs[], status}`
- `POST /v1/pel/programs`
- request: authoring JSON for `PEL/PROGRAM-DAG/1` (kernel ops only; `params_hex` is raw hex bytes)
- response: `{program_ref}`
- `POST /v1/context_frames`
Receipt example (with v1.1 fields):
```json
{
"program_ref": "ab12...",
"input_refs": ["cd34..."],
"receipt": {
"input_manifest_ref": "ef56...",
"environment_ref": "7890...",
"evaluator_id": "local-amduatd",
"executor_ref": "1122...",
"started_at": 1712345678,
"completed_at": 1712345688,
"executor_fingerprint_ref": "3344...",
"run_id_hex": "deadbeef",
"limits": {
"cpu_ms": 12,
"wall_ms": 20,
"max_rss_kib": 1024,
"io_reads": 1,
"io_writes": 0
},
"logs": [
{"kind": 1, "log_ref": "5566...", "sha256_hex": "aabbcc"}
],
"determinism_level": 2,
"rng_seed_hex": "010203",
"signature_hex": "bead"
}
}
```
Federation records example:
```bash
curl --unix-socket amduatd.sock \
'http://localhost/v1/fed/records?domain_id=1&from_logseq=0&limit=256'
```
```json
{
"domain_id": 1,
"snapshot_id": 42,
"log_prefix": 1234,
"next_logseq": 120,
"records": [
{
"domain_id": 1,
"type": 0,
"ref": "ab12...",
"logseq": 100,
"snapshot_id": 42,
"log_prefix": 1234,
"visibility": 1,
"has_source": false,
"source_domain": 0
}
]
}
```
Response example:
```json
{
"result_ref": "aa11...",
"trace_ref": "bb22...",
"receipt_ref": "cc33...",
"output_refs": ["dd44..."],
"status": "OK"
}
```
## Notes

View file

@ -0,0 +1,355 @@
# Federation Coordinator Middle Layer Spec
This document specifies the middle-layer coordinator that orchestrates federation
above the core C substrate. It remains transport- and policy-aware while keeping
core semantics unchanged.
## Scope and Goals
- Maintain per-domain replay bounds and an admitted set.
- Fetch and ingest published records from remotes.
- Build federation views via core APIs.
- Resolve missing bytes by fetching artifacts into a cache store.
- Keep storage layout private (no extents or blocks exposed).
- Align with tier1 federation semantics and replay determinism.
- Federation view is the union of admitted domains regardless of topology.
Non-goals:
- Re-implement core semantics in this layer.
- Introduce a single global snapshot ID (federation is per-domain).
## Core Dependencies
The coordinator calls into vendor core APIs (`vendor/amduat/include/amduat/fed/*`)
and MUST stay aligned with their signatures:
- `amduat_fed_registry_*` (persist per-domain admission state)
- `amduat_fed_ingest_validate` (record validation + conflict detection)
- `amduat_fed_replay_build` (deterministic replay per domain)
- `amduat_fed_view_build` + `amduat_fed_resolve` (build view and resolve ref-only)
Core expects per-domain replay bounds `{domain_id, snapshot_id, log_prefix}` and
does not handle transport, auth, caching policy, or remote fetch.
Alignment note: the daemon-layer API in this repo still needs updates to match
current vendor core types and signatures. This spec reflects vendor headers as
the source of truth.
Tier1 alignment (normative):
- `ASL/FEDERATION/1`
- `ASL/FEDERATION-REPLAY/1`
- `ASL/DOMAIN-MODEL/1`
- `ASL/POLICY-HASH/1`
- `ENC/ASL-CORE-INDEX/1` (canonical in `vendor/amduat/tier1/enc-asl-core-index-1.md`)
## Data Structures
### Registry State (per domain, persisted)
```
struct amduat_fed_domain_state {
uint32_t domain_id;
uint64_t snapshot_id;
uint64_t log_prefix;
uint64_t last_logseq;
uint8_t admitted;
uint8_t policy_ok;
uint8_t reserved[6];
amduat_hash_id_t policy_hash_id;
amduat_octets_t policy_hash; /* Empty when unknown; caller-owned bytes. */
};
```
Registry bytes are stored via `amduat_fed_registry_store_*` in the local ASL
store; the coordinator owns admission workflows and policy compatibility checks.
### Admitted Set (in-memory)
```
struct amduat_fed_admitted_set {
uint32_t *domain_ids; // sorted, unique
size_t len;
};
```
The admitted set is derived from registry entries with `admitted != 0` and
`policy_ok != 0`.
### Snapshot Vector (per view build)
```
struct amduat_fed_snapshot_vector {
amduat_fed_view_bounds_t *bounds;
size_t len;
uint64_t vector_epoch;
};
```
### Record Staging (per fetch batch)
```
struct amduat_fed_record {
amduat_fed_record_meta_t meta;
amduat_fed_record_id_t id;
uint64_t logseq;
uint64_t snapshot_id;
uint64_t log_prefix;
};
```
Records MUST carry the fields required by `ASL/FEDERATION-REPLAY/1`, and replay
ordering MUST be deterministic (sort by `logseq`, then canonical identity).
Record metadata includes visibility and optional cross-domain source identity.
### View and Policy Denies
```
struct amduat_fed_view_bounds {
uint32_t domain_id;
uint64_t snapshot_id;
uint64_t log_prefix;
};
struct amduat_fed_policy_deny {
amduat_fed_record_id_t id;
uint32_t reason_code;
};
```
### Fetch Backlog / Retry State
```
struct amduat_fed_fetch_state {
uint32_t domain_id;
uint64_t next_snapshot_id;
uint64_t next_log_prefix;
uint64_t next_logseq;
uint64_t backoff_ms;
uint64_t last_attempt_ms;
uint32_t consecutive_failures;
};
```
### Cache Store Metadata (optional)
```
struct amduat_fed_cache_policy {
bool enabled;
uint64_t max_bytes;
uint64_t used_bytes;
uint32_t ttl_seconds;
uint32_t prefetch_depth;
};
```
## Coordinator Interfaces
### Configuration
```
struct amduat_fed_coord_cfg {
const char *registry_path;
amduat_asl_store_t *authoritative_store;
amduat_asl_store_t *cache_store; // optional
amduat_asl_store_t *session_store; // optional
amduat_fed_transport transport;
amduat_fed_cache_policy cache_policy;
amduat_fed_policy_hooks policy_hooks;
};
```
### Lifecycle and Operations
```
int amduat_fed_coord_open(
const struct amduat_fed_coord_cfg *cfg,
struct amduat_fed_coord **out);
int amduat_fed_coord_close(struct amduat_fed_coord *c);
int amduat_fed_coord_load_registry(struct amduat_fed_coord *c);
int amduat_fed_coord_set_admitted(
struct amduat_fed_coord *c,
uint32_t domain_id,
bool admitted);
int amduat_fed_coord_tick(struct amduat_fed_coord *c, uint64_t now_ms);
int amduat_fed_coord_resolve(
struct amduat_fed_coord *c,
amduat_reference_t ref,
amduat_artifact_t *out);
```
## API Status
Planned coordinator surface (not yet implemented):
- `amduat_fed_coord_open`
- `amduat_fed_coord_close`
- `amduat_fed_coord_load_registry`
- `amduat_fed_coord_set_admitted`
- `amduat_fed_coord_tick`
- `amduat_fed_coord_resolve`
Implemented in core (already available):
- `amduat_fed_registry_*`
- `amduat_fed_ingest_validate`
- `amduat_fed_replay_build`
- `amduat_fed_view_build`
- `amduat_fed_resolve`
## Transport Abstraction
Minimal interface that hides protocol, auth, and topology:
```
struct amduat_fed_transport {
int (*get_records)(
void *ctx, uint32_t domain_id,
uint64_t snapshot_id, uint64_t log_prefix,
uint64_t from_logseq,
amduat_fed_record_iter *out_iter);
int (*get_artifact)(
void *ctx, amduat_reference_t ref,
amduat_sink *out_sink);
};
```
Transport MUST return records that can be validated with `amduat_fed_record_validate`
and MUST provide all fields required by `ASL/FEDERATION-REPLAY/1`.
Transport MUST NOT surface internal-only records from foreign domains.
## Storage and Encodings
- The coordinator stores records/artifacts via ASL store APIs and does not touch
segment layouts or extents directly.
- Federation metadata in index records is encoded by core per
`ENC/ASL-CORE-INDEX/1`; the coordinator must not override it.
- Cache store semantics are best-effort and do not affect authoritative state.
## Policies
- Admission is per-domain and controlled via registry entries and `policy_ok`.
- Policy compatibility uses `policy_hash_id` + `policy_hash` (ASL/POLICY-HASH/1).
- `policy_ok` is computed during admission by comparing local policy hash to the
remote domain's published policy hash.
- Admission and policy compatibility MUST be enforced before any foreign state is
admitted into the federation view.
- Per-record filtering, if used, MUST be deterministic and SHOULD be expressed
as policy denies passed to `amduat_fed_view_build` rather than by dropping
records before validation.
- Cache write policy is middle-layer only (fetch-on-miss, optional prefetch).
- Eviction is local (LRU or segmented queues) and must not leak layout.
- Conflict policy: reject on identity collision with differing metadata and keep
bounds stable until operator intervention (ASL/FEDERATION-REPLAY/1).
## Sequencing and Consistency
- Deterministic views require a stable snapshot vector per build.
- Bounds advance only after successful ingest; they never move backwards.
- Before validation, records are ordered by `(logseq, canonical identity)` and
filtered to `logseq <= log_prefix`.
- Tombstones and shadowing apply only within the source domain.
- Use `vector_epoch` to swap snapshot vectors atomically after a build.
- Persist registry updates atomically via `amduat_fed_registry_store_put` before
swapping the snapshot vector.
- If a remote retracts or regresses, keep local bounds and mark domain degraded.
## Core Flow
### Startup
- Load registry.
- Derive admitted set from `admitted != 0` and `policy_ok != 0`.
- Build initial snapshot vector.
### Periodic Tick
- For each admitted domain, fetch records up to bound and validate.
- Update `last_logseq`, and advance bounds if remote snapshot moves forward.
- Build federation view using `amduat_fed_view_build` over the staged records,
with optional policy denies derived from coordinator hooks.
- Swap snapshot vector atomically after registry updates.
### Resolve Loop
- Call `amduat_fed_resolve` against the latest built `amduat_fed_view_t`.
- If `AMDUAT_FED_RESOLVE_FOUND_REMOTE_NO_BYTES`, fetch bytes via transport into cache store.
- Retry resolve after cache write completes.
## Example Tick Pseudocode
```
int amduat_fed_coord_tick(struct amduat_fed_coord *c, uint64_t now_ms) {
amduat_fed_snapshot_vector vec = build_snapshot_vector(c);
clear(staged_records_all);
build_policy_denies(&policy_denies, &policy_denies_len);
for each domain in vec.bounds:
bound = &vec.bounds[i];
if !admitted(bound->domain_id) continue;
if backoff_active(bound->domain_id, now_ms) continue;
clear(staged_records);
iter = transport.get_records(
bound->domain_id,
bound->snapshot_id,
bound->log_prefix,
state.next_logseq);
while iter.next(record):
if !amduat_fed_record_validate(&record):
mark_domain_error(bound->domain_id);
break;
append(staged_records, record);
sort_by_logseq_then_id(staged_records);
clamp_to_log_prefix(staged_records, bound->log_prefix);
rc = amduat_fed_ingest_validate(
staged_records, staged_len, &err_index, &conflict_index);
if rc == AMDUAT_FED_INGEST_ERR_CONFLICT:
mark_domain_error(bound->domain_id);
continue;
if rc == AMDUAT_FED_INGEST_ERR_INVALID:
mark_domain_error(bound->domain_id);
continue;
update_registry_bounds(bound->domain_id, state);
append_all(staged_records_all, staged_records);
amduat_fed_view_build(
staged_records_all, staged_len_all,
local_domain_id, vec.bounds, vec.len,
policy_denies, policy_denies_len,
&view);
swap_snapshot_vector(c, build_snapshot_vector(c));
return 0;
}
```
## Example Resolve Pseudocode
```
int amduat_fed_coord_resolve(
struct amduat_fed_coord *c,
amduat_reference_t ref,
amduat_artifact_t *out) {
view = c->last_view;
rc = amduat_fed_resolve(&view, c->authoritative_store, ref, out);
if (rc == AMDUAT_FED_RESOLVE_FOUND_REMOTE_NO_BYTES && c->cache_store) {
rc = transport.get_artifact(ref, sink_for_cache(c->cache_store));
if (rc == 0) {
rc = amduat_fed_resolve(&view, c->authoritative_store, ref, out);
}
}
return rc;
}
```
## Coordinator Wiring Example
```
amduat_fed_transport_unix_t unix_transport;
amduat_fed_coord_cfg_t cfg;
amduat_fed_coord_t *coord = NULL;
amduat_fed_transport_unix_init(&unix_transport, "amduatd.sock");
memset(&cfg, 0, sizeof(cfg));
cfg.local_domain_id = 1;
cfg.authoritative_store = &store;
cfg.cache_store = &cache_store;
cfg.transport = amduat_fed_transport_unix_ops(&unix_transport);
if (amduat_fed_coord_open(&cfg, &coord) == AMDUAT_FED_COORD_OK) {
amduat_fed_coord_tick(coord, now_ms);
amduat_fed_coord_resolve(coord, some_ref, &artifact);
amduat_fed_coord_close(coord);
}
```

584
federation/coord.c Normal file
View file

@ -0,0 +1,584 @@
#include "federation/coord.h"
#include "amduat/fed/ingest.h"
#include "amduat/asl/artifact_io.h"
#include <stdlib.h>
#include <string.h>
struct amduat_fed_coord {
amduat_fed_coord_cfg_t cfg;
amduat_fed_registry_value_t registry;
bool registry_loaded;
amduat_fed_view_t last_view;
bool has_view;
uint64_t last_tick_ms;
};
static bool amduat_fed_coord_has_registry_ref(amduat_reference_t ref) {
return ref.hash_id != 0 && ref.digest.data != NULL && ref.digest.len != 0;
}
static int amduat_fed_coord_ref_cmp(amduat_reference_t a, amduat_reference_t b) {
size_t min_len;
int cmp;
if (a.hash_id != b.hash_id) {
return (int)a.hash_id - (int)b.hash_id;
}
if (a.digest.len != b.digest.len) {
return a.digest.len < b.digest.len ? -1 : 1;
}
min_len = a.digest.len;
if (min_len == 0) {
return 0;
}
cmp = memcmp(a.digest.data, b.digest.data, min_len);
if (cmp != 0) {
return cmp;
}
return 0;
}
static int amduat_fed_coord_record_cmp(const void *lhs, const void *rhs) {
const amduat_fed_record_t *a = (const amduat_fed_record_t *)lhs;
const amduat_fed_record_t *b = (const amduat_fed_record_t *)rhs;
if (a->logseq != b->logseq) {
return a->logseq < b->logseq ? -1 : 1;
}
if (a->id.type != b->id.type) {
return (int)a->id.type - (int)b->id.type;
}
return amduat_fed_coord_ref_cmp(a->id.ref, b->id.ref);
}
static bool amduat_fed_coord_record_clone(const amduat_fed_record_t *src,
amduat_fed_record_t *out) {
if (src == NULL || out == NULL) {
return false;
}
*out = *src;
if (!amduat_reference_clone(src->id.ref, &out->id.ref)) {
return false;
}
return true;
}
static void amduat_fed_coord_record_free(amduat_fed_record_t *rec) {
if (rec == NULL) {
return;
}
amduat_reference_free(&rec->id.ref);
}
static void amduat_fed_coord_free_batch(amduat_fed_coord_t *coord,
amduat_fed_record_t *batch,
size_t batch_len) {
if (coord == NULL || batch == NULL) {
return;
}
if (coord->cfg.transport.free_records != NULL) {
coord->cfg.transport.free_records(coord->cfg.transport.ctx,
batch,
batch_len);
}
}
static amduat_fed_domain_state_t *amduat_fed_coord_find_state(
amduat_fed_registry_value_t *registry,
uint32_t domain_id) {
size_t i;
if (registry == NULL) {
return NULL;
}
for (i = 0; i < registry->len; ++i) {
if (registry->states[i].domain_id == domain_id) {
return &registry->states[i];
}
}
return NULL;
}
static bool amduat_fed_coord_records_push(amduat_fed_record_t **records,
size_t *len,
size_t *cap,
amduat_fed_record_t value) {
amduat_fed_record_t *next;
size_t next_cap;
if (records == NULL || len == NULL || cap == NULL) {
return false;
}
if (*len == *cap) {
next_cap = *cap != 0 ? *cap * 2u : 64u;
next = (amduat_fed_record_t *)realloc(*records,
next_cap * sizeof(*next));
if (next == NULL) {
return false;
}
*records = next;
*cap = next_cap;
}
(*records)[(*len)++] = value;
return true;
}
static bool amduat_fed_coord_denies_push(amduat_fed_policy_deny_t **denies,
size_t *len,
size_t *cap,
amduat_fed_policy_deny_t value) {
amduat_fed_policy_deny_t *next;
size_t next_cap;
if (denies == NULL || len == NULL || cap == NULL) {
return false;
}
if (*len == *cap) {
next_cap = *cap != 0 ? *cap * 2u : 32u;
next = (amduat_fed_policy_deny_t *)realloc(*denies,
next_cap * sizeof(*next));
if (next == NULL) {
return false;
}
*denies = next;
*cap = next_cap;
}
(*denies)[(*len)++] = value;
return true;
}
amduat_fed_coord_error_t amduat_fed_coord_open(
const amduat_fed_coord_cfg_t *cfg,
amduat_fed_coord_t **out_coord) {
amduat_fed_coord_t *coord = NULL;
if (out_coord == NULL) {
return AMDUAT_FED_COORD_ERR_INVALID;
}
*out_coord = NULL;
if (cfg == NULL || cfg->authoritative_store == NULL) {
return AMDUAT_FED_COORD_ERR_INVALID;
}
coord = (amduat_fed_coord_t *)calloc(1, sizeof(*coord));
if (coord == NULL) {
return AMDUAT_FED_COORD_ERR_OOM;
}
coord->cfg = *cfg;
if (amduat_fed_coord_has_registry_ref(cfg->registry_ref)) {
if (!amduat_reference_clone(cfg->registry_ref, &coord->cfg.registry_ref)) {
free(coord);
return AMDUAT_FED_COORD_ERR_OOM;
}
} else {
coord->cfg.registry_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
}
amduat_fed_registry_value_init(&coord->registry, NULL, 0);
coord->registry_loaded = false;
memset(&coord->last_view, 0, sizeof(coord->last_view));
coord->has_view = false;
coord->last_tick_ms = 0;
*out_coord = coord;
return AMDUAT_FED_COORD_OK;
}
amduat_fed_coord_error_t amduat_fed_coord_close(amduat_fed_coord_t *coord) {
if (coord == NULL) {
return AMDUAT_FED_COORD_ERR_INVALID;
}
if (coord->has_view) {
amduat_fed_view_free(&coord->last_view);
}
amduat_fed_registry_value_free(&coord->registry);
amduat_reference_free(&coord->cfg.registry_ref);
free(coord);
return AMDUAT_FED_COORD_OK;
}
amduat_fed_coord_error_t amduat_fed_coord_load_registry(
amduat_fed_coord_t *coord) {
amduat_fed_registry_store_t store;
amduat_fed_registry_value_t value;
amduat_asl_store_error_t store_err = AMDUAT_ASL_STORE_OK;
amduat_fed_registry_error_t err;
if (coord == NULL) {
return AMDUAT_FED_COORD_ERR_INVALID;
}
if (!amduat_fed_coord_has_registry_ref(coord->cfg.registry_ref)) {
return AMDUAT_FED_COORD_ERR_INVALID;
}
amduat_fed_registry_store_init(&store, coord->cfg.authoritative_store);
amduat_fed_registry_value_init(&value, NULL, 0);
err = amduat_fed_registry_store_get(&store,
coord->cfg.registry_ref,
&value,
&store_err);
if (err == AMDUAT_FED_REGISTRY_ERR_CODEC) {
amduat_fed_registry_value_free(&value);
return AMDUAT_FED_COORD_ERR_CODEC;
}
if (err != AMDUAT_FED_REGISTRY_OK || store_err != AMDUAT_ASL_STORE_OK) {
amduat_fed_registry_value_free(&value);
return AMDUAT_FED_COORD_ERR_STORE;
}
amduat_fed_registry_value_free(&coord->registry);
coord->registry = value;
coord->registry_loaded = true;
return AMDUAT_FED_COORD_OK;
}
amduat_fed_coord_error_t amduat_fed_coord_set_admitted(
amduat_fed_coord_t *coord,
uint32_t domain_id,
bool admitted) {
size_t i;
amduat_fed_domain_state_t state;
if (coord == NULL) {
return AMDUAT_FED_COORD_ERR_INVALID;
}
for (i = 0; i < coord->registry.len; ++i) {
if (coord->registry.states[i].domain_id == domain_id) {
coord->registry.states[i].admitted = admitted ? 1u : 0u;
if (admitted && coord->registry.states[i].policy_ok == 0u) {
coord->registry.states[i].policy_ok = 1u;
}
return AMDUAT_FED_COORD_OK;
}
}
memset(&state, 0, sizeof(state));
state.domain_id = domain_id;
state.admitted = admitted ? 1u : 0u;
state.policy_ok = admitted ? 1u : 0u;
state.policy_hash_id = 0;
state.policy_hash = amduat_octets(NULL, 0u);
if (!amduat_fed_registry_value_insert(&coord->registry, state)) {
return AMDUAT_FED_COORD_ERR_OOM;
}
return AMDUAT_FED_COORD_OK;
}
amduat_fed_coord_error_t amduat_fed_coord_tick(
amduat_fed_coord_t *coord,
uint64_t now_ms) {
amduat_fed_view_bounds_t *bounds = NULL;
size_t bounds_len = 0;
size_t bounds_cap = 0;
amduat_fed_record_t *records = NULL;
size_t records_len = 0;
size_t records_cap = 0;
amduat_fed_policy_deny_t *denies = NULL;
size_t denies_len = 0;
size_t denies_cap = 0;
size_t i;
amduat_fed_coord_error_t status = AMDUAT_FED_COORD_OK;
amduat_fed_registry_store_t reg_store;
amduat_fed_registry_error_t reg_err;
amduat_reference_t new_ref;
amduat_asl_store_error_t store_err = AMDUAT_ASL_STORE_OK;
bool registry_dirty = false;
if (coord == NULL) {
return AMDUAT_FED_COORD_ERR_INVALID;
}
coord->last_tick_ms = now_ms;
if (!coord->registry_loaded) {
if (amduat_fed_coord_has_registry_ref(coord->cfg.registry_ref)) {
status = amduat_fed_coord_load_registry(coord);
if (status != AMDUAT_FED_COORD_OK) {
return status;
}
} else {
coord->registry_loaded = true;
}
}
for (i = 0; i < coord->registry.len; ++i) {
const amduat_fed_domain_state_t *state = &coord->registry.states[i];
bool policy_ok = state->policy_ok != 0u ||
amduat_octets_is_empty(state->policy_hash);
if (state->admitted == 0u || !policy_ok) {
continue;
}
if (bounds_len == bounds_cap) {
size_t next_cap = bounds_cap != 0 ? bounds_cap * 2u : 8u;
amduat_fed_view_bounds_t *next =
(amduat_fed_view_bounds_t *)realloc(
bounds, next_cap * sizeof(*next));
if (next == NULL) {
status = AMDUAT_FED_COORD_ERR_OOM;
goto tick_cleanup;
}
bounds = next;
bounds_cap = next_cap;
}
bounds[bounds_len].domain_id = state->domain_id;
bounds[bounds_len].snapshot_id = state->snapshot_id;
bounds[bounds_len].log_prefix = state->log_prefix;
bounds_len++;
}
for (i = 0; i < bounds_len; ++i) {
amduat_fed_view_bounds_t *bound = &bounds[i];
amduat_fed_domain_state_t *state =
amduat_fed_coord_find_state(&coord->registry, bound->domain_id);
amduat_fed_record_t *batch = NULL;
size_t batch_len = 0;
size_t j;
uint64_t max_logseq = 0;
int transport_rc;
if (state == NULL) {
status = AMDUAT_FED_COORD_ERR_INVALID;
goto tick_cleanup;
}
if (coord->cfg.transport.get_records == NULL) {
status = AMDUAT_FED_COORD_ERR_INVALID;
goto tick_cleanup;
}
transport_rc = coord->cfg.transport.get_records(
coord->cfg.transport.ctx,
bound->domain_id,
bound->snapshot_id,
bound->log_prefix,
state->last_logseq + 1u,
&batch,
&batch_len);
if (transport_rc != 0) {
status = AMDUAT_FED_COORD_ERR_STORE;
goto tick_cleanup;
}
if (batch == NULL && batch_len != 0) {
status = AMDUAT_FED_COORD_ERR_INVALID;
goto tick_cleanup;
}
for (j = 0; j < batch_len; ++j) {
amduat_fed_record_t cloned;
amduat_fed_policy_deny_t deny;
amduat_reference_t deny_ref;
bool allowed = true;
bool ok = amduat_fed_coord_record_clone(&batch[j], &cloned);
if (!ok) {
status = AMDUAT_FED_COORD_ERR_OOM;
goto tick_cleanup;
}
if (cloned.logseq > bound->log_prefix) {
amduat_fed_coord_record_free(&cloned);
continue;
}
if (!amduat_fed_record_validate(&cloned)) {
amduat_fed_coord_record_free(&cloned);
status = AMDUAT_FED_COORD_ERR_INVALID;
amduat_fed_coord_free_batch(coord, batch, batch_len);
goto tick_cleanup;
}
if (coord->cfg.policy_hooks.record_allowed != NULL) {
memset(&deny, 0, sizeof(deny));
allowed = coord->cfg.policy_hooks.record_allowed(
coord->cfg.policy_hooks.ctx, &cloned, &deny);
if (!allowed) {
if (deny.id.ref.digest.data == NULL && deny.id.ref.hash_id == 0u) {
deny.id = cloned.id;
}
deny_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
if (!amduat_reference_clone(deny.id.ref, &deny_ref)) {
amduat_fed_coord_record_free(&cloned);
status = AMDUAT_FED_COORD_ERR_OOM;
amduat_fed_coord_free_batch(coord, batch, batch_len);
goto tick_cleanup;
}
deny.id.ref = deny_ref;
if (!amduat_fed_coord_denies_push(&denies,
&denies_len,
&denies_cap,
deny)) {
amduat_reference_free(&deny.id.ref);
amduat_fed_coord_record_free(&cloned);
status = AMDUAT_FED_COORD_ERR_OOM;
amduat_fed_coord_free_batch(coord, batch, batch_len);
goto tick_cleanup;
}
}
}
if (!amduat_fed_coord_records_push(&records,
&records_len,
&records_cap,
cloned)) {
amduat_fed_coord_record_free(&cloned);
status = AMDUAT_FED_COORD_ERR_OOM;
amduat_fed_coord_free_batch(coord, batch, batch_len);
goto tick_cleanup;
}
if (cloned.logseq > max_logseq) {
max_logseq = cloned.logseq;
}
}
amduat_fed_coord_free_batch(coord, batch, batch_len);
if (max_logseq > state->last_logseq) {
state->last_logseq = max_logseq;
registry_dirty = true;
}
}
if (records_len != 0) {
size_t err_index = 0;
size_t conflict_index = 0;
amduat_fed_ingest_error_t ingest_rc;
qsort(records, records_len, sizeof(*records), amduat_fed_coord_record_cmp);
ingest_rc = amduat_fed_ingest_validate(records,
records_len,
&err_index,
&conflict_index);
if (ingest_rc != AMDUAT_FED_INGEST_OK) {
status = AMDUAT_FED_COORD_ERR_INVALID;
goto tick_cleanup;
}
}
if (coord->has_view) {
amduat_fed_view_free(&coord->last_view);
coord->has_view = false;
}
if (bounds_len != 0) {
amduat_fed_view_error_t view_rc;
view_rc = amduat_fed_view_build(records,
records_len,
coord->cfg.local_domain_id,
bounds,
bounds_len,
denies,
denies_len,
&coord->last_view);
if (view_rc != AMDUAT_FED_VIEW_OK) {
status = AMDUAT_FED_COORD_ERR_INVALID;
goto tick_cleanup;
}
coord->has_view = true;
}
if (registry_dirty) {
amduat_fed_registry_store_init(&reg_store, coord->cfg.authoritative_store);
reg_err = amduat_fed_registry_store_put(&reg_store,
&coord->registry,
&new_ref,
&store_err);
if (reg_err != AMDUAT_FED_REGISTRY_OK ||
store_err != AMDUAT_ASL_STORE_OK) {
status = AMDUAT_FED_COORD_ERR_STORE;
goto tick_cleanup;
}
amduat_reference_free(&coord->cfg.registry_ref);
coord->cfg.registry_ref = new_ref;
}
tick_cleanup:
if (bounds != NULL) {
free(bounds);
}
if (records != NULL) {
for (i = 0; i < records_len; ++i) {
amduat_fed_coord_record_free(&records[i]);
}
free(records);
}
if (denies != NULL) {
for (i = 0; i < denies_len; ++i) {
amduat_reference_free(&denies[i].id.ref);
}
free(denies);
}
return status;
}
amduat_fed_coord_error_t amduat_fed_coord_resolve(
amduat_fed_coord_t *coord,
amduat_reference_t ref,
amduat_artifact_t *out_artifact) {
amduat_fed_resolve_error_t rc;
if (coord == NULL || out_artifact == NULL) {
return AMDUAT_FED_COORD_ERR_INVALID;
}
if (!coord->has_view) {
return AMDUAT_FED_COORD_ERR_INVALID;
}
rc = amduat_fed_resolve(&coord->last_view,
coord->cfg.authoritative_store,
ref,
out_artifact);
if (rc == AMDUAT_FED_RESOLVE_OK) {
return AMDUAT_FED_COORD_OK;
}
if (rc == AMDUAT_FED_RESOLVE_FOUND_REMOTE_NO_BYTES &&
coord->cfg.cache_store != NULL &&
coord->cfg.transport.get_artifact != NULL) {
amduat_octets_t bytes = amduat_octets(NULL, 0u);
amduat_artifact_t artifact;
amduat_asl_store_error_t store_err;
int fetch_rc = coord->cfg.transport.get_artifact(
coord->cfg.transport.ctx, ref, &bytes);
if (fetch_rc != 0) {
amduat_octets_free(&bytes);
return AMDUAT_FED_COORD_ERR_STORE;
}
if (!amduat_asl_artifact_from_bytes(bytes,
AMDUAT_ASL_IO_RAW,
false,
amduat_type_tag(0u),
&artifact)) {
amduat_octets_free(&bytes);
return AMDUAT_FED_COORD_ERR_INVALID;
}
store_err = amduat_asl_store_put(coord->cfg.cache_store,
artifact,
&ref);
amduat_asl_artifact_free(&artifact);
amduat_octets_free(&bytes);
if (store_err != AMDUAT_ASL_STORE_OK) {
return AMDUAT_FED_COORD_ERR_STORE;
}
rc = amduat_fed_resolve(&coord->last_view,
coord->cfg.authoritative_store,
ref,
out_artifact);
if (rc == AMDUAT_FED_RESOLVE_OK) {
return AMDUAT_FED_COORD_OK;
}
}
if (rc == AMDUAT_FED_RESOLVE_POLICY_DENIED) {
return AMDUAT_FED_COORD_ERR_INVALID;
}
if (rc == AMDUAT_FED_RESOLVE_NOT_FOUND) {
return AMDUAT_FED_COORD_ERR_INVALID;
}
return AMDUAT_FED_COORD_ERR_STORE;
}
void amduat_fed_coord_get_status(const amduat_fed_coord_t *coord,
amduat_fed_coord_status_t *out_status) {
if (out_status == NULL) {
return;
}
memset(out_status, 0, sizeof(*out_status));
if (coord == NULL) {
return;
}
out_status->domain_id = coord->cfg.local_domain_id;
if (!amduat_reference_clone(coord->cfg.registry_ref,
&out_status->registry_ref)) {
out_status->registry_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
}
out_status->last_tick_ms = coord->last_tick_ms;
}

95
federation/coord.h Normal file
View file

@ -0,0 +1,95 @@
#ifndef AMDUAT_FED_COORD_H
#define AMDUAT_FED_COORD_H
#include "amduat/asl/core.h"
#include "amduat/asl/store.h"
#include "amduat/fed/registry.h"
#include "amduat/fed/view.h"
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef struct amduat_fed_coord amduat_fed_coord_t;
typedef enum {
AMDUAT_FED_COORD_OK = 0,
AMDUAT_FED_COORD_ERR_INVALID = 1,
AMDUAT_FED_COORD_ERR_OOM = 2,
AMDUAT_FED_COORD_ERR_STORE = 3,
AMDUAT_FED_COORD_ERR_CODEC = 4,
AMDUAT_FED_COORD_ERR_NOT_IMPLEMENTED = 5
} amduat_fed_coord_error_t;
typedef struct {
void *ctx;
int (*get_records)(void *ctx,
uint32_t domain_id,
uint64_t snapshot_id,
uint64_t log_prefix,
uint64_t from_logseq,
amduat_fed_record_t **out_records,
size_t *out_len);
void (*free_records)(void *ctx, amduat_fed_record_t *records, size_t len);
int (*get_artifact)(void *ctx,
amduat_reference_t ref,
amduat_octets_t *out_bytes);
} amduat_fed_transport_t;
typedef struct {
void *ctx;
bool (*record_allowed)(void *ctx,
const amduat_fed_record_t *record,
amduat_fed_policy_deny_t *out_deny);
} amduat_fed_policy_hooks_t;
typedef struct {
uint32_t local_domain_id;
amduat_asl_store_t *authoritative_store;
amduat_asl_store_t *cache_store;
amduat_reference_t registry_ref;
amduat_fed_transport_t transport;
amduat_fed_policy_hooks_t policy_hooks;
} amduat_fed_coord_cfg_t;
typedef struct {
uint32_t domain_id;
amduat_reference_t registry_ref;
uint64_t last_tick_ms;
} amduat_fed_coord_status_t;
amduat_fed_coord_error_t amduat_fed_coord_open(
const amduat_fed_coord_cfg_t *cfg,
amduat_fed_coord_t **out_coord);
amduat_fed_coord_error_t amduat_fed_coord_close(amduat_fed_coord_t *coord);
amduat_fed_coord_error_t amduat_fed_coord_load_registry(
amduat_fed_coord_t *coord);
amduat_fed_coord_error_t amduat_fed_coord_set_admitted(
amduat_fed_coord_t *coord,
uint32_t domain_id,
bool admitted);
amduat_fed_coord_error_t amduat_fed_coord_tick(
amduat_fed_coord_t *coord,
uint64_t now_ms);
amduat_fed_coord_error_t amduat_fed_coord_resolve(
amduat_fed_coord_t *coord,
amduat_reference_t ref,
amduat_artifact_t *out_artifact);
void amduat_fed_coord_get_status(const amduat_fed_coord_t *coord,
amduat_fed_coord_status_t *out_status);
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* AMDUAT_FED_COORD_H */

View file

@ -0,0 +1,72 @@
#include "federation/transport_stub.h"
#include <stdlib.h>
#include <string.h>
static int amduat_fed_transport_stub_get_records(void *ctx,
uint32_t domain_id,
uint64_t snapshot_id,
uint64_t log_prefix,
uint64_t from_logseq,
amduat_fed_record_t **out_records,
size_t *out_len) {
(void)ctx;
(void)domain_id;
(void)snapshot_id;
(void)log_prefix;
(void)from_logseq;
if (out_records != NULL) {
*out_records = NULL;
}
if (out_len != NULL) {
*out_len = 0;
}
return 0;
}
static void amduat_fed_transport_stub_free_records(void *ctx,
amduat_fed_record_t *records,
size_t len) {
(void)ctx;
(void)len;
free(records);
}
static int amduat_fed_transport_stub_get_artifact(void *ctx,
amduat_reference_t ref,
amduat_octets_t *out_bytes) {
amduat_fed_transport_stub_t *stub = (amduat_fed_transport_stub_t *)ctx;
(void)ref;
if (out_bytes == NULL || stub == NULL) {
return -1;
}
if (!amduat_octets_clone(stub->artifact_bytes, out_bytes)) {
return -1;
}
return 0;
}
void amduat_fed_transport_stub_init(amduat_fed_transport_stub_t *stub) {
if (stub == NULL) {
return;
}
stub->artifact_bytes = amduat_octets(NULL, 0u);
}
amduat_fed_transport_t amduat_fed_transport_stub_ops(
amduat_fed_transport_stub_t *stub) {
amduat_fed_transport_t ops;
memset(&ops, 0, sizeof(ops));
ops.ctx = stub;
ops.get_records = amduat_fed_transport_stub_get_records;
ops.free_records = amduat_fed_transport_stub_free_records;
ops.get_artifact = amduat_fed_transport_stub_get_artifact;
return ops;
}
void amduat_fed_transport_stub_free(amduat_fed_transport_stub_t *stub) {
if (stub == NULL) {
return;
}
amduat_octets_free(&stub->artifact_bytes);
}

View file

@ -0,0 +1,25 @@
#ifndef AMDUAT_FED_TRANSPORT_STUB_H
#define AMDUAT_FED_TRANSPORT_STUB_H
#include "federation/coord.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
amduat_octets_t artifact_bytes;
} amduat_fed_transport_stub_t;
void amduat_fed_transport_stub_init(amduat_fed_transport_stub_t *stub);
amduat_fed_transport_t amduat_fed_transport_stub_ops(
amduat_fed_transport_stub_t *stub);
void amduat_fed_transport_stub_free(amduat_fed_transport_stub_t *stub);
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* AMDUAT_FED_TRANSPORT_STUB_H */

852
federation/transport_unix.c Normal file
View file

@ -0,0 +1,852 @@
#define _POSIX_C_SOURCE 200809L
#include "federation/transport_unix.h"
#include "amduat/asl/ref_text.h"
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
static const char *amduat_fed_json_skip_ws(const char *p, const char *end) {
while (p < end) {
if (*p == ' ' || *p == '\n' || *p == '\r' || *p == '\t') {
p++;
continue;
}
break;
}
return p;
}
static bool amduat_fed_json_expect(const char **p, const char *end, char c) {
const char *cur;
if (p == NULL || *p == NULL) {
return false;
}
cur = amduat_fed_json_skip_ws(*p, end);
if (cur >= end || *cur != c) {
return false;
}
*p = cur + 1;
return true;
}
static bool amduat_fed_json_parse_string_noesc(const char **p,
const char *end,
const char **out_str,
size_t *out_len) {
const char *cur;
const char *start;
if (p == NULL || *p == NULL || out_str == NULL || out_len == NULL) {
return false;
}
cur = amduat_fed_json_skip_ws(*p, end);
if (cur >= end || *cur != '"') {
return false;
}
start = ++cur;
while (cur < end && *cur != '"') {
if (*cur == '\\') {
return false;
}
cur++;
}
if (cur >= end) {
return false;
}
*out_str = start;
*out_len = (size_t)(cur - start);
*p = cur + 1;
return true;
}
static bool amduat_fed_json_parse_u64(const char **p,
const char *end,
uint64_t *out) {
const char *cur;
char *next = NULL;
uint64_t v;
if (p == NULL || *p == NULL || out == NULL) {
return false;
}
cur = amduat_fed_json_skip_ws(*p, end);
if (cur >= end) {
return false;
}
errno = 0;
v = (uint64_t)strtoull(cur, &next, 10);
if (errno != 0 || next == cur) {
return false;
}
*out = v;
*p = next;
return true;
}
static bool amduat_fed_json_parse_u32(const char **p,
const char *end,
uint32_t *out) {
uint64_t tmp = 0;
if (!amduat_fed_json_parse_u64(p, end, &tmp)) {
return false;
}
if (tmp > UINT32_MAX) {
return false;
}
*out = (uint32_t)tmp;
return true;
}
static bool amduat_fed_json_parse_bool(const char **p,
const char *end,
bool *out) {
const char *cur;
if (p == NULL || *p == NULL || out == NULL) {
return false;
}
cur = amduat_fed_json_skip_ws(*p, end);
if (cur + 4 <= end && strncmp(cur, "true", 4) == 0) {
*out = true;
*p = cur + 4;
return true;
}
if (cur + 5 <= end && strncmp(cur, "false", 5) == 0) {
*out = false;
*p = cur + 5;
return true;
}
return false;
}
static bool amduat_fed_json_skip_string(const char **p, const char *end) {
const char *cur;
if (p == NULL || *p == NULL) {
return false;
}
cur = amduat_fed_json_skip_ws(*p, end);
if (cur >= end || *cur != '"') {
return false;
}
cur++;
while (cur < end && *cur != '"') {
if (*cur == '\\') {
return false;
}
cur++;
}
if (cur >= end) {
return false;
}
*p = cur + 1;
return true;
}
static bool amduat_fed_json_skip_value(const char **p, const char *end, int depth);
static bool amduat_fed_json_skip_array(const char **p,
const char *end,
int depth) {
const char *cur;
if (!amduat_fed_json_expect(p, end, '[')) {
return false;
}
cur = amduat_fed_json_skip_ws(*p, end);
if (cur < end && *cur == ']') {
*p = cur + 1;
return true;
}
for (;;) {
if (!amduat_fed_json_skip_value(p, end, depth + 1)) {
return false;
}
cur = amduat_fed_json_skip_ws(*p, end);
if (cur >= end) {
return false;
}
if (*cur == ',') {
*p = cur + 1;
continue;
}
if (*cur == ']') {
*p = cur + 1;
return true;
}
return false;
}
}
static bool amduat_fed_json_skip_object(const char **p,
const char *end,
int depth) {
const char *cur;
if (!amduat_fed_json_expect(p, end, '{')) {
return false;
}
cur = amduat_fed_json_skip_ws(*p, end);
if (cur < end && *cur == '}') {
*p = cur + 1;
return true;
}
for (;;) {
if (!amduat_fed_json_skip_string(p, end)) {
return false;
}
if (!amduat_fed_json_expect(p, end, ':')) {
return false;
}
if (!amduat_fed_json_skip_value(p, end, depth + 1)) {
return false;
}
cur = amduat_fed_json_skip_ws(*p, end);
if (cur >= end) {
return false;
}
if (*cur == ',') {
*p = cur + 1;
continue;
}
if (*cur == '}') {
*p = cur + 1;
return true;
}
return false;
}
}
static bool amduat_fed_json_skip_value(const char **p,
const char *end,
int depth) {
const char *cur;
if (depth > 64) {
return false;
}
cur = amduat_fed_json_skip_ws(*p, end);
if (cur >= end) {
return false;
}
if (*cur == '"') {
return amduat_fed_json_skip_string(p, end);
}
if (*cur == '{') {
return amduat_fed_json_skip_object(p, end, depth);
}
if (*cur == '[') {
return amduat_fed_json_skip_array(p, end, depth);
}
if (strncmp(cur, "true", 4) == 0) {
*p = cur + 4;
return true;
}
if (strncmp(cur, "false", 5) == 0) {
*p = cur + 5;
return true;
}
if (strncmp(cur, "null", 4) == 0) {
*p = cur + 4;
return true;
}
if ((*cur >= '0' && *cur <= '9') || *cur == '-') {
char *next = NULL;
(void)strtoull(cur, &next, 10);
if (next == cur) {
return false;
}
*p = next;
return true;
}
return false;
}
static bool amduat_fed_transport_parse_record(const char **p,
const char *end,
uint32_t default_domain_id,
amduat_fed_record_t *out) {
const char *key = NULL;
size_t key_len = 0;
const char *sv = NULL;
size_t sv_len = 0;
uint32_t domain_id = default_domain_id;
uint32_t type = 0;
uint32_t visibility = 0;
uint32_t source_domain = 0;
bool has_domain = false;
bool has_type = false;
bool has_ref = false;
bool has_logseq = false;
bool has_snapshot = false;
bool has_log_prefix = false;
bool has_source = false;
amduat_reference_t ref;
memset(&ref, 0, sizeof(ref));
if (!amduat_fed_json_expect(p, end, '{')) {
return false;
}
for (;;) {
const char *cur = amduat_fed_json_skip_ws(*p, end);
if (cur < end && *cur == '}') {
*p = cur + 1;
break;
}
if (!amduat_fed_json_parse_string_noesc(p, end, &key, &key_len) ||
!amduat_fed_json_expect(p, end, ':')) {
return false;
}
if (key_len == strlen("domain_id") &&
memcmp(key, "domain_id", key_len) == 0) {
if (!amduat_fed_json_parse_u32(p, end, &domain_id)) {
return false;
}
has_domain = true;
} else if (key_len == strlen("type") &&
memcmp(key, "type", key_len) == 0) {
if (!amduat_fed_json_parse_u32(p, end, &type)) {
return false;
}
has_type = true;
} else if (key_len == strlen("ref") &&
memcmp(key, "ref", key_len) == 0) {
if (!amduat_fed_json_parse_string_noesc(p, end, &sv, &sv_len)) {
return false;
}
{
char *tmp = (char *)malloc(sv_len + 1u);
if (tmp == NULL) {
return false;
}
memcpy(tmp, sv, sv_len);
tmp[sv_len] = '\0';
if (!amduat_asl_ref_decode_hex(tmp, &ref)) {
free(tmp);
return false;
}
free(tmp);
}
has_ref = true;
} else if (key_len == strlen("logseq") &&
memcmp(key, "logseq", key_len) == 0) {
if (!amduat_fed_json_parse_u64(p, end, &out->logseq)) {
return false;
}
has_logseq = true;
} else if (key_len == strlen("snapshot_id") &&
memcmp(key, "snapshot_id", key_len) == 0) {
if (!amduat_fed_json_parse_u64(p, end, &out->snapshot_id)) {
return false;
}
has_snapshot = true;
} else if (key_len == strlen("log_prefix") &&
memcmp(key, "log_prefix", key_len) == 0) {
if (!amduat_fed_json_parse_u64(p, end, &out->log_prefix)) {
return false;
}
has_log_prefix = true;
} else if (key_len == strlen("visibility") &&
memcmp(key, "visibility", key_len) == 0) {
if (!amduat_fed_json_parse_u32(p, end, &visibility)) {
return false;
}
} else if (key_len == strlen("has_source") &&
memcmp(key, "has_source", key_len) == 0) {
bool tmp = false;
if (!amduat_fed_json_parse_bool(p, end, &tmp)) {
return false;
}
has_source = tmp;
} else if (key_len == strlen("source_domain") &&
memcmp(key, "source_domain", key_len) == 0) {
if (!amduat_fed_json_parse_u32(p, end, &source_domain)) {
return false;
}
} else {
if (!amduat_fed_json_skip_value(p, end, 0)) {
return false;
}
}
{
const char *cur = amduat_fed_json_skip_ws(*p, end);
if (cur >= end) {
return false;
}
if (*cur == ',') {
*p = cur + 1;
continue;
}
if (*cur == '}') {
*p = cur + 1;
break;
}
return false;
}
}
if (!has_ref) {
return false;
}
if (!has_type || !has_logseq || !has_snapshot || !has_log_prefix) {
amduat_reference_free(&ref);
return false;
}
out->meta.domain_id = domain_id;
out->meta.visibility = (uint8_t)visibility;
out->meta.has_source = has_source ? 1u : 0u;
out->meta.source_domain = source_domain;
out->id.type = (amduat_fed_record_type_t)type;
out->id.ref = ref;
(void)has_domain;
return true;
}
static bool amduat_fed_transport_parse_records(const char *body,
size_t body_len,
amduat_fed_record_t **out_records,
size_t *out_len) {
const char *p = body;
const char *end = body + body_len;
const char *key = NULL;
size_t key_len = 0;
uint32_t domain_id = 0;
uint64_t snapshot_id = 0;
uint64_t log_prefix = 0;
uint64_t next_logseq = 0;
bool have_domain = false;
bool have_snapshot = false;
bool have_log_prefix = false;
bool have_next = false;
amduat_fed_record_t *records = NULL;
size_t records_len = 0;
size_t records_cap = 0;
if (out_records == NULL || out_len == NULL) {
return false;
}
*out_records = NULL;
*out_len = 0;
if (!amduat_fed_json_expect(&p, end, '{')) {
return false;
}
for (;;) {
const char *cur = amduat_fed_json_skip_ws(p, end);
if (cur < end && *cur == '}') {
p = cur + 1;
break;
}
if (!amduat_fed_json_parse_string_noesc(&p, end, &key, &key_len) ||
!amduat_fed_json_expect(&p, end, ':')) {
goto parse_fail;
}
if (key_len == strlen("domain_id") &&
memcmp(key, "domain_id", key_len) == 0) {
if (!amduat_fed_json_parse_u32(&p, end, &domain_id)) {
goto parse_fail;
}
have_domain = true;
} else if (key_len == strlen("snapshot_id") &&
memcmp(key, "snapshot_id", key_len) == 0) {
if (!amduat_fed_json_parse_u64(&p, end, &snapshot_id)) {
goto parse_fail;
}
have_snapshot = true;
} else if (key_len == strlen("log_prefix") &&
memcmp(key, "log_prefix", key_len) == 0) {
if (!amduat_fed_json_parse_u64(&p, end, &log_prefix)) {
goto parse_fail;
}
have_log_prefix = true;
} else if (key_len == strlen("next_logseq") &&
memcmp(key, "next_logseq", key_len) == 0) {
if (!amduat_fed_json_parse_u64(&p, end, &next_logseq)) {
goto parse_fail;
}
have_next = true;
} else if (key_len == strlen("records") &&
memcmp(key, "records", key_len) == 0) {
if (!amduat_fed_json_expect(&p, end, '[')) {
goto parse_fail;
}
cur = amduat_fed_json_skip_ws(p, end);
if (cur < end && *cur == ']') {
p = cur + 1;
} else {
for (;;) {
amduat_fed_record_t record;
memset(&record, 0, sizeof(record));
if (!amduat_fed_transport_parse_record(&p, end, domain_id, &record)) {
goto parse_fail;
}
if (records_len == records_cap) {
size_t next_cap = records_cap != 0 ? records_cap * 2u : 64u;
amduat_fed_record_t *next =
(amduat_fed_record_t *)realloc(
records, next_cap * sizeof(*records));
if (next == NULL) {
amduat_reference_free(&record.id.ref);
goto parse_fail;
}
records = next;
records_cap = next_cap;
}
records[records_len++] = record;
cur = amduat_fed_json_skip_ws(p, end);
if (cur < end && *cur == ',') {
p = cur + 1;
continue;
}
if (cur < end && *cur == ']') {
p = cur + 1;
break;
}
goto parse_fail;
}
}
} else {
if (!amduat_fed_json_skip_value(&p, end, 0)) {
goto parse_fail;
}
}
cur = amduat_fed_json_skip_ws(p, end);
if (cur >= end) {
goto parse_fail;
}
if (*cur == ',') {
p = cur + 1;
continue;
}
if (*cur == '}') {
p = cur + 1;
break;
}
goto parse_fail;
}
if (!have_domain || !have_snapshot || !have_log_prefix || !have_next) {
goto parse_fail;
}
(void)snapshot_id;
(void)log_prefix;
(void)next_logseq;
*out_records = records;
*out_len = records_len;
return true;
parse_fail:
if (records != NULL) {
size_t i;
for (i = 0; i < records_len; ++i) {
amduat_reference_free(&records[i].id.ref);
}
free(records);
}
return false;
}
static int amduat_fed_transport_unix_connect(const char *path) {
int fd;
struct sockaddr_un addr;
if (path == NULL || path[0] == '\0') {
return -1;
}
fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0) {
return -1;
}
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
close(fd);
return -1;
}
return fd;
}
static bool amduat_fed_transport_unix_send_all(int fd,
const char *buf,
size_t len) {
size_t off = 0;
while (off < len) {
ssize_t n = write(fd, buf + off, len - off);
if (n < 0) {
if (errno == EINTR) {
continue;
}
return false;
}
off += (size_t)n;
}
return true;
}
static bool amduat_fed_transport_unix_read_all(int fd,
uint8_t **out_buf,
size_t *out_len) {
uint8_t *buf = NULL;
size_t len = 0;
size_t cap = 0;
if (out_buf == NULL || out_len == NULL) {
return false;
}
for (;;) {
uint8_t tmp[4096];
ssize_t n = read(fd, tmp, sizeof(tmp));
if (n < 0) {
if (errno == EINTR) {
continue;
}
free(buf);
return false;
}
if (n == 0) {
break;
}
if (len + (size_t)n > cap) {
size_t next_cap = cap != 0 ? cap * 2u : 8192u;
while (next_cap < len + (size_t)n) {
next_cap *= 2u;
}
{
uint8_t *next = (uint8_t *)realloc(buf, next_cap);
if (next == NULL) {
free(buf);
return false;
}
buf = next;
cap = next_cap;
}
}
memcpy(buf + len, tmp, (size_t)n);
len += (size_t)n;
}
*out_buf = buf;
*out_len = len;
return true;
}
static bool amduat_fed_transport_unix_split_response(const uint8_t *buf,
size_t len,
const uint8_t **out_body,
size_t *out_body_len,
int *out_status) {
size_t i;
const uint8_t *body = NULL;
if (out_body == NULL || out_body_len == NULL || out_status == NULL) {
return false;
}
*out_body = NULL;
*out_body_len = 0;
*out_status = 0;
if (buf == NULL || len < 12) {
return false;
}
if (memcmp(buf, "HTTP/1.", 7) != 0) {
return false;
}
for (i = 0; i + 3 < len; ++i) {
if (buf[i] == '\r' && buf[i + 1] == '\n' &&
buf[i + 2] == '\r' && buf[i + 3] == '\n') {
body = buf + i + 4u;
*out_body = body;
*out_body_len = len - (i + 4u);
break;
}
}
if (body == NULL) {
return false;
}
{
int status = 0;
if (sscanf((const char *)buf, "HTTP/1.%*c %d", &status) != 1) {
return false;
}
*out_status = status;
}
return true;
}
static int amduat_fed_transport_unix_get_records(void *ctx,
uint32_t domain_id,
uint64_t snapshot_id,
uint64_t log_prefix,
uint64_t from_logseq,
amduat_fed_record_t **out_records,
size_t *out_len) {
amduat_fed_transport_unix_t *transport = (amduat_fed_transport_unix_t *)ctx;
char req[512];
int fd;
uint8_t *buf = NULL;
size_t buf_len = 0;
const uint8_t *body = NULL;
size_t body_len = 0;
int status = 0;
(void)snapshot_id;
(void)log_prefix;
if (transport == NULL || out_records == NULL || out_len == NULL) {
return -1;
}
*out_records = NULL;
*out_len = 0;
snprintf(req, sizeof(req),
"GET /v1/fed/records?domain_id=%u&from_logseq=%llu HTTP/1.1\r\n"
"Host: localhost\r\n"
"Connection: close\r\n"
"\r\n",
(unsigned int)domain_id,
(unsigned long long)from_logseq);
fd = amduat_fed_transport_unix_connect(transport->socket_path);
if (fd < 0) {
return -1;
}
if (!amduat_fed_transport_unix_send_all(fd, req, strlen(req))) {
close(fd);
return -1;
}
if (!amduat_fed_transport_unix_read_all(fd, &buf, &buf_len)) {
close(fd);
return -1;
}
close(fd);
if (!amduat_fed_transport_unix_split_response(buf,
buf_len,
&body,
&body_len,
&status)) {
free(buf);
return -1;
}
if (status != 200) {
free(buf);
return -1;
}
if (!amduat_fed_transport_parse_records((const char *)body,
body_len,
out_records,
out_len)) {
free(buf);
return -1;
}
free(buf);
return 0;
}
static void amduat_fed_transport_unix_free_records(void *ctx,
amduat_fed_record_t *records,
size_t len) {
size_t i;
(void)ctx;
if (records == NULL) {
return;
}
for (i = 0; i < len; ++i) {
amduat_reference_free(&records[i].id.ref);
}
free(records);
}
static int amduat_fed_transport_unix_get_artifact(void *ctx,
amduat_reference_t ref,
amduat_octets_t *out_bytes) {
amduat_fed_transport_unix_t *transport = (amduat_fed_transport_unix_t *)ctx;
char *ref_hex = NULL;
char req[512];
int fd;
uint8_t *buf = NULL;
size_t buf_len = 0;
const uint8_t *body = NULL;
size_t body_len = 0;
int status = 0;
bool ok;
if (transport == NULL || out_bytes == NULL) {
return -1;
}
*out_bytes = amduat_octets(NULL, 0u);
if (!amduat_asl_ref_encode_hex(ref, &ref_hex)) {
return -1;
}
snprintf(req, sizeof(req),
"GET /v1/fed/artifacts/%s HTTP/1.1\r\n"
"Host: localhost\r\n"
"Connection: close\r\n"
"\r\n",
ref_hex);
free(ref_hex);
fd = amduat_fed_transport_unix_connect(transport->socket_path);
if (fd < 0) {
return -1;
}
ok = amduat_fed_transport_unix_send_all(fd, req, strlen(req));
if (!ok) {
close(fd);
return -1;
}
if (!amduat_fed_transport_unix_read_all(fd, &buf, &buf_len)) {
close(fd);
return -1;
}
close(fd);
if (!amduat_fed_transport_unix_split_response(buf,
buf_len,
&body,
&body_len,
&status)) {
free(buf);
return -1;
}
if (status != 200) {
free(buf);
return -1;
}
if (!amduat_octets_clone(amduat_octets(body, body_len), out_bytes)) {
free(buf);
return -1;
}
free(buf);
return 0;
}
bool amduat_fed_transport_unix_init(amduat_fed_transport_unix_t *transport,
const char *socket_path) {
if (transport == NULL || socket_path == NULL) {
return false;
}
if (strlen(socket_path) >= AMDUAT_FED_TRANSPORT_UNIX_PATH_MAX) {
return false;
}
memset(transport->socket_path, 0, sizeof(transport->socket_path));
strncpy(transport->socket_path, socket_path,
AMDUAT_FED_TRANSPORT_UNIX_PATH_MAX - 1u);
return true;
}
amduat_fed_transport_t amduat_fed_transport_unix_ops(
amduat_fed_transport_unix_t *transport) {
amduat_fed_transport_t ops;
memset(&ops, 0, sizeof(ops));
ops.ctx = transport;
ops.get_records = amduat_fed_transport_unix_get_records;
ops.free_records = amduat_fed_transport_unix_free_records;
ops.get_artifact = amduat_fed_transport_unix_get_artifact;
return ops;
}

View file

@ -0,0 +1,26 @@
#ifndef AMDUAT_FED_TRANSPORT_UNIX_H
#define AMDUAT_FED_TRANSPORT_UNIX_H
#include "federation/coord.h"
#ifdef __cplusplus
extern "C" {
#endif
enum { AMDUAT_FED_TRANSPORT_UNIX_PATH_MAX = 1024 };
typedef struct {
char socket_path[AMDUAT_FED_TRANSPORT_UNIX_PATH_MAX];
} amduat_fed_transport_unix_t;
bool amduat_fed_transport_unix_init(amduat_fed_transport_unix_t *transport,
const char *socket_path);
amduat_fed_transport_t amduat_fed_transport_unix_ops(
amduat_fed_transport_unix_t *transport);
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* AMDUAT_FED_TRANSPORT_UNIX_H */

View file

@ -18,3 +18,7 @@ acts as the version identifier.
- `api-contract.jsonl` — manifest of published contracts.
- `amduatd-api-contract.v1.json` — contract bytes (v1).
Receipt note:
- `/v1/pel/run` accepts optional receipt v1.1 fields (executor fingerprint, run id,
limits, logs, determinism, rng seed, signature) and emits `receipt_ref` when
provided.

View file

@ -1 +1,180 @@
{"contract":"AMDUATD/API/1","base_path":"/v1","endpoints":[{"method":"GET","path":"/v1/ui"},{"method":"GET","path":"/v1/meta"},{"method":"HEAD","path":"/v1/meta"},{"method":"GET","path":"/v1/contract"},{"method":"POST","path":"/v1/concepts"},{"method":"GET","path":"/v1/concepts"},{"method":"GET","path":"/v1/concepts/{name}"},{"method":"POST","path":"/v1/concepts/{name}/publish"},{"method":"GET","path":"/v1/resolve/{name}"},{"method":"POST","path":"/v1/artifacts"},{"method":"GET","path":"/v1/artifacts/{ref}"},{"method":"HEAD","path":"/v1/artifacts/{ref}"},{"method":"GET","path":"/v1/artifacts/{ref}?format=info"},{"method":"POST","path":"/v1/pel/run"},{"method":"POST","path":"/v1/pel/programs"}],"schemas":{"pel_run_request":{"type":"object","required":["program_ref","input_refs"],"properties":{"program_ref":{"type":"string","description":"hex ref or concept name"},"input_refs":{"type":"array","items":{"type":"string","description":"hex ref or concept name"}},"params_ref":{"type":"string","description":"hex ref or concept name"},"scheme_ref":{"type":"string","description":"hex ref or 'dag'"}}},"pel_run_response":{"type":"object","required":["result_ref","output_refs","status"],"properties":{"result_ref":{"type":"string","description":"hex ref"},"trace_ref":{"type":"string","description":"hex ref"},"output_refs":{"type":"array","items":{"type":"string","description":"hex ref"}},"status":{"type":"string"}}},"pel_program_author_request":{"type":"object","required":["nodes","roots"],"properties":{"nodes":{"type":"array"},"roots":{"type":"array"}}},"concept_create_request":{"type":"object","required":["name"],"properties":{"name":{"type":"string"},"ref":{"type":"string","description":"hex ref"}}},"artifact_info_response":{"type":"object","required":["len","has_type_tag","type_tag"],"properties":{"len":{"type":"integer"},"has_type_tag":{"type":"boolean"},"type_tag":{"type":"string"}}}}}
{
"contract": "AMDUATD/API/1",
"base_path": "/v1",
"endpoints": [
{"method": "GET", "path": "/v1/ui"},
{"method": "GET", "path": "/v1/meta"},
{"method": "HEAD", "path": "/v1/meta"},
{"method": "GET", "path": "/v1/contract"},
{"method": "GET", "path": "/v1/fed/records"},
{"method": "GET", "path": "/v1/fed/artifacts/{ref}"},
{"method": "GET", "path": "/v1/fed/status"},
{"method": "POST", "path": "/v1/concepts"},
{"method": "GET", "path": "/v1/concepts"},
{"method": "GET", "path": "/v1/concepts/{name}"},
{"method": "POST", "path": "/v1/concepts/{name}/publish"},
{"method": "GET", "path": "/v1/resolve/{name}"},
{"method": "POST", "path": "/v1/artifacts"},
{"method": "GET", "path": "/v1/relations"},
{"method": "GET", "path": "/v1/artifacts/{ref}"},
{"method": "HEAD", "path": "/v1/artifacts/{ref}"},
{"method": "GET", "path": "/v1/artifacts/{ref}?format=info"},
{"method": "POST", "path": "/v1/pel/run"},
{"method": "POST", "path": "/v1/pel/programs"},
{"method": "POST", "path": "/v1/context_frames"}
],
"schemas": {
"pel_run_request": {
"type": "object",
"required": ["program_ref", "input_refs"],
"properties": {
"program_ref": {"type": "string", "description": "hex ref or concept name"},
"input_refs": {"type": "array", "items": {"type": "string", "description": "hex ref or concept name"}},
"params_ref": {"type": "string", "description": "hex ref or concept name"},
"scheme_ref": {"type": "string", "description": "hex ref or 'dag'"},
"receipt": {
"type": "object",
"required": ["input_manifest_ref", "environment_ref", "evaluator_id", "executor_ref", "started_at", "completed_at"],
"properties": {
"input_manifest_ref": {"type": "string", "description": "hex ref or concept name"},
"environment_ref": {"type": "string", "description": "hex ref or concept name"},
"evaluator_id": {"type": "string", "description": "opaque evaluator bytes (utf-8)"},
"executor_ref": {"type": "string", "description": "hex ref or concept name"},
"sbom_ref": {"type": "string", "description": "hex ref or concept name"},
"parity_digest_hex": {"type": "string", "description": "hex bytes"},
"executor_fingerprint_ref": {"type": "string", "description": "hex ref or concept name"},
"run_id_hex": {"type": "string", "description": "hex bytes"},
"limits": {
"type": "object",
"required": ["cpu_ms", "wall_ms", "max_rss_kib", "io_reads", "io_writes"],
"properties": {
"cpu_ms": {"type": "integer"},
"wall_ms": {"type": "integer"},
"max_rss_kib": {"type": "integer"},
"io_reads": {"type": "integer"},
"io_writes": {"type": "integer"}
}
},
"logs": {
"type": "array",
"items": {
"type": "object",
"required": ["kind", "log_ref", "sha256_hex"],
"properties": {
"kind": {"type": "integer"},
"log_ref": {"type": "string", "description": "hex ref or concept name"},
"sha256_hex": {"type": "string", "description": "hex bytes"}
}
}
},
"determinism_level": {"type": "integer", "description": "0-255"},
"rng_seed_hex": {"type": "string", "description": "hex bytes"},
"signature_hex": {"type": "string", "description": "hex bytes"},
"started_at": {"type": "integer"},
"completed_at": {"type": "integer"}
}
}
}
},
"pel_run_response": {
"type": "object",
"required": ["result_ref", "output_refs", "status"],
"properties": {
"result_ref": {"type": "string", "description": "hex ref"},
"trace_ref": {"type": "string", "description": "hex ref"},
"receipt_ref": {"type": "string", "description": "hex ref"},
"output_refs": {"type": "array", "items": {"type": "string", "description": "hex ref"}},
"status": {"type": "string"}
}
},
"fed_records_response": {
"type": "object",
"required": ["domain_id", "snapshot_id", "log_prefix", "next_logseq", "records"],
"properties": {
"domain_id": {"type": "integer"},
"snapshot_id": {"type": "integer"},
"log_prefix": {"type": "integer"},
"next_logseq": {"type": "integer", "description": "Paging cursor; last emitted logseq + 1, or from_logseq if no records emitted."},
"records": {
"type": "array",
"items": {
"type": "object",
"required": ["domain_id", "type", "ref", "logseq", "snapshot_id", "log_prefix"],
"properties": {
"domain_id": {"type": "integer"},
"type": {"type": "integer"},
"ref": {"type": "string"},
"logseq": {"type": "integer"},
"snapshot_id": {"type": "integer"},
"log_prefix": {"type": "integer"},
"visibility": {"type": "integer"},
"has_source": {"type": "boolean"},
"source_domain": {"type": "integer"},
"notes": {"type": "string", "description": "Type mapping: ARTIFACT_PUBLISH -> ARTIFACT, PER when type_tag=FER1_RECEIPT_1, TGK_EDGE when type_tag=TGK1_EDGE_V1; ARTIFACT_UNPUBLISH -> TOMBSTONE."}
}
}
}
}
},
"fed_status_response": {
"type": "object",
"required": ["status", "domain_id", "registry_ref", "last_tick_ms"],
"properties": {
"status": {"type": "string"},
"domain_id": {"type": "integer"},
"registry_ref": {"type": ["string", "null"]},
"last_tick_ms": {"type": "integer"}
}
},
"context_frame_request": {
"type": "object",
"required": ["bindings"],
"properties": {
"bindings": {
"type": "array",
"items": {
"type": "object",
"required": ["key"],
"properties": {
"key": {"type": "string", "description": "concept name or hex ref"},
"value": {"type": "string", "description": "hex ref or concept name"},
"value_ref": {"type": "string", "description": "hex ref or concept name"},
"value_scalar": {
"type": "object",
"properties": {
"int": {"type": "integer"},
"enum": {"type": "string", "description": "concept name or hex ref"}
}
}
}
}
}
}
},
"pel_program_author_request": {
"type": "object",
"required": ["nodes", "roots"],
"properties": {
"nodes": {"type": "array"},
"roots": {"type": "array"}
}
},
"concept_create_request": {
"type": "object",
"required": ["name"],
"properties": {
"name": {"type": "string"},
"ref": {"type": "string", "description": "hex ref"}
}
},
"artifact_info_response": {
"type": "object",
"required": ["len", "has_type_tag", "type_tag"],
"properties": {
"len": {"type": "integer"},
"has_type_tag": {"type": "boolean"},
"type_tag": {"type": "string"}
}
}
}
}

View file

@ -1 +1 @@
{"registry":"AMDUATD/API","contract":"AMDUATD/API/1","handle":"amduat.api.amduatd.contract.v1@1","media_type":"application/json","status":"active","bytes_sha256":"0072ad1a308bfa52c7578a1ff4fbfc85b662b41f37a839f4390bdb4c24ecef0c","notes":"Seeded into the ASL store at amduatd startup; ref is advertised via /v1/meta."}
{"registry":"AMDUATD/API","contract":"AMDUATD/API/1","handle":"amduat.api.amduatd.contract.v1@1","media_type":"application/json","status":"active","bytes_sha256":"34db2a9929eefe4c4d8d95314c0828746c0484dc178d3f63467a8c3d24c17110","notes":"Seeded into the ASL store at amduatd startup; ref is advertised via /v1/meta."}

View file

@ -18,3 +18,8 @@ as stored in the corresponding `registry/*.json` file.
- `bytes_sha256` (string, required): sha256 of the bytes file.
- `notes` (string, optional): human notes.
## Contract Notes
- `amduatd-api-contract.v1.json` includes optional receipt v1.1 fields for
`/v1/pel/run` and may emit `receipt_ref` in responses.
- `/v1/fed/records` supports `limit` for paging (default 256, max 10000).

File diff suppressed because it is too large Load diff