Add mount-aware /v1/space/mounts/sync/until with tests and docs

This commit is contained in:
Carl Niklas Rydberg 2026-01-24 20:16:53 +01:00
parent e54a9009a6
commit ee4397b0d6
7 changed files with 1549 additions and 1 deletions

View file

@ -33,7 +33,8 @@ set(amduatd_sources src/amduatd.c src/amduatd_http.c src/amduatd_caps.c
src/amduatd_fed_pull_apply.c src/amduatd_fed_push_apply.c
src/amduatd_space_doctor.c src/amduatd_space_roots.c
src/amduatd_space_manifest.c
src/amduatd_space_mounts.c)
src/amduatd_space_mounts.c
src/amduatd_space_mounts_sync.c)
if(AMDUATD_ENABLE_UI)
list(APPEND amduatd_sources src/amduatd_ui.c)
endif()
@ -414,6 +415,37 @@ target_link_libraries(amduatd_test_space_mounts
add_test(NAME amduatd_space_mounts COMMAND amduatd_test_space_mounts)
add_executable(amduatd_test_space_mounts_sync
tests/test_amduatd_space_mounts_sync.c
src/amduatd_space_mounts_sync.c
src/amduatd_space_manifest.c
src/amduatd_fed_cursor.c
src/amduatd_fed_pull_plan.c
src/amduatd_fed_pull_apply.c
src/amduatd_fed_push_apply.c
src/amduatd_fed_push_plan.c
src/amduatd_fed_until.c
src/amduatd_fed.c
src/amduatd_http.c
src/amduatd_space.c
src/amduatd_store.c
)
target_include_directories(amduatd_test_space_mounts_sync
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/vendor/amduat/include
)
target_link_libraries(amduatd_test_space_mounts_sync
PRIVATE amduat_asl_store_fs amduat_asl_pointer_fs amduat_asl_record
amduat_asl_store_index_fs amduat_asl amduat_enc amduat_util
amduat_asl_log_store
amduat_hash_asl1 amduat_fed
)
add_test(NAME amduatd_space_mounts_sync COMMAND amduatd_test_space_mounts_sync)
add_executable(amduatd_test_space_sync_status
tests/test_amduatd_space_sync_status.c
src/amduatd_space_roots.c

View file

@ -320,6 +320,21 @@ curl --unix-socket amduatd.sock \
-H 'X-Amduat-Space: demo'
```
## Space mounts sync (track mounts)
`/v1/space/mounts/sync/until` runs the federation pull/until loop for every
`track` mount in the current manifest using v2 cursor keying
(`remote_space_id = mount.space_id`). It is bounded by `limit`, `max_rounds`,
and `max_mounts`, returns per-mount status, and continues after errors.
Requires federation enabled and the index backend. If no manifest head is
present, the endpoint returns a 404.
```sh
curl --unix-socket amduatd.sock -X POST \
'http://localhost/v1/space/mounts/sync/until?limit=128&max_rounds=10&max_mounts=32' \
-H 'X-Amduat-Space: demo'
```
To fail `/v1/pel/run` if the derivation index write fails:
```sh
@ -485,6 +500,7 @@ When the daemon uses the `fs` store backend, index-only checks are reported as
- `GET /v1/space/manifest``{effective_space, manifest_ref, manifest}`
- `PUT /v1/space/manifest``{effective_space, manifest_ref, updated, previous_ref?, manifest}`
- `GET /v1/space/mounts/resolve``{effective_space, manifest_ref, mounts}`
- `POST /v1/space/mounts/sync/until?limit=...&max_rounds=...&max_mounts=...``{effective_space, manifest_ref, limit, max_rounds, max_mounts, mounts_total, mounts_synced, ok, results}`
- `GET /v1/space/sync/status``{effective_space, store_backend, federation, peers}`
- `GET /v1/ui` → browser UI for authoring/running programs
- `GET /v1/fed/records?domain_id=...&from_logseq=...&limit=...``{domain_id, snapshot_id, log_prefix, next_logseq, records[]}` (published artifacts + tombstones + PER + TGK edges)

View file

@ -11,6 +11,7 @@
{"method": "GET", "path": "/v1/space/manifest"},
{"method": "PUT", "path": "/v1/space/manifest"},
{"method": "GET", "path": "/v1/space/mounts/resolve"},
{"method": "POST", "path": "/v1/space/mounts/sync/until"},
{"method": "GET", "path": "/v1/space/sync/status"},
{"method": "POST", "path": "/v1/capabilities"},
{"method": "GET", "path": "/v1/cap/resolve"},
@ -288,6 +289,58 @@
"mounts": {"type": "array", "items": {"$ref": "#/schemas/space_mounts_resolved_mount"}}
}
},
"space_mounts_sync_error": {
"type": "object",
"required": ["code", "message"],
"properties": {
"code": {"type": "string"},
"message": {"type": "string"}
}
},
"space_mounts_sync_cursor": {
"type": "object",
"properties": {
"last_logseq": {"type": "integer"},
"ref": {"type": "string"}
}
},
"space_mounts_sync_result": {
"type": "object",
"required": ["name", "peer_key", "remote_space_id", "status"],
"properties": {
"name": {"type": "string"},
"peer_key": {"type": "string"},
"remote_space_id": {"type": "string"},
"status": {"type": "string"},
"caught_up": {"type": "boolean"},
"rounds_executed": {"type": "integer"},
"applied": {
"type": "object",
"required": ["records", "artifacts"],
"properties": {
"records": {"type": "integer"},
"artifacts": {"type": "integer"}
}
},
"cursor": {"$ref": "#/schemas/space_mounts_sync_cursor"},
"error": {"$ref": "#/schemas/space_mounts_sync_error"}
}
},
"space_mounts_sync_response": {
"type": "object",
"required": ["effective_space", "manifest_ref", "limit", "max_rounds", "max_mounts", "mounts_total", "mounts_synced", "ok", "results"],
"properties": {
"effective_space": {"type": "object"},
"manifest_ref": {"type": "string"},
"limit": {"type": "integer"},
"max_rounds": {"type": "integer"},
"max_mounts": {"type": "integer"},
"mounts_total": {"type": "integer"},
"mounts_synced": {"type": "integer"},
"ok": {"type": "boolean"},
"results": {"type": "array", "items": {"$ref": "#/schemas/space_mounts_sync_result"}}
}
},
"space_sync_status_cursor": {
"type": "object",
"required": ["present"],

View file

@ -53,6 +53,7 @@
#include "amduatd_space_roots.h"
#include "amduatd_space_manifest.h"
#include "amduatd_space_mounts.h"
#include "amduatd_space_mounts_sync.h"
#include <errno.h>
#include <signal.h>
@ -110,6 +111,20 @@ static uint64_t amduatd_now_ms(void) {
static void amduatd_strbuf_free(amduatd_strbuf_t *b);
static bool amduatd_strbuf_append_cstr(amduatd_strbuf_t *b, const char *s);
static bool amduatd_strbuf_append_char(amduatd_strbuf_t *b, char c);
static bool amduatd_parse_u64_str(const char *s, uint64_t *out);
static bool amduatd_fed_pull_unix_get_records(void *ctx,
uint32_t domain_id,
uint64_t from_logseq,
uint64_t limit,
int *out_status,
amduat_fed_record_t **out_records,
size_t *out_len,
char **out_body);
static bool amduatd_fed_pull_unix_get_artifact(void *ctx,
amduat_reference_t ref,
int *out_status,
amduat_octets_t *out_bytes,
char **out_body);
static const char *const AMDUATD_DEFAULT_ROOT = ".amduat-asl";
static const char *const AMDUATD_DEFAULT_SOCK = "amduatd.sock";
@ -131,6 +146,7 @@ static const char k_amduatd_contract_v1_json[] =
"{\"method\":\"GET\",\"path\":\"/v1/space/manifest\"},"
"{\"method\":\"PUT\",\"path\":\"/v1/space/manifest\"},"
"{\"method\":\"GET\",\"path\":\"/v1/space/mounts/resolve\"},"
"{\"method\":\"POST\",\"path\":\"/v1/space/mounts/sync/until\"},"
"{\"method\":\"GET\",\"path\":\"/v1/space/sync/status\"},"
"{\"method\":\"POST\",\"path\":\"/v1/capabilities\"},"
"{\"method\":\"GET\",\"path\":\"/v1/cap/resolve\"},"
@ -374,6 +390,59 @@ static const char k_amduatd_contract_v1_json[] =
"\"mounts\":{\"type\":\"array\",\"items\":{\"$ref\":\"#/schemas/space_mounts_resolved_mount\"}}"
"}"
"},"
"\"space_mounts_sync_error\":{"
"\"type\":\"object\","
"\"required\":[\"code\",\"message\"],"
"\"properties\":{"
"\"code\":{\"type\":\"string\"},"
"\"message\":{\"type\":\"string\"}"
"}"
"},"
"\"space_mounts_sync_cursor\":{"
"\"type\":\"object\","
"\"properties\":{"
"\"last_logseq\":{\"type\":\"integer\"},"
"\"ref\":{\"type\":\"string\"}"
"}"
"},"
"\"space_mounts_sync_result\":{"
"\"type\":\"object\","
"\"required\":[\"name\",\"peer_key\",\"remote_space_id\",\"status\"],"
"\"properties\":{"
"\"name\":{\"type\":\"string\"},"
"\"peer_key\":{\"type\":\"string\"},"
"\"remote_space_id\":{\"type\":\"string\"},"
"\"status\":{\"type\":\"string\"},"
"\"caught_up\":{\"type\":\"boolean\"},"
"\"rounds_executed\":{\"type\":\"integer\"},"
"\"applied\":{"
"\"type\":\"object\","
"\"required\":[\"records\",\"artifacts\"],"
"\"properties\":{"
"\"records\":{\"type\":\"integer\"},"
"\"artifacts\":{\"type\":\"integer\"}"
"}"
"},"
"\"cursor\":{\"$ref\":\"#/schemas/space_mounts_sync_cursor\"},"
"\"error\":{\"$ref\":\"#/schemas/space_mounts_sync_error\"}"
"}"
"},"
"\"space_mounts_sync_response\":{"
"\"type\":\"object\","
"\"required\":[\"effective_space\",\"manifest_ref\",\"limit\",\"max_rounds\","
"\"max_mounts\",\"mounts_total\",\"mounts_synced\",\"ok\",\"results\"],"
"\"properties\":{"
"\"effective_space\":{\"type\":\"object\"},"
"\"manifest_ref\":{\"type\":\"string\"},"
"\"limit\":{\"type\":\"integer\"},"
"\"max_rounds\":{\"type\":\"integer\"},"
"\"max_mounts\":{\"type\":\"integer\"},"
"\"mounts_total\":{\"type\":\"integer\"},"
"\"mounts_synced\":{\"type\":\"integer\"},"
"\"ok\":{\"type\":\"boolean\"},"
"\"results\":{\"type\":\"array\",\"items\":{\"$ref\":\"#/schemas/space_mounts_sync_result\"}}"
"}"
"},"
"\"space_sync_status_cursor\":{"
"\"type\":\"object\","
"\"required\":[\"present\"],"
@ -1530,6 +1599,253 @@ mounts_cleanup:
return ok;
}
static bool amduatd_handle_post_space_mounts_sync_until(
int fd,
amduat_asl_store_t *store,
const amduatd_fed_cfg_t *fed_cfg,
const amduatd_caps_t *caps,
const amduatd_cfg_t *dcfg,
const amduatd_http_req_t *req,
const char *root_path) {
char limit_buf[32];
char rounds_buf[32];
char mounts_buf[32];
uint64_t limit = 128u;
uint64_t max_rounds = 10u;
uint64_t max_mounts = 32u;
amduat_asl_pointer_store_t pointer_store;
amduat_fed_transport_unix_t transport;
amduat_fed_transport_t ops;
amduatd_fed_pull_transport_t pull_transport;
amduatd_space_mounts_sync_report_t report;
amduatd_space_mounts_sync_status_t status;
amduatd_strbuf_t b;
char *manifest_ref_hex = NULL;
bool ok = false;
memset(&report, 0, sizeof(report));
memset(&b, 0, sizeof(b));
if (store == NULL || fed_cfg == NULL || req == NULL || root_path == NULL ||
dcfg == NULL) {
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"internal error");
}
{
amduatd_fed_pull_plan_status_t check =
amduatd_fed_pull_plan_check(fed_cfg, store);
if (check == AMDUATD_FED_PULL_PLAN_ERR_DISABLED) {
return amduatd_send_json_error(fd, 503, "Service Unavailable",
"federation disabled");
}
if (check == AMDUATD_FED_PULL_PLAN_ERR_UNSUPPORTED) {
return amduatd_send_json_error(fd, 501, "Not Implemented",
"requires index backend");
}
if (check != AMDUATD_FED_PULL_PLAN_OK) {
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"plan unavailable");
}
}
if (caps != NULL && caps->enabled && req->x_capability[0] != '\0') {
const char *reason = NULL;
if (!amduatd_caps_check_space(caps, dcfg, req, &reason)) {
if (reason != NULL && strcmp(reason, "wrong-space") == 0) {
return amduatd_send_json_error(fd, 403, "Forbidden",
"space not permitted by capability");
}
return amduatd_send_json_error(fd, 403, "Forbidden",
"invalid capability");
}
}
if (amduatd_query_param(req->path, "limit",
limit_buf, sizeof(limit_buf)) != NULL) {
if (!amduatd_parse_u64_str(limit_buf, &limit) || limit == 0u ||
limit > 10000u) {
return amduatd_send_json_error(fd, 400, "Bad Request", "invalid limit");
}
}
if (amduatd_query_param(req->path, "max_rounds",
rounds_buf, sizeof(rounds_buf)) != NULL) {
if (!amduatd_parse_u64_str(rounds_buf, &max_rounds) ||
max_rounds == 0u || max_rounds > 100u) {
return amduatd_send_json_error(fd, 400, "Bad Request",
"invalid max_rounds");
}
}
if (amduatd_query_param(req->path, "max_mounts",
mounts_buf, sizeof(mounts_buf)) != NULL) {
if (!amduatd_parse_u64_str(mounts_buf, &max_mounts) ||
max_mounts == 0u || max_mounts > 256u) {
return amduatd_send_json_error(fd, 400, "Bad Request",
"invalid max_mounts");
}
}
if (!amduat_asl_pointer_store_init(&pointer_store, root_path)) {
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"pointer store error");
}
if (fed_cfg->transport_kind != AMDUATD_FED_TRANSPORT_UNIX ||
!fed_cfg->unix_socket_set) {
return amduatd_send_json_error(fd, 501, "Not Implemented",
"federation transport unavailable");
}
if (!amduat_fed_transport_unix_init(&transport, fed_cfg->unix_socket_path)) {
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"transport init failed");
}
if (req->effective_space != NULL && req->effective_space->enabled &&
req->effective_space->space_id.data != NULL) {
(void)amduat_fed_transport_unix_set_space(
&transport, (const char *)req->effective_space->space_id.data);
}
ops = amduat_fed_transport_unix_ops(&transport);
memset(&pull_transport, 0, sizeof(pull_transport));
pull_transport.ctx = &transport;
pull_transport.get_records = amduatd_fed_pull_unix_get_records;
pull_transport.free_records = ops.free_records;
pull_transport.get_artifact = amduatd_fed_pull_unix_get_artifact;
status = amduatd_space_mounts_sync_until(store,
&pointer_store,
req->effective_space,
fed_cfg,
&pull_transport,
limit,
max_rounds,
(size_t)max_mounts,
&report);
if (status == AMDUATD_SPACE_MOUNTS_SYNC_ERR_NOT_FOUND) {
amduatd_space_mounts_sync_report_free(&report);
return amduatd_send_json_error(fd, 404, "Not Found",
"manifest not found");
}
if (status == AMDUATD_SPACE_MOUNTS_SYNC_ERR_STORE) {
amduatd_space_mounts_sync_report_free(&report);
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"store error");
}
if (status == AMDUATD_SPACE_MOUNTS_SYNC_ERR_CODEC) {
amduatd_space_mounts_sync_report_free(&report);
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"manifest decode failed");
}
if (status != AMDUATD_SPACE_MOUNTS_SYNC_OK) {
amduatd_space_mounts_sync_report_free(&report);
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"sync failed");
}
if (!amduat_asl_ref_encode_hex(report.manifest_ref, &manifest_ref_hex)) {
amduatd_space_mounts_sync_report_free(&report);
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"encode error");
}
if (!amduatd_strbuf_append_cstr(&b, "{\"effective_space\":{")) {
goto mounts_sync_cleanup;
}
if (req->effective_space != NULL && req->effective_space->enabled &&
req->effective_space->space_id.data != NULL) {
const char *space_id = (const char *)req->effective_space->space_id.data;
if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"scoped\",") ||
!amduatd_strbuf_append_cstr(&b, "\"space_id\":\"") ||
!amduatd_strbuf_append_cstr(&b, space_id) ||
!amduatd_strbuf_append_cstr(&b, "\"")) {
goto mounts_sync_cleanup;
}
} else {
if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"unscoped\",") ||
!amduatd_strbuf_append_cstr(&b, "\"space_id\":null")) {
goto mounts_sync_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, "},\"manifest_ref\":\"") ||
!amduatd_strbuf_append_cstr(&b, manifest_ref_hex) ||
!amduatd_strbuf_append_cstr(&b, "\",\"limit\":")) {
goto mounts_sync_cleanup;
}
{
char tmp[32];
int n = snprintf(tmp, sizeof(tmp), "%llu",
(unsigned long long)limit);
if (n <= 0 || (size_t)n >= sizeof(tmp) ||
!amduatd_strbuf_append_cstr(&b, tmp)) {
goto mounts_sync_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, ",\"max_rounds\":")) {
goto mounts_sync_cleanup;
}
{
char tmp[32];
int n = snprintf(tmp, sizeof(tmp), "%llu",
(unsigned long long)max_rounds);
if (n <= 0 || (size_t)n >= sizeof(tmp) ||
!amduatd_strbuf_append_cstr(&b, tmp)) {
goto mounts_sync_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, ",\"max_mounts\":")) {
goto mounts_sync_cleanup;
}
{
char tmp[32];
int n = snprintf(tmp, sizeof(tmp), "%llu",
(unsigned long long)max_mounts);
if (n <= 0 || (size_t)n >= sizeof(tmp) ||
!amduatd_strbuf_append_cstr(&b, tmp)) {
goto mounts_sync_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, ",\"mounts_total\":")) {
goto mounts_sync_cleanup;
}
{
char tmp[32];
int n = snprintf(tmp, sizeof(tmp), "%zu", report.mounts_total);
if (n <= 0 || (size_t)n >= sizeof(tmp) ||
!amduatd_strbuf_append_cstr(&b, tmp)) {
goto mounts_sync_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, ",\"mounts_synced\":")) {
goto mounts_sync_cleanup;
}
{
char tmp[32];
int n = snprintf(tmp, sizeof(tmp), "%zu", report.mounts_synced);
if (n <= 0 || (size_t)n >= sizeof(tmp) ||
!amduatd_strbuf_append_cstr(&b, tmp)) {
goto mounts_sync_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, ",\"ok\":") ||
!amduatd_strbuf_append_cstr(&b, report.ok ? "true" : "false") ||
!amduatd_strbuf_append_cstr(&b, ",\"results\":") ||
!amduatd_strbuf_append(&b, report.results_json, report.results_len) ||
!amduatd_strbuf_append_cstr(&b, "}\n")) {
goto mounts_sync_cleanup;
}
ok = amduatd_http_send_json(fd, 200, "OK", b.data, false);
mounts_sync_cleanup:
if (!ok) {
ok = amduatd_send_json_error(fd, 500, "Internal Server Error", "error");
}
amduatd_space_mounts_sync_report_free(&report);
free(manifest_ref_hex);
amduatd_strbuf_free(&b);
return ok;
}
static bool amduatd_handle_put_space_manifest(
int fd,
amduat_asl_store_t *store,
@ -8775,6 +9091,17 @@ static bool amduatd_handle_conn(int fd,
root_path);
goto conn_cleanup;
}
if (strcmp(req.method, "POST") == 0 &&
strcmp(no_query, "/v1/space/mounts/sync/until") == 0) {
ok = amduatd_handle_post_space_mounts_sync_until(fd,
store,
fed_cfg,
caps,
effective_cfg,
&req,
root_path);
goto conn_cleanup;
}
if (strcmp(req.method, "GET") == 0 &&
strcmp(no_query, "/v1/space/sync/status") == 0) {
ok = amduatd_handle_get_space_sync_status(fd,

View file

@ -0,0 +1,484 @@
#include "amduatd_space_mounts_sync.h"
#include "amduat/asl/ref_text.h"
#include "amduatd_fed_until.h"
#include "amduatd_space_manifest.h"
#include <errno.h>
#include <stdio.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
typedef struct {
char *data;
size_t len;
size_t cap;
} amduatd_mounts_sync_buf_t;
static void amduatd_mounts_sync_buf_free(amduatd_mounts_sync_buf_t *b) {
if (b == NULL) {
return;
}
free(b->data);
b->data = NULL;
b->len = 0;
b->cap = 0;
}
static bool amduatd_mounts_sync_buf_reserve(amduatd_mounts_sync_buf_t *b,
size_t extra) {
size_t need;
size_t next_cap;
char *next;
if (b == NULL) {
return false;
}
if (extra > (SIZE_MAX - b->len)) {
return false;
}
need = b->len + extra;
if (need <= b->cap) {
return true;
}
next_cap = b->cap != 0u ? b->cap : 256u;
while (next_cap < need) {
if (next_cap > (SIZE_MAX / 2u)) {
next_cap = need;
break;
}
next_cap *= 2u;
}
next = (char *)realloc(b->data, next_cap);
if (next == NULL) {
return false;
}
b->data = next;
b->cap = next_cap;
return true;
}
static bool amduatd_mounts_sync_buf_append(amduatd_mounts_sync_buf_t *b,
const char *s,
size_t n) {
if (b == NULL) {
return false;
}
if (n == 0u) {
return true;
}
if (s == NULL) {
return false;
}
if (!amduatd_mounts_sync_buf_reserve(b, n + 1u)) {
return false;
}
memcpy(b->data + b->len, s, n);
b->len += n;
b->data[b->len] = '\0';
return true;
}
static bool amduatd_mounts_sync_buf_append_cstr(amduatd_mounts_sync_buf_t *b,
const char *s) {
return amduatd_mounts_sync_buf_append(
b, s != NULL ? s : "", s != NULL ? strlen(s) : 0u);
}
static bool amduatd_mounts_sync_buf_append_char(amduatd_mounts_sync_buf_t *b,
char c) {
return amduatd_mounts_sync_buf_append(b, &c, 1u);
}
static bool amduatd_space_mounts_parse_u32(const char *s, uint32_t *out) {
unsigned long val;
char *endp = NULL;
if (s == NULL) {
return false;
}
errno = 0;
val = strtoul(s, &endp, 10);
if (errno != 0 || endp == s || *endp != '\0' || val > UINT32_MAX) {
return false;
}
if (out != NULL) {
*out = (uint32_t)val;
}
return true;
}
static const char *amduatd_space_mounts_sync_status_code(
amduatd_fed_pull_apply_status_t status) {
switch (status) {
case AMDUATD_FED_PULL_APPLY_ERR_INVALID:
return "invalid";
case AMDUATD_FED_PULL_APPLY_ERR_DISABLED:
return "disabled";
case AMDUATD_FED_PULL_APPLY_ERR_UNSUPPORTED:
return "unsupported";
case AMDUATD_FED_PULL_APPLY_ERR_REMOTE:
return "remote";
case AMDUATD_FED_PULL_APPLY_ERR_STORE:
return "store";
case AMDUATD_FED_PULL_APPLY_ERR_CONFLICT:
return "conflict";
case AMDUATD_FED_PULL_APPLY_ERR_OOM:
return "oom";
case AMDUATD_FED_PULL_APPLY_OK:
default:
return "error";
}
}
void amduatd_space_mounts_sync_report_free(
amduatd_space_mounts_sync_report_t *report) {
if (report == NULL) {
return;
}
amduat_reference_free(&report->manifest_ref);
free(report->results_json);
report->results_json = NULL;
report->results_len = 0u;
report->mounts_total = 0u;
report->mounts_synced = 0u;
report->ok = false;
}
amduatd_space_mounts_sync_status_t amduatd_space_mounts_sync_until(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const amduatd_fed_cfg_t *fed_cfg,
const amduatd_fed_pull_transport_t *transport,
uint64_t limit,
uint64_t max_rounds,
size_t max_mounts,
amduatd_space_mounts_sync_report_t *out_report) {
amduatd_space_manifest_t manifest;
amduat_reference_t manifest_ref;
amduatd_space_manifest_status_t status;
amduatd_mounts_sync_buf_t b;
size_t track_total = 0u;
size_t attempted = 0u;
bool ok_all = true;
if (out_report != NULL) {
memset(out_report, 0, sizeof(*out_report));
out_report->manifest_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
}
if (store == NULL || pointer_store == NULL || out_report == NULL ||
fed_cfg == NULL || transport == NULL || max_rounds == 0u) {
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_INVALID;
}
memset(&manifest, 0, sizeof(manifest));
memset(&manifest_ref, 0, sizeof(manifest_ref));
memset(&b, 0, sizeof(b));
status = amduatd_space_manifest_get(store,
pointer_store,
effective_space,
&manifest_ref,
&manifest);
if (status == AMDUATD_SPACE_MANIFEST_ERR_NOT_FOUND) {
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_NOT_FOUND;
}
if (status == AMDUATD_SPACE_MANIFEST_ERR_STORE) {
amduat_reference_free(&manifest_ref);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_STORE;
}
if (status != AMDUATD_SPACE_MANIFEST_OK) {
amduat_reference_free(&manifest_ref);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_CODEC;
}
if (!amduatd_mounts_sync_buf_append_char(&b, '[')) {
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
for (size_t i = 0u; i < manifest.mounts_len; ++i) {
const amduatd_space_manifest_mount_t *mount = &manifest.mounts[i];
bool mount_ok = true;
bool peer_ok = true;
bool remote_ok = true;
if (mount->mode != AMDUATD_SPACE_MANIFEST_MOUNT_TRACK) {
continue;
}
track_total++;
if (attempted >= max_mounts) {
continue;
}
if (attempted != 0u) {
if (!amduatd_mounts_sync_buf_append_char(&b, ',')) {
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
}
if (!amduatd_mounts_sync_buf_append_cstr(&b, "{\"name\":\"") ||
!amduatd_mounts_sync_buf_append_cstr(&b, mount->name) ||
!amduatd_mounts_sync_buf_append_cstr(&b, "\",\"peer_key\":\"") ||
!amduatd_mounts_sync_buf_append_cstr(&b, mount->peer_key) ||
!amduatd_mounts_sync_buf_append_cstr(&b, "\",\"remote_space_id\":\"") ||
!amduatd_mounts_sync_buf_append_cstr(&b, mount->space_id) ||
!amduatd_mounts_sync_buf_append_cstr(&b, "\",\"status\":\"")) {
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
peer_ok = amduatd_space_mounts_parse_u32(mount->peer_key, NULL);
remote_ok = amduatd_space_space_id_is_valid(mount->space_id);
if (!peer_ok || !remote_ok) {
mount_ok = false;
}
if (!mount_ok) {
ok_all = false;
if (!amduatd_mounts_sync_buf_append_cstr(&b, "error") ||
!amduatd_mounts_sync_buf_append_cstr(&b, "\",\"error\":{")) {
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
if (!peer_ok) {
if (!amduatd_mounts_sync_buf_append_cstr(&b,
"\"code\":\"invalid_peer\","
"\"message\":\"invalid peer\""
"}")) {
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
} else if (!remote_ok) {
if (!amduatd_mounts_sync_buf_append_cstr(
&b,
"\"code\":\"invalid_remote_space_id\","
"\"message\":\"invalid remote_space_id\""
"}")) {
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
} else if (!amduatd_mounts_sync_buf_append_cstr(
&b,
"\"code\":\"invalid_mount\","
"\"message\":\"invalid mount\""
"}")) {
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
if (!amduatd_mounts_sync_buf_append_cstr(&b, "}")) {
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
attempted++;
continue;
}
{
amduatd_fed_until_report_t report;
amduatd_fed_pull_apply_status_t sync_status;
char *cursor_ref_hex = NULL;
sync_status = amduatd_fed_pull_until(store,
pointer_store,
effective_space,
mount->peer_key,
mount->space_id,
limit,
max_rounds,
fed_cfg,
transport,
&report);
if (sync_status != AMDUATD_FED_PULL_APPLY_OK) {
const char *code = amduatd_space_mounts_sync_status_code(sync_status);
const char *message =
report.error[0] != '\0' ? report.error : "error";
ok_all = false;
if (!amduatd_mounts_sync_buf_append_cstr(&b, "error") ||
!amduatd_mounts_sync_buf_append_cstr(&b, "\",\"error\":{") ||
!amduatd_mounts_sync_buf_append_cstr(&b, "\"code\":\"") ||
!amduatd_mounts_sync_buf_append_cstr(&b, code) ||
!amduatd_mounts_sync_buf_append_cstr(&b, "\",\"message\":\"") ||
!amduatd_mounts_sync_buf_append_cstr(&b, message) ||
!amduatd_mounts_sync_buf_append_cstr(&b, "\"}}")) {
amduatd_fed_until_report_free(&report);
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
amduatd_fed_until_report_free(&report);
attempted++;
continue;
}
if (!amduatd_mounts_sync_buf_append_cstr(&b, "ok") ||
!amduatd_mounts_sync_buf_append_cstr(
&b,
"\",\"caught_up\":") ||
!amduatd_mounts_sync_buf_append_cstr(&b,
report.caught_up ? "true"
: "false") ||
!amduatd_mounts_sync_buf_append_cstr(&b, ",\"rounds_executed\":")) {
amduatd_fed_until_report_free(&report);
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
{
char tmp[32];
int n = snprintf(tmp, sizeof(tmp), "%llu",
(unsigned long long)report.rounds_executed);
if (n <= 0 || (size_t)n >= sizeof(tmp) ||
!amduatd_mounts_sync_buf_append_cstr(&b, tmp)) {
amduatd_fed_until_report_free(&report);
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
}
if (!amduatd_mounts_sync_buf_append_cstr(
&b,
",\"applied\":{\"records\":")) {
amduatd_fed_until_report_free(&report);
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
{
char tmp[32];
int n = snprintf(tmp, sizeof(tmp), "%zu", report.total_records);
if (n <= 0 || (size_t)n >= sizeof(tmp) ||
!amduatd_mounts_sync_buf_append_cstr(&b, tmp)) {
amduatd_fed_until_report_free(&report);
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
}
if (!amduatd_mounts_sync_buf_append_cstr(
&b,
",\"artifacts\":")) {
amduatd_fed_until_report_free(&report);
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
{
char tmp[32];
int n = snprintf(tmp, sizeof(tmp), "%zu", report.total_artifacts);
if (n <= 0 || (size_t)n >= sizeof(tmp) ||
!amduatd_mounts_sync_buf_append_cstr(&b, tmp)) {
amduatd_fed_until_report_free(&report);
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
}
if (!amduatd_mounts_sync_buf_append_cstr(&b, "},\"cursor\":{")) {
amduatd_fed_until_report_free(&report);
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
{
bool need_comma = false;
if (report.cursor_has_logseq) {
char tmp[32];
int n = snprintf(tmp, sizeof(tmp), "%llu",
(unsigned long long)report.cursor_logseq);
if (n <= 0 || (size_t)n >= sizeof(tmp) ||
!amduatd_mounts_sync_buf_append_cstr(&b, "\"last_logseq\":") ||
!amduatd_mounts_sync_buf_append_cstr(&b, tmp)) {
amduatd_fed_until_report_free(&report);
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
need_comma = true;
}
if (report.cursor_ref_set) {
if (!amduat_asl_ref_encode_hex(report.cursor_ref, &cursor_ref_hex)) {
amduatd_fed_until_report_free(&report);
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
if (need_comma) {
if (!amduatd_mounts_sync_buf_append_char(&b, ',')) {
free(cursor_ref_hex);
amduatd_fed_until_report_free(&report);
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
}
if (!amduatd_mounts_sync_buf_append_cstr(&b, "\"ref\":\"") ||
!amduatd_mounts_sync_buf_append_cstr(&b, cursor_ref_hex) ||
!amduatd_mounts_sync_buf_append_cstr(&b, "\"")) {
free(cursor_ref_hex);
amduatd_fed_until_report_free(&report);
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
free(cursor_ref_hex);
cursor_ref_hex = NULL;
}
}
if (!amduatd_mounts_sync_buf_append_cstr(&b, "}}")) {
amduatd_fed_until_report_free(&report);
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
amduatd_fed_until_report_free(&report);
}
attempted++;
}
if (!amduatd_mounts_sync_buf_append_char(&b, ']')) {
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&manifest_ref);
amduatd_mounts_sync_buf_free(&b);
return AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM;
}
out_report->mounts_total = track_total;
out_report->mounts_synced = attempted;
out_report->ok = ok_all;
out_report->results_json = b.data;
out_report->results_len = b.len;
out_report->manifest_ref = manifest_ref;
amduatd_space_manifest_free(&manifest);
return AMDUATD_SPACE_MOUNTS_SYNC_OK;
}

View file

@ -0,0 +1,54 @@
#ifndef AMDUATD_SPACE_MOUNTS_SYNC_H
#define AMDUATD_SPACE_MOUNTS_SYNC_H
#include "amduat/asl/asl_pointer_fs.h"
#include "amduat/asl/store.h"
#include "amduatd_fed.h"
#include "amduatd_fed_pull_apply.h"
#include "amduatd_space.h"
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef enum {
AMDUATD_SPACE_MOUNTS_SYNC_OK = 0,
AMDUATD_SPACE_MOUNTS_SYNC_ERR_INVALID = 1,
AMDUATD_SPACE_MOUNTS_SYNC_ERR_NOT_FOUND = 2,
AMDUATD_SPACE_MOUNTS_SYNC_ERR_STORE = 3,
AMDUATD_SPACE_MOUNTS_SYNC_ERR_CODEC = 4,
AMDUATD_SPACE_MOUNTS_SYNC_ERR_OOM = 5
} amduatd_space_mounts_sync_status_t;
typedef struct {
amduat_reference_t manifest_ref;
size_t mounts_total;
size_t mounts_synced;
bool ok;
char *results_json;
size_t results_len;
} amduatd_space_mounts_sync_report_t;
void amduatd_space_mounts_sync_report_free(
amduatd_space_mounts_sync_report_t *report);
amduatd_space_mounts_sync_status_t amduatd_space_mounts_sync_until(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const amduatd_fed_cfg_t *fed_cfg,
const amduatd_fed_pull_transport_t *transport,
uint64_t limit,
uint64_t max_rounds,
size_t max_mounts,
amduatd_space_mounts_sync_report_t *out_report);
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* AMDUATD_SPACE_MOUNTS_SYNC_H */

View file

@ -0,0 +1,582 @@
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include "amduatd_space_mounts_sync.h"
#include "amduatd_fed_cursor.h"
#include "amduatd_space.h"
#include "amduatd_space_manifest.h"
#include "amduatd_store.h"
#include "amduat/asl/asl_pointer_fs.h"
#include "amduat/asl/asl_store_fs_meta.h"
#include "amduat/asl/ref_text.h"
#include "amduat/hash/asl1.h"
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct {
size_t calls;
} amduatd_test_sync_transport_t;
static int failures = 0;
static void expect(bool cond, const char *msg) {
if (!cond) {
fprintf(stderr, "FAIL: %s\n", msg);
failures++;
}
}
static char *amduatd_test_make_temp_dir(void) {
char tmpl[] = "/tmp/amduatd-space-mounts-sync-XXXXXX";
char *dir = mkdtemp(tmpl);
size_t len;
char *copy;
if (dir == NULL) {
perror("mkdtemp");
return NULL;
}
len = strlen(dir);
copy = (char *)malloc(len + 1u);
if (copy == NULL) {
fprintf(stderr, "failed to allocate temp dir copy\n");
return NULL;
}
memcpy(copy, dir, len + 1u);
return copy;
}
static bool amduatd_make_test_ref(uint8_t fill, amduat_reference_t *out_ref) {
uint8_t digest_bytes[32];
amduat_octets_t digest;
if (out_ref == NULL) {
return false;
}
memset(digest_bytes, fill, sizeof(digest_bytes));
if (!amduat_octets_clone(amduat_octets(digest_bytes, sizeof(digest_bytes)),
&digest)) {
return false;
}
*out_ref = amduat_reference(AMDUAT_HASH_ASL1_ID_SHA256, digest);
return true;
}
static bool amduatd_test_get_records(void *ctx,
uint32_t domain_id,
uint64_t from_logseq,
uint64_t limit,
int *out_status,
amduat_fed_record_t **out_records,
size_t *out_len,
char **out_body) {
amduatd_test_sync_transport_t *t = (amduatd_test_sync_transport_t *)ctx;
(void)domain_id;
(void)from_logseq;
(void)limit;
if (out_status == NULL || out_records == NULL || out_len == NULL) {
return false;
}
*out_status = 200;
*out_records = NULL;
*out_len = 0u;
if (out_body != NULL) {
*out_body = NULL;
}
if (t != NULL) {
t->calls++;
}
return true;
}
static void amduatd_test_free_records(void *ctx,
amduat_fed_record_t *records,
size_t len) {
(void)ctx;
(void)records;
(void)len;
}
static bool amduatd_test_get_artifact(void *ctx,
amduat_reference_t ref,
int *out_status,
amduat_octets_t *out_bytes,
char **out_body) {
(void)ctx;
(void)ref;
if (out_status == NULL || out_bytes == NULL) {
return false;
}
*out_status = 404;
*out_bytes = amduat_octets(NULL, 0u);
if (out_body != NULL) {
*out_body = NULL;
}
return true;
}
static bool amduatd_test_seed_manifest(amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *space,
const char *json) {
amduat_reference_t ref;
amduatd_space_manifest_t manifest;
amduat_octets_t payload = amduat_octets(json, strlen(json));
amduatd_space_manifest_status_t status;
memset(&ref, 0, sizeof(ref));
memset(&manifest, 0, sizeof(manifest));
status = amduatd_space_manifest_put(store,
pointer_store,
space,
payload,
NULL,
&ref,
&manifest);
if (status != AMDUATD_SPACE_MANIFEST_OK) {
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&ref);
return false;
}
amduatd_space_manifest_free(&manifest);
amduat_reference_free(&ref);
return true;
}
static bool amduatd_test_seed_cursor(amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *space,
const char *peer_key,
const char *remote_space_id,
uint64_t logseq,
uint8_t fill) {
amduatd_fed_cursor_record_t cursor;
amduat_reference_t ref;
amduatd_fed_cursor_status_t status;
const char *space_id = NULL;
bool ok = false;
amduatd_fed_cursor_record_init(&cursor);
cursor.peer_key = strdup(peer_key);
if (space != NULL && space->enabled && space->space_id.data != NULL) {
space_id = (const char *)space->space_id.data;
}
cursor.space_id = space_id != NULL ? strdup(space_id) : NULL;
if (cursor.peer_key == NULL || cursor.space_id == NULL) {
amduatd_fed_cursor_record_free(&cursor);
return false;
}
cursor.has_logseq = true;
cursor.last_logseq = logseq;
if (!amduatd_make_test_ref(fill, &ref)) {
amduatd_fed_cursor_record_free(&cursor);
return false;
}
cursor.has_record_ref = true;
cursor.last_record_ref = ref;
status = amduatd_fed_cursor_cas_set_remote(store,
pointer_store,
space,
peer_key,
remote_space_id,
NULL,
&cursor,
NULL);
ok = (status == AMDUATD_FED_CURSOR_OK);
amduatd_fed_cursor_record_free(&cursor);
return ok;
}
static int amduatd_test_sync_missing_manifest(void) {
char *root = amduatd_test_make_temp_dir();
amduat_asl_store_fs_config_t cfg;
amduatd_store_ctx_t store_ctx;
amduat_asl_store_t store;
amduat_asl_pointer_store_t pointer_store;
amduatd_space_t space;
amduatd_fed_cfg_t fed_cfg;
amduatd_test_sync_transport_t transport_state;
amduatd_fed_pull_transport_t transport;
amduatd_space_mounts_sync_report_t report;
amduatd_space_mounts_sync_status_t status;
if (root == NULL) {
return 1;
}
memset(&cfg, 0, sizeof(cfg));
if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) {
fprintf(stderr, "failed to init store root\n");
free(root);
return 1;
}
memset(&store_ctx, 0, sizeof(store_ctx));
memset(&store, 0, sizeof(store));
if (!amduatd_store_init(&store,
&cfg,
&store_ctx,
root,
AMDUATD_STORE_BACKEND_INDEX)) {
fprintf(stderr, "failed to init store\n");
free(root);
return 1;
}
if (!amduat_asl_pointer_store_init(&pointer_store, root)) {
fprintf(stderr, "failed to init pointer store\n");
free(root);
return 1;
}
if (!amduatd_space_init(&space, "alpha", false)) {
fprintf(stderr, "failed to init space\n");
free(root);
return 1;
}
amduatd_fed_cfg_init(&fed_cfg);
fed_cfg.enabled = true;
memset(&transport_state, 0, sizeof(transport_state));
memset(&transport, 0, sizeof(transport));
transport.ctx = &transport_state;
transport.get_records = amduatd_test_get_records;
transport.free_records = amduatd_test_free_records;
transport.get_artifact = amduatd_test_get_artifact;
status = amduatd_space_mounts_sync_until(&store,
&pointer_store,
&space,
&fed_cfg,
&transport,
128u,
10u,
32u,
&report);
expect(status == AMDUATD_SPACE_MOUNTS_SYNC_ERR_NOT_FOUND,
"missing manifest");
free(root);
return failures == 0 ? 0 : 1;
}
static int amduatd_test_sync_no_track_mounts(void) {
char *root = amduatd_test_make_temp_dir();
amduat_asl_store_fs_config_t cfg;
amduatd_store_ctx_t store_ctx;
amduat_asl_store_t store;
amduat_asl_pointer_store_t pointer_store;
amduatd_space_t space;
amduatd_fed_cfg_t fed_cfg;
amduatd_test_sync_transport_t transport_state;
amduatd_fed_pull_transport_t transport;
amduat_reference_t pinned_ref;
char *pinned_hex = NULL;
char json[512];
int n;
amduatd_space_mounts_sync_report_t report;
amduatd_space_mounts_sync_status_t status;
if (root == NULL) {
return 1;
}
memset(&cfg, 0, sizeof(cfg));
if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) {
fprintf(stderr, "failed to init store root\n");
free(root);
return 1;
}
memset(&store_ctx, 0, sizeof(store_ctx));
memset(&store, 0, sizeof(store));
if (!amduatd_store_init(&store,
&cfg,
&store_ctx,
root,
AMDUATD_STORE_BACKEND_INDEX)) {
fprintf(stderr, "failed to init store\n");
free(root);
return 1;
}
if (!amduat_asl_pointer_store_init(&pointer_store, root)) {
fprintf(stderr, "failed to init pointer store\n");
free(root);
return 1;
}
if (!amduatd_space_init(&space, "alpha", false)) {
fprintf(stderr, "failed to init space\n");
free(root);
return 1;
}
amduatd_fed_cfg_init(&fed_cfg);
fed_cfg.enabled = true;
if (!amduatd_make_test_ref(0x11, &pinned_ref) ||
!amduat_asl_ref_encode_hex(pinned_ref, &pinned_hex)) {
fprintf(stderr, "failed to build pinned ref\n");
amduat_reference_free(&pinned_ref);
free(root);
return 1;
}
n = snprintf(json, sizeof(json),
"{\"version\":1,\"mounts\":[{\"name\":\"p1\","
"\"peer_key\":\"1\",\"space_id\":\"beta\","
"\"mode\":\"pinned\",\"pinned_root_ref\":\"%s\"}]}",
pinned_hex);
free(pinned_hex);
amduat_reference_free(&pinned_ref);
if (n <= 0 || (size_t)n >= sizeof(json)) {
fprintf(stderr, "failed to format manifest json\n");
free(root);
return 1;
}
if (!amduatd_test_seed_manifest(&store, &pointer_store, &space, json)) {
fprintf(stderr, "failed to seed manifest\n");
free(root);
return 1;
}
memset(&transport_state, 0, sizeof(transport_state));
memset(&transport, 0, sizeof(transport));
transport.ctx = &transport_state;
transport.get_records = amduatd_test_get_records;
transport.free_records = amduatd_test_free_records;
transport.get_artifact = amduatd_test_get_artifact;
status = amduatd_space_mounts_sync_until(&store,
&pointer_store,
&space,
&fed_cfg,
&transport,
128u,
10u,
32u,
&report);
expect(status == AMDUATD_SPACE_MOUNTS_SYNC_OK, "sync no-track ok");
expect(report.mounts_total == 0u, "no-track total");
expect(report.mounts_synced == 0u, "no-track synced");
expect(report.ok, "no-track ok flag");
expect(report.results_json != NULL &&
strcmp(report.results_json, "[]") == 0,
"no-track results empty");
amduatd_space_mounts_sync_report_free(&report);
free(root);
return failures == 0 ? 0 : 1;
}
static int amduatd_test_sync_two_mounts(void) {
char *root = amduatd_test_make_temp_dir();
amduat_asl_store_fs_config_t cfg;
amduatd_store_ctx_t store_ctx;
amduat_asl_store_t store;
amduat_asl_pointer_store_t pointer_store;
amduatd_space_t space;
amduatd_fed_cfg_t fed_cfg;
amduatd_test_sync_transport_t transport_state;
amduatd_fed_pull_transport_t transport;
const char *json =
"{\"version\":1,\"mounts\":["
"{\"name\":\"b\",\"peer_key\":\"2\",\"space_id\":\"beta\",\"mode\":\"track\"},"
"{\"name\":\"a\",\"peer_key\":\"1\",\"space_id\":\"alpha\",\"mode\":\"track\"}"
"]}";
amduatd_space_mounts_sync_report_t report;
amduatd_space_mounts_sync_status_t status;
const char *first_a = NULL;
const char *second_b = NULL;
if (root == NULL) {
return 1;
}
memset(&cfg, 0, sizeof(cfg));
if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) {
fprintf(stderr, "failed to init store root\n");
free(root);
return 1;
}
memset(&store_ctx, 0, sizeof(store_ctx));
memset(&store, 0, sizeof(store));
if (!amduatd_store_init(&store,
&cfg,
&store_ctx,
root,
AMDUATD_STORE_BACKEND_INDEX)) {
fprintf(stderr, "failed to init store\n");
free(root);
return 1;
}
if (!amduat_asl_pointer_store_init(&pointer_store, root)) {
fprintf(stderr, "failed to init pointer store\n");
free(root);
return 1;
}
if (!amduatd_space_init(&space, "alpha", false)) {
fprintf(stderr, "failed to init space\n");
free(root);
return 1;
}
amduatd_fed_cfg_init(&fed_cfg);
fed_cfg.enabled = true;
if (!amduatd_test_seed_manifest(&store, &pointer_store, &space, json)) {
fprintf(stderr, "failed to seed manifest\n");
free(root);
return 1;
}
if (!amduatd_test_seed_cursor(&store,
&pointer_store,
&space,
"1",
"alpha",
5u,
0x22) ||
!amduatd_test_seed_cursor(&store,
&pointer_store,
&space,
"2",
"beta",
7u,
0x33)) {
fprintf(stderr, "failed to seed cursors\n");
free(root);
return 1;
}
memset(&transport_state, 0, sizeof(transport_state));
memset(&transport, 0, sizeof(transport));
transport.ctx = &transport_state;
transport.get_records = amduatd_test_get_records;
transport.free_records = amduatd_test_free_records;
transport.get_artifact = amduatd_test_get_artifact;
status = amduatd_space_mounts_sync_until(&store,
&pointer_store,
&space,
&fed_cfg,
&transport,
128u,
1u,
32u,
&report);
expect(status == AMDUATD_SPACE_MOUNTS_SYNC_OK, "sync two ok");
expect(report.mounts_total == 2u, "sync two total");
expect(report.mounts_synced == 2u, "sync two synced");
expect(report.ok, "sync two ok flag");
expect(transport_state.calls == 2u, "sync two transport calls");
expect(report.results_json != NULL, "sync two results present");
if (report.results_json != NULL) {
first_a = strstr(report.results_json, "\"name\":\"a\"");
second_b = strstr(report.results_json, "\"name\":\"b\"");
}
expect(first_a != NULL && second_b != NULL && first_a < second_b,
"sync results canonical order");
expect(strstr(report.results_json, "\"last_logseq\":5") != NULL,
"sync cursor logseq a");
expect(strstr(report.results_json, "\"last_logseq\":7") != NULL,
"sync cursor logseq b");
amduatd_space_mounts_sync_report_free(&report);
free(root);
return failures == 0 ? 0 : 1;
}
static int amduatd_test_sync_partial_error(void) {
char *root = amduatd_test_make_temp_dir();
amduat_asl_store_fs_config_t cfg;
amduatd_store_ctx_t store_ctx;
amduat_asl_store_t store;
amduat_asl_pointer_store_t pointer_store;
amduatd_space_t space;
amduatd_fed_cfg_t fed_cfg;
amduatd_test_sync_transport_t transport_state;
amduatd_fed_pull_transport_t transport;
const char *json =
"{\"version\":1,\"mounts\":["
"{\"name\":\"a\",\"peer_key\":\"peer-a\",\"space_id\":\"alpha\","
"\"mode\":\"track\"},"
"{\"name\":\"b\",\"peer_key\":\"2\",\"space_id\":\"beta\",\"mode\":\"track\"}"
"]}";
amduatd_space_mounts_sync_report_t report;
amduatd_space_mounts_sync_status_t status;
if (root == NULL) {
return 1;
}
memset(&cfg, 0, sizeof(cfg));
if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) {
fprintf(stderr, "failed to init store root\n");
free(root);
return 1;
}
memset(&store_ctx, 0, sizeof(store_ctx));
memset(&store, 0, sizeof(store));
if (!amduatd_store_init(&store,
&cfg,
&store_ctx,
root,
AMDUATD_STORE_BACKEND_INDEX)) {
fprintf(stderr, "failed to init store\n");
free(root);
return 1;
}
if (!amduat_asl_pointer_store_init(&pointer_store, root)) {
fprintf(stderr, "failed to init pointer store\n");
free(root);
return 1;
}
if (!amduatd_space_init(&space, "alpha", false)) {
fprintf(stderr, "failed to init space\n");
free(root);
return 1;
}
amduatd_fed_cfg_init(&fed_cfg);
fed_cfg.enabled = true;
if (!amduatd_test_seed_manifest(&store, &pointer_store, &space, json)) {
fprintf(stderr, "failed to seed manifest\n");
free(root);
return 1;
}
memset(&transport_state, 0, sizeof(transport_state));
memset(&transport, 0, sizeof(transport));
transport.ctx = &transport_state;
transport.get_records = amduatd_test_get_records;
transport.free_records = amduatd_test_free_records;
transport.get_artifact = amduatd_test_get_artifact;
status = amduatd_space_mounts_sync_until(&store,
&pointer_store,
&space,
&fed_cfg,
&transport,
128u,
1u,
32u,
&report);
expect(status == AMDUATD_SPACE_MOUNTS_SYNC_OK, "sync partial ok");
expect(!report.ok, "sync partial ok flag false");
expect(report.mounts_total == 2u, "sync partial total");
expect(report.mounts_synced == 2u, "sync partial synced");
expect(transport_state.calls == 1u, "sync partial transport calls");
expect(report.results_json != NULL &&
strstr(report.results_json, "\"code\":\"invalid_peer\"") != NULL,
"sync partial invalid peer");
amduatd_space_mounts_sync_report_free(&report);
free(root);
return failures == 0 ? 0 : 1;
}
int main(void) {
if (amduatd_test_sync_missing_manifest() != 0) {
return 1;
}
if (amduatd_test_sync_no_track_mounts() != 0) {
return 1;
}
if (amduatd_test_sync_two_mounts() != 0) {
return 1;
}
if (amduatd_test_sync_partial_error() != 0) {
return 1;
}
return failures == 0 ? 0 : 1;
}