Extend fed smoke test to cover push and pull

This commit is contained in:
Carl Niklas Rydberg 2026-01-24 17:10:02 +01:00
parent 8a490ef09e
commit d74884b442
15 changed files with 2736 additions and 47 deletions

View file

@ -28,7 +28,8 @@ set(amduatd_sources src/amduatd.c src/amduatd_http.c src/amduatd_caps.c
src/amduatd_space.c src/amduatd_concepts.c src/amduatd_space.c src/amduatd_concepts.c
src/amduatd_store.c src/amduatd_derivation_index.c src/amduatd_store.c src/amduatd_derivation_index.c
src/amduatd_fed.c src/amduatd_fed_cursor.c src/amduatd_fed.c src/amduatd_fed_cursor.c
src/amduatd_fed_pull_plan.c src/amduatd_fed_pull_apply.c src/amduatd_fed_pull_plan.c src/amduatd_fed_push_plan.c
src/amduatd_fed_pull_apply.c src/amduatd_fed_push_apply.c
src/amduatd_space_doctor.c) src/amduatd_space_doctor.c)
if(AMDUATD_ENABLE_UI) if(AMDUATD_ENABLE_UI)
list(APPEND amduatd_sources src/amduatd_ui.c) list(APPEND amduatd_sources src/amduatd_ui.c)
@ -176,12 +177,57 @@ target_include_directories(amduatd_test_fed_pull_plan
) )
target_link_libraries(amduatd_test_fed_pull_plan target_link_libraries(amduatd_test_fed_pull_plan
PRIVATE amduat_asl amduat_asl_record amduat_enc amduat_util amduat_hash_asl1 PRIVATE amduat_asl amduat_asl_record amduat_asl_log_store amduat_enc amduat_util amduat_hash_asl1
amduat_asl_pointer_fs amduat_asl_pointer_fs
) )
add_test(NAME amduatd_fed_pull_plan COMMAND amduatd_test_fed_pull_plan) add_test(NAME amduatd_fed_pull_plan COMMAND amduatd_test_fed_pull_plan)
add_executable(amduatd_test_fed_push_plan
tests/test_amduatd_fed_push_plan.c
src/amduatd_fed_push_plan.c
src/amduatd_fed_cursor.c
src/amduatd_fed.c
src/amduatd_space.c
)
target_include_directories(amduatd_test_fed_push_plan
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/vendor/amduat/include
)
target_link_libraries(amduatd_test_fed_push_plan
PRIVATE amduat_asl amduat_asl_record amduat_asl_log_store amduat_enc amduat_util amduat_hash_asl1
amduat_asl_pointer_fs
)
add_test(NAME amduatd_fed_push_plan COMMAND amduatd_test_fed_push_plan)
add_executable(amduatd_test_fed_push_apply
tests/test_amduatd_fed_push_apply.c
src/amduatd_fed_push_apply.c
src/amduatd_fed_push_plan.c
src/amduatd_fed_cursor.c
src/amduatd_fed.c
src/amduatd_space.c
src/amduatd_store.c
)
target_include_directories(amduatd_test_fed_push_apply
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/vendor/amduat/include
)
target_link_libraries(amduatd_test_fed_push_apply
PRIVATE amduat_asl_store_fs amduat_asl_store_index_fs amduat_asl_record
amduat_asl amduat_asl_log_store amduat_enc amduat_asl_pointer_fs amduat_util
amduat_hash_asl1
)
add_test(NAME amduatd_fed_push_apply COMMAND amduatd_test_fed_push_apply)
add_executable(amduatd_test_fed_pull_apply add_executable(amduatd_test_fed_pull_apply
tests/test_amduatd_fed_pull_apply.c tests/test_amduatd_fed_pull_apply.c
src/amduatd_fed_pull_apply.c src/amduatd_fed_pull_apply.c

View file

@ -45,7 +45,7 @@ Run the daemon with the index-backed store:
./build/amduatd --root .amduat-asl --sock amduatd.sock --store-backend index ./build/amduatd --root .amduat-asl --sock amduatd.sock --store-backend index
``` ```
Note: `/v1/fed/records` requires the index backend. Note: `/v1/fed/records` and `/v1/fed/push/plan` require the index backend.
## Federation (dev) ## Federation (dev)
@ -79,6 +79,9 @@ records (`fed/cursor` schema). A peer key should be a stable identifier for the
remote (for example a federation registry domain id rendered as a decimal remote (for example a federation registry domain id rendered as a decimal
string). string).
Push cursors are separate from pull cursors and live under
`fed/push_cursor/<peer>/head` (space scoped).
Read the current cursor for a peer: Read the current cursor for a peer:
```sh ```sh
@ -129,6 +132,34 @@ curl --unix-socket amduatd.sock -X POST \
`/v1/fed/pull` requires the index backend and will not advance the cursor on `/v1/fed/pull` requires the index backend and will not advance the cursor on
partial failure. partial failure.
### Federation push plan (sender dry run)
Compute a read-only plan of what would be sent to a peer from the local log
since the push cursor (does not advance the cursor):
```sh
curl --unix-socket amduatd.sock \
'http://localhost/v1/fed/push/plan?peer=2&limit=128' \
-H 'X-Amduat-Space: demo'
```
`/v1/fed/push/plan` requires the index backend and uses a push cursor separate
from the pull cursor.
### Federation push (sender apply)
Send local records to a peer (advances the push cursor only after all records
apply successfully on the peer):
```sh
curl --unix-socket amduatd.sock -X POST \
'http://localhost/v1/fed/push?peer=2&limit=128' \
-H 'X-Amduat-Space: demo'
```
`/v1/fed/push` uses `/v1/fed/ingest` on the peer and only advances the push
cursor after the batch completes. It requires the index backend.
### Federation ingest (receiver) ### Federation ingest (receiver)
`/v1/fed/ingest` applies a single incoming record (push receiver). The request `/v1/fed/ingest` applies a single incoming record (push receiver). The request
@ -182,8 +213,8 @@ Dev loop (build + restart):
## Federation smoke test ## Federation smoke test
Run the end-to-end federation smoke test (starts two local daemons, pulls a Run the end-to-end federation smoke test (starts two local daemons, verifies
record, and verifies the artifact was replicated): pull replication A→B and push replication B→A, and checks cursors):
```sh ```sh
./scripts/test_fed_smoke.sh ./scripts/test_fed_smoke.sh
@ -335,7 +366,9 @@ When the daemon uses the `fs` store backend, index-only checks are reported as
- `GET /v1/fed/cursor?peer=...``{peer_key, space_id, last_logseq, last_record_hash, ref}` - `GET /v1/fed/cursor?peer=...``{peer_key, space_id, last_logseq, last_record_hash, ref}`
- `POST /v1/fed/cursor?peer=...``{ref}` (CAS update; `expected_ref` in body) - `POST /v1/fed/cursor?peer=...``{ref}` (CAS update; `expected_ref` in body)
- `GET /v1/fed/pull/plan?peer=...&limit=...``{peer, effective_space, cursor, remote_scan, records, next_cursor_candidate, ...}` - `GET /v1/fed/pull/plan?peer=...&limit=...``{peer, effective_space, cursor, remote_scan, records, next_cursor_candidate, ...}`
- `GET /v1/fed/push/plan?peer=...&limit=...``{peer, domain_id, effective_space, cursor, scan, records, required_artifacts, next_cursor_candidate}`
- `POST /v1/fed/pull?peer=...&limit=...``{peer, effective_space, cursor_before, plan_summary, applied, cursor_after, errors}` - `POST /v1/fed/pull?peer=...&limit=...``{peer, effective_space, cursor_before, plan_summary, applied, cursor_after, errors}`
- `POST /v1/fed/push?peer=...&limit=...``{peer, domain_id, effective_space, cursor_before, plan_summary, sent, cursor_after, errors}`
- `GET /v1/fed/artifacts/{ref}` → raw bytes for federation resolve - `GET /v1/fed/artifacts/{ref}` → raw bytes for federation resolve
- `GET /v1/fed/status``{status, domain_id, registry_ref, last_tick_ms}` - `GET /v1/fed/status``{status, domain_id, registry_ref, last_tick_ms}`
- `POST /v1/artifacts` - `POST /v1/artifacts`

View file

@ -1102,3 +1102,145 @@ bool amduat_fed_transport_unix_get_artifact_with_status(
free(buf); free(buf);
return true; return true;
} }
static const char *amduat_fed_transport_unix_record_type_name(
amduat_fed_record_type_t type) {
switch (type) {
case AMDUAT_FED_REC_ARTIFACT:
return "artifact";
case AMDUAT_FED_REC_PER:
return "per";
case AMDUAT_FED_REC_TGK_EDGE:
return "tgk_edge";
case AMDUAT_FED_REC_TOMBSTONE:
return "tombstone";
default:
return NULL;
}
}
bool amduat_fed_transport_unix_post_ingest(
amduat_fed_transport_unix_t *transport,
amduat_fed_record_type_t record_type,
amduat_reference_t ref,
amduat_octets_t bytes,
int *out_status,
char **out_body) {
char *ref_hex = NULL;
const char *type_name = NULL;
char space_header[AMDUAT_ASL_POINTER_NAME_MAX + 32u];
const char *space_line = "";
char req[2048];
char *body_copy = NULL;
const uint8_t *resp_body = NULL;
size_t resp_len = 0;
int status = 0;
int fd;
uint8_t *buf = NULL;
size_t buf_len = 0;
const char *body = NULL;
size_t body_len = 0;
const char *content_type = "application/octet-stream";
char json_body[512];
if (out_status != NULL) {
*out_status = 0;
}
if (out_body != NULL) {
*out_body = NULL;
}
if (transport == NULL || out_status == NULL) {
return false;
}
type_name = amduat_fed_transport_unix_record_type_name(record_type);
if (type_name == NULL) {
return false;
}
if (!amduat_asl_ref_encode_hex(ref, &ref_hex)) {
return false;
}
if (transport->has_space) {
snprintf(space_header,
sizeof(space_header),
"X-Amduat-Space: %s\r\n",
transport->space_id);
space_line = space_header;
}
if (record_type == AMDUAT_FED_REC_TOMBSTONE) {
int n = snprintf(json_body,
sizeof(json_body),
"{\"record_type\":\"tombstone\",\"ref\":\"%s\"}",
ref_hex);
if (n <= 0 || (size_t)n >= sizeof(json_body)) {
free(ref_hex);
return false;
}
content_type = "application/json";
body = json_body;
body_len = (size_t)n;
} else {
body = (const char *)bytes.data;
body_len = bytes.len;
if (body_len != 0u && body == NULL) {
free(ref_hex);
return false;
}
}
snprintf(req, sizeof(req),
"POST /v1/fed/ingest?record_type=%s&ref=%s HTTP/1.1\r\n"
"Host: localhost\r\n"
"%s"
"Content-Type: %s\r\n"
"Content-Length: %zu\r\n"
"Connection: close\r\n"
"\r\n",
type_name,
ref_hex,
space_line,
content_type,
body_len);
free(ref_hex);
fd = amduat_fed_transport_unix_connect(transport->socket_path);
if (fd < 0) {
return false;
}
if (!amduat_fed_transport_unix_send_all(fd, req, strlen(req)) ||
(body_len != 0u &&
!amduat_fed_transport_unix_send_all(fd, body, body_len))) {
close(fd);
return false;
}
if (!amduat_fed_transport_unix_read_all(fd, &buf, &buf_len)) {
close(fd);
return false;
}
close(fd);
if (!amduat_fed_transport_unix_split_response(buf,
buf_len,
&resp_body,
&resp_len,
&status)) {
free(buf);
return false;
}
*out_status = status;
if (out_body != NULL && resp_body != NULL) {
body_copy = (char *)malloc(resp_len + 1u);
if (body_copy == NULL) {
free(buf);
return false;
}
if (resp_len != 0u) {
memcpy(body_copy, resp_body, resp_len);
}
body_copy[resp_len] = '\0';
*out_body = body_copy;
}
free(buf);
return true;
}

View file

@ -3,6 +3,7 @@
#include "federation/coord.h" #include "federation/coord.h"
#include "amduat/asl/asl_pointer_fs.h" #include "amduat/asl/asl_pointer_fs.h"
#include "amduat/fed/replay.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -46,6 +47,15 @@ bool amduat_fed_transport_unix_get_artifact_with_status(
amduat_octets_t *out_bytes, amduat_octets_t *out_bytes,
char **out_body); char **out_body);
/* Returns true on successful HTTP exchange. Caller frees out_body with free. */
bool amduat_fed_transport_unix_post_ingest(
amduat_fed_transport_unix_t *transport,
amduat_fed_record_type_t record_type,
amduat_reference_t ref,
amduat_octets_t bytes,
int *out_status,
char **out_body);
#ifdef __cplusplus #ifdef __cplusplus
} /* extern "C" */ } /* extern "C" */
#endif #endif

View file

@ -123,6 +123,24 @@ http_post() {
fi fi
} }
http_post_allow() {
local sock="$1"
local path="$2"
local data="$3"
shift 3
if [[ "${USE_HTTP_HELPER}" -eq 1 ]]; then
"${HTTP_HELPER}" --sock "${sock}" --method POST --path "${path}" \
--data "${data}" --allow-status \
"$@"
else
curl --silent --show-error \
--unix-socket "${sock}" \
"$@" \
--data-binary "${data}" \
"http://localhost${path}"
fi
}
wait_for_ready() { wait_for_ready() {
local sock="$1" local sock="$1"
local pid="$2" local pid="$2"
@ -195,6 +213,11 @@ plan_resp="$(
fi fi
exit 1 exit 1
} }
if ! echo "${plan_resp}" | grep -q "\"record_count\":"; then
echo "pull plan malformed" >&2
echo "plan response: ${plan_resp}" >&2
exit 1
fi
if echo "${plan_resp}" | grep -q "\"record_count\":0"; then if echo "${plan_resp}" | grep -q "\"record_count\":0"; then
echo "pull plan empty" >&2 echo "pull plan empty" >&2
echo "plan response: ${plan_resp}" >&2 echo "plan response: ${plan_resp}" >&2
@ -253,4 +276,113 @@ if [[ "${payload_b}" != "${payload}" ]]; then
exit 1 exit 1
fi fi
payload_push="fed-smoke-push"
artifact_resp_push="$(
http_post "${sock_b}" "/v1/artifacts" "${payload_push}" \
--header "Content-Type: application/octet-stream" \
--header "X-Amduat-Space: ${space_id}"
)" || {
echo "artifact POST failed on B" >&2
if [[ -f "${log_b}" ]]; then
cat "${log_b}" >&2
fi
exit 1
}
ref_push="$(
printf '%s' "${artifact_resp_push}" \
| tr -d '\r\n' \
| awk 'match($0, /"ref":"[^"]+"/) {print substr($0, RSTART+7, RLENGTH-8)}'
)"
if [[ -z "${ref_push}" ]]; then
echo "failed to parse ref from daemon B" >&2
echo "artifact response: ${artifact_resp_push}" >&2
if [[ -f "${log_b}" ]]; then
cat "${log_b}" >&2
fi
exit 1
fi
push_plan_resp="$(
http_get_allow "${sock_b}" "/v1/fed/push/plan?peer=1&limit=8" \
--header "X-Amduat-Space: ${space_id}"
)" || {
echo "push plan failed" >&2
if [[ -f "${log_b}" ]]; then
cat "${log_b}" >&2
fi
exit 1
}
if ! echo "${push_plan_resp}" | grep -q "\"record_count\":"; then
echo "push plan malformed (missing endpoint?)" >&2
echo "push plan response: ${push_plan_resp}" >&2
exit 1
fi
if echo "${push_plan_resp}" | grep -q "\"record_count\":0"; then
echo "push plan empty" >&2
echo "push plan response: ${push_plan_resp}" >&2
exit 1
fi
push_cursor_before="$(
printf '%s' "${push_plan_resp}" \
| tr -d '\r\n' \
| awk 'match($0, /"cursor":\{[^}]*\}/) {seg=substr($0, RSTART, RLENGTH); if (match(seg, /"last_logseq":[0-9]+/)) {print substr(seg, RSTART+14, RLENGTH-14)}}'
)"
push_resp="$(
http_post_allow "${sock_b}" "/v1/fed/push?peer=1&limit=8" "" \
--header "X-Amduat-Space: ${space_id}"
)" || {
echo "push apply failed" >&2
if [[ -f "${log_b}" ]]; then
cat "${log_b}" >&2
fi
exit 1
}
if ! echo "${push_resp}" | grep -q "\"advanced\":true"; then
echo "push did not advance cursor" >&2
echo "push response: ${push_resp}" >&2
exit 1
fi
payload_a="$(
http_get "${sock_a}" "/v1/artifacts/${ref_push}" \
--header "X-Amduat-Space: ${space_id}"
)" || {
echo "artifact fetch failed on A" >&2
if [[ -f "${log_a}" ]]; then
cat "${log_a}" >&2
fi
exit 1
}
if [[ "${payload_a}" != "${payload_push}" ]]; then
echo "payload mismatch after push" >&2
exit 1
fi
push_plan_after="$(
http_get_allow "${sock_b}" "/v1/fed/push/plan?peer=1&limit=1" \
--header "X-Amduat-Space: ${space_id}"
)" || {
echo "push plan after failed" >&2
if [[ -f "${log_b}" ]]; then
cat "${log_b}" >&2
fi
exit 1
}
push_cursor_after="$(
printf '%s' "${push_plan_after}" \
| tr -d '\r\n' \
| awk 'match($0, /"cursor":\{[^}]*\}/) {seg=substr($0, RSTART, RLENGTH); if (match(seg, /"last_logseq":[0-9]+/)) {print substr(seg, RSTART+14, RLENGTH-14)}}'
)"
if [[ -n "${push_cursor_before}" && -n "${push_cursor_after}" ]]; then
if [[ "${push_cursor_after}" -lt "${push_cursor_before}" ]]; then
echo "push cursor did not advance" >&2
echo "cursor before: ${push_cursor_before}" >&2
echo "cursor after: ${push_cursor_after}" >&2
exit 1
fi
fi
echo "fed smoke ok" echo "fed smoke ok"

View file

@ -43,6 +43,8 @@
#include "amduatd_fed.h" #include "amduatd_fed.h"
#include "amduatd_fed_cursor.h" #include "amduatd_fed_cursor.h"
#include "amduatd_fed_pull_plan.h" #include "amduatd_fed_pull_plan.h"
#include "amduatd_fed_push_plan.h"
#include "amduatd_fed_push_apply.h"
#include "amduatd_fed_pull_apply.h" #include "amduatd_fed_pull_apply.h"
#include "amduatd_store.h" #include "amduatd_store.h"
#include "amduatd_derivation_index.h" #include "amduatd_derivation_index.h"
@ -68,11 +70,6 @@
#include <sys/un.h> #include <sys/un.h>
#include <unistd.h> #include <unistd.h>
enum {
AMDUATD_FED_LOG_KIND_ARTIFACT = 1u,
AMDUATD_FED_LOG_KIND_TOMBSTONE = 2u
};
typedef struct amduatd_strbuf { typedef struct amduatd_strbuf {
char *data; char *data;
size_t len; size_t len;
@ -131,7 +128,9 @@ static const char k_amduatd_contract_v1_json[] =
"{\"method\":\"GET\",\"path\":\"/v1/fed/cursor\"}," "{\"method\":\"GET\",\"path\":\"/v1/fed/cursor\"},"
"{\"method\":\"POST\",\"path\":\"/v1/fed/cursor\"}," "{\"method\":\"POST\",\"path\":\"/v1/fed/cursor\"},"
"{\"method\":\"GET\",\"path\":\"/v1/fed/pull/plan\"}," "{\"method\":\"GET\",\"path\":\"/v1/fed/pull/plan\"},"
"{\"method\":\"GET\",\"path\":\"/v1/fed/push/plan\"},"
"{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"}," "{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"},"
"{\"method\":\"POST\",\"path\":\"/v1/fed/push\"},"
"{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"}," "{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"},"
"{\"method\":\"POST\",\"path\":\"/v1/fed/ingest\"}," "{\"method\":\"POST\",\"path\":\"/v1/fed/ingest\"},"
"{\"method\":\"GET\",\"path\":\"/v1/fed/artifacts/{ref}\"}," "{\"method\":\"GET\",\"path\":\"/v1/fed/artifacts/{ref}\"},"
@ -2890,6 +2889,133 @@ static bool amduatd_handle_get_fed_pull_plan(int fd,
} }
} }
static bool amduatd_handle_get_fed_push_plan(int fd,
amduat_asl_store_t *store,
const amduatd_fed_cfg_t *fed_cfg,
const amduatd_caps_t *caps,
const amduatd_cfg_t *dcfg,
const amduatd_http_req_t *req,
const char *root_path) {
char peer_buf[AMDUAT_ASL_POINTER_NAME_MAX + 1u];
char limit_buf[32];
uint64_t limit = 128u;
uint32_t domain_id = 0u;
amduatd_fed_push_plan_status_t plan_status;
amduat_asl_pointer_store_t pointer_store;
amduat_asl_index_state_t state;
amduatd_fed_push_plan_scan_t scan;
char *json = NULL;
amduatd_fed_push_plan_input_t input;
if (store == NULL || fed_cfg == NULL || req == NULL || root_path == NULL ||
dcfg == NULL) {
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"internal error");
}
if (!amduatd_fed_require_space(fd, fed_cfg, req)) {
return false;
}
plan_status = amduatd_fed_push_plan_check(fed_cfg, store);
if (plan_status == AMDUATD_FED_PUSH_PLAN_ERR_DISABLED) {
return amduatd_send_json_error(fd, 503, "Service Unavailable",
"federation disabled");
}
if (plan_status == AMDUATD_FED_PUSH_PLAN_ERR_UNSUPPORTED) {
return amduatd_send_json_error(fd, 501, "Not Implemented",
"requires index backend");
}
if (plan_status != AMDUATD_FED_PUSH_PLAN_OK) {
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"plan unavailable");
}
if (caps != NULL && caps->enabled && req->x_capability[0] != '\0') {
const char *reason = NULL;
if (!amduatd_caps_check_space(caps, dcfg, req, &reason)) {
if (reason != NULL && strcmp(reason, "wrong-space") == 0) {
return amduatd_send_json_error(fd, 403, "Forbidden",
"space not permitted by capability");
}
return amduatd_send_json_error(fd, 403, "Forbidden",
"invalid capability");
}
}
if (amduatd_query_param(req->path, "peer",
peer_buf, sizeof(peer_buf)) == NULL ||
peer_buf[0] == '\0') {
return amduatd_send_json_error(fd, 400, "Bad Request",
"missing peer");
}
{
amduat_octets_t scoped = amduat_octets(NULL, 0u);
if (!amduatd_fed_push_cursor_pointer_name(req->effective_space,
peer_buf,
&scoped)) {
return amduatd_send_json_error(fd, 400, "Bad Request", "invalid peer");
}
amduat_octets_free(&scoped);
}
if (!amduatd_parse_u32_str(peer_buf, &domain_id)) {
return amduatd_send_json_error(fd, 400, "Bad Request", "invalid peer");
}
if (amduatd_query_param(req->path, "limit",
limit_buf, sizeof(limit_buf)) != NULL) {
if (!amduatd_parse_u64_str(limit_buf, &limit) || limit == 0u ||
limit > 2048u) {
return amduatd_send_json_error(fd, 400, "Bad Request", "invalid limit");
}
}
if (!amduat_asl_pointer_store_init(&pointer_store, root_path)) {
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"pointer store error");
}
if (!amduat_asl_index_current_state(store, &state)) {
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"store error");
}
(void)state;
plan_status = amduatd_fed_push_plan_scan(store,
&pointer_store,
req->effective_space,
peer_buf,
limit,
root_path,
&scan);
if (plan_status != AMDUATD_FED_PUSH_PLAN_OK) {
amduatd_fed_push_plan_scan_free(&scan);
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"plan scan failed");
}
memset(&input, 0, sizeof(input));
input.peer_key = peer_buf;
input.domain_id = domain_id;
input.effective_space = req->effective_space;
input.cursor_present = scan.cursor_present;
input.cursor = scan.cursor_present ? &scan.cursor : NULL;
input.cursor_ref = scan.cursor_present ? &scan.cursor_ref : NULL;
input.records = scan.records;
input.record_count = scan.record_count;
plan_status = amduatd_fed_push_plan_json(&input, &json);
if (plan_status != AMDUATD_FED_PUSH_PLAN_OK || json == NULL) {
amduatd_fed_push_plan_scan_free(&scan);
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"plan encode failed");
}
{
bool ok = amduatd_http_send_json(fd, 200, "OK", json, false);
free(json);
amduatd_fed_push_plan_scan_free(&scan);
return ok;
}
}
static bool amduatd_handle_post_artifacts(int fd, static bool amduatd_handle_post_artifacts(int fd,
amduat_asl_store_t *store, amduat_asl_store_t *store,
const amduatd_http_req_t *req, const amduatd_http_req_t *req,
@ -4359,6 +4485,22 @@ static bool amduatd_fed_pull_unix_get_artifact(void *ctx,
out_body); out_body);
} }
static bool amduatd_fed_push_unix_post_ingest(
void *ctx,
amduat_fed_record_type_t record_type,
amduat_reference_t ref,
amduat_octets_t bytes,
int *out_status,
char **out_body) {
return amduat_fed_transport_unix_post_ingest(
(amduat_fed_transport_unix_t *)ctx,
record_type,
ref,
bytes,
out_status,
out_body);
}
static bool amduatd_handle_post_fed_pull(int fd, static bool amduatd_handle_post_fed_pull(int fd,
amduat_asl_store_t *store, amduat_asl_store_t *store,
const amduatd_fed_cfg_t *fed_cfg, const amduatd_fed_cfg_t *fed_cfg,
@ -4730,6 +4872,432 @@ fed_pull_cleanup:
} }
return true; return true;
} }
static bool amduatd_handle_post_fed_push(int fd,
amduat_asl_store_t *store,
const amduatd_fed_cfg_t *fed_cfg,
const amduatd_caps_t *caps,
const amduatd_cfg_t *dcfg,
const amduatd_http_req_t *req,
const char *root_path) {
char peer_buf[AMDUAT_ASL_POINTER_NAME_MAX + 1u];
char limit_buf[32];
uint64_t limit = 128u;
uint32_t domain_id = 0u;
amduat_asl_pointer_store_t pointer_store;
amduat_fed_transport_unix_t transport;
amduatd_fed_push_transport_t push_transport;
amduatd_fed_push_apply_report_t report;
amduatd_fed_push_apply_status_t status;
amduatd_strbuf_t b;
char *cursor_ref_hex = NULL;
char *cursor_after_hex = NULL;
char *candidate_ref_hex = NULL;
bool ok = false;
int http_status = 200;
const char *http_reason = "OK";
if (store == NULL || fed_cfg == NULL || req == NULL || root_path == NULL ||
dcfg == NULL) {
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"internal error");
}
if (!amduatd_fed_require_space(fd, fed_cfg, req)) {
return false;
}
{
amduatd_fed_push_plan_status_t check =
amduatd_fed_push_plan_check(fed_cfg, store);
if (check == AMDUATD_FED_PUSH_PLAN_ERR_DISABLED) {
return amduatd_send_json_error(fd, 503, "Service Unavailable",
"federation disabled");
}
if (check == AMDUATD_FED_PUSH_PLAN_ERR_UNSUPPORTED) {
return amduatd_send_json_error(fd, 501, "Not Implemented",
"requires index backend");
}
}
if (caps != NULL && caps->enabled && req->x_capability[0] != '\0') {
const char *reason = NULL;
if (!amduatd_caps_check_space(caps, dcfg, req, &reason)) {
if (reason != NULL && strcmp(reason, "wrong-space") == 0) {
return amduatd_send_json_error(fd, 403, "Forbidden",
"space not permitted by capability");
}
return amduatd_send_json_error(fd, 403, "Forbidden",
"invalid capability");
}
}
if (amduatd_query_param(req->path, "peer",
peer_buf, sizeof(peer_buf)) == NULL ||
peer_buf[0] == '\0') {
return amduatd_send_json_error(fd, 400, "Bad Request",
"missing peer");
}
{
amduat_octets_t scoped = amduat_octets(NULL, 0u);
if (!amduatd_fed_push_cursor_pointer_name(req->effective_space,
peer_buf,
&scoped)) {
return amduatd_send_json_error(fd, 400, "Bad Request", "invalid peer");
}
amduat_octets_free(&scoped);
}
if (!amduatd_parse_u32_str(peer_buf, &domain_id)) {
return amduatd_send_json_error(fd, 400, "Bad Request", "invalid peer");
}
if (amduatd_query_param(req->path, "limit",
limit_buf, sizeof(limit_buf)) != NULL) {
if (!amduatd_parse_u64_str(limit_buf, &limit) || limit == 0u ||
limit > 2048u) {
return amduatd_send_json_error(fd, 400, "Bad Request", "invalid limit");
}
}
if (!amduat_asl_pointer_store_init(&pointer_store, root_path)) {
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"pointer store error");
}
if (fed_cfg->transport_kind != AMDUATD_FED_TRANSPORT_UNIX ||
!fed_cfg->unix_socket_set) {
return amduatd_send_json_error(fd, 501, "Not Implemented",
"federation transport unavailable");
}
if (!amduat_fed_transport_unix_init(&transport, fed_cfg->unix_socket_path)) {
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"transport init failed");
}
if (req->effective_space != NULL && req->effective_space->enabled &&
req->effective_space->space_id.data != NULL) {
(void)amduat_fed_transport_unix_set_space(
&transport, (const char *)req->effective_space->space_id.data);
}
memset(&push_transport, 0, sizeof(push_transport));
push_transport.ctx = &transport;
push_transport.post_ingest = amduatd_fed_push_unix_post_ingest;
amduatd_fed_push_apply_report_init(&report);
status = amduatd_fed_push_apply(store,
&pointer_store,
req->effective_space,
peer_buf,
limit,
root_path,
fed_cfg,
&push_transport,
&report);
if (status == AMDUATD_FED_PUSH_APPLY_ERR_DISABLED) {
http_status = 503;
http_reason = "Service Unavailable";
} else if (status == AMDUATD_FED_PUSH_APPLY_ERR_UNSUPPORTED) {
http_status = 501;
http_reason = "Not Implemented";
} else if (status == AMDUATD_FED_PUSH_APPLY_ERR_INVALID) {
http_status = 400;
http_reason = "Bad Request";
} else if (status == AMDUATD_FED_PUSH_APPLY_ERR_REMOTE) {
http_status = 502;
http_reason = "Bad Gateway";
} else if (status == AMDUATD_FED_PUSH_APPLY_ERR_CONFLICT) {
http_status = 409;
http_reason = "Conflict";
} else if (status != AMDUATD_FED_PUSH_APPLY_OK) {
http_status = 500;
http_reason = "Internal Server Error";
}
if (report.cursor_ref_set) {
(void)amduat_asl_ref_encode_hex(report.cursor_ref, &cursor_ref_hex);
}
if (report.cursor_after_ref_set) {
(void)amduat_asl_ref_encode_hex(report.cursor_after_ref,
&cursor_after_hex);
}
if (report.plan_candidate.has_ref) {
(void)amduat_asl_ref_encode_hex(report.plan_candidate.ref,
&candidate_ref_hex);
}
memset(&b, 0, sizeof(b));
if (!amduatd_strbuf_append_cstr(&b, "{")) {
goto fed_push_cleanup;
}
if (!amduatd_strbuf_append_cstr(&b, "\"peer\":\"") ||
!amduatd_strbuf_append_cstr(&b, peer_buf) ||
!amduatd_strbuf_append_cstr(&b, "\",")) {
goto fed_push_cleanup;
}
{
char tmp[64];
snprintf(tmp, sizeof(tmp), "%u", (unsigned int)domain_id);
if (!amduatd_strbuf_append_cstr(&b, "\"domain_id\":") ||
!amduatd_strbuf_append_cstr(&b, tmp) ||
!amduatd_strbuf_append_cstr(&b, ",")) {
goto fed_push_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, "\"effective_space\":{")) {
goto fed_push_cleanup;
}
if (req->effective_space != NULL && req->effective_space->enabled &&
req->effective_space->space_id.data != NULL) {
const char *space_id = (const char *)req->effective_space->space_id.data;
if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"scoped\",") ||
!amduatd_strbuf_append_cstr(&b, "\"space_id\":\"") ||
!amduatd_strbuf_append_cstr(&b, space_id) ||
!amduatd_strbuf_append_cstr(&b, "\"")) {
goto fed_push_cleanup;
}
} else {
if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"unscoped\",") ||
!amduatd_strbuf_append_cstr(&b, "\"space_id\":null")) {
goto fed_push_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, "},")) {
goto fed_push_cleanup;
}
{
char tmp[64];
snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)limit);
if (!amduatd_strbuf_append_cstr(&b, "\"limit\":") ||
!amduatd_strbuf_append_cstr(&b, tmp) ||
!amduatd_strbuf_append_cstr(&b, ",")) {
goto fed_push_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, "\"cursor_before\":{")) {
goto fed_push_cleanup;
}
if (!amduatd_strbuf_append_cstr(&b, "\"present\":") ||
!amduatd_strbuf_append_cstr(&b,
report.cursor_present ? "true" : "false")) {
goto fed_push_cleanup;
}
if (!amduatd_strbuf_append_cstr(&b, ",\"last_logseq\":")) {
goto fed_push_cleanup;
}
if (report.cursor_present && report.cursor_has_logseq) {
char tmp[64];
snprintf(tmp, sizeof(tmp), "%llu",
(unsigned long long)report.cursor_logseq);
if (!amduatd_strbuf_append_cstr(&b, tmp)) {
goto fed_push_cleanup;
}
} else {
if (!amduatd_strbuf_append_cstr(&b, "null")) {
goto fed_push_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, ",\"ref\":")) {
goto fed_push_cleanup;
}
if (cursor_ref_hex != NULL) {
if (!amduatd_strbuf_append_cstr(&b, "\"") ||
!amduatd_strbuf_append_cstr(&b, cursor_ref_hex) ||
!amduatd_strbuf_append_cstr(&b, "\"")) {
goto fed_push_cleanup;
}
} else {
if (!amduatd_strbuf_append_cstr(&b, "null")) {
goto fed_push_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, "},")) {
goto fed_push_cleanup;
}
if (!amduatd_strbuf_append_cstr(&b, "\"plan_summary\":{")) {
goto fed_push_cleanup;
}
{
char tmp[64];
snprintf(tmp, sizeof(tmp), "%zu", report.plan_record_count);
if (!amduatd_strbuf_append_cstr(&b, "\"record_count\":") ||
!amduatd_strbuf_append_cstr(&b, tmp) ||
!amduatd_strbuf_append_cstr(&b, ",\"next_cursor_candidate\":{")) {
goto fed_push_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, "\"last_logseq\":")) {
goto fed_push_cleanup;
}
if (report.plan_candidate.has_logseq) {
char tmp[64];
snprintf(tmp, sizeof(tmp), "%llu",
(unsigned long long)report.plan_candidate.logseq);
if (!amduatd_strbuf_append_cstr(&b, tmp)) {
goto fed_push_cleanup;
}
} else {
if (!amduatd_strbuf_append_cstr(&b, "null")) {
goto fed_push_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, ",\"ref\":")) {
goto fed_push_cleanup;
}
if (candidate_ref_hex != NULL) {
if (!amduatd_strbuf_append_cstr(&b, "\"") ||
!amduatd_strbuf_append_cstr(&b, candidate_ref_hex) ||
!amduatd_strbuf_append_cstr(&b, "\"")) {
goto fed_push_cleanup;
}
} else {
if (!amduatd_strbuf_append_cstr(&b, "null")) {
goto fed_push_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, "}},")) {
goto fed_push_cleanup;
}
if (!amduatd_strbuf_append_cstr(&b, "\"sent\":{")) {
goto fed_push_cleanup;
}
{
char tmp[64];
snprintf(tmp, sizeof(tmp), "%zu", report.sent_record_count);
if (!amduatd_strbuf_append_cstr(&b, "\"record_count\":") ||
!amduatd_strbuf_append_cstr(&b, tmp) ||
!amduatd_strbuf_append_cstr(&b, ",")) {
goto fed_push_cleanup;
}
snprintf(tmp, sizeof(tmp), "%llu",
(unsigned long long)report.sent_bytes_total);
if (!amduatd_strbuf_append_cstr(&b, "\"bytes_total\":") ||
!amduatd_strbuf_append_cstr(&b, tmp) ||
!amduatd_strbuf_append_cstr(&b, ",\"by_type\":{")) {
goto fed_push_cleanup;
}
snprintf(tmp, sizeof(tmp), "%zu", report.sent_artifact_count);
if (!amduatd_strbuf_append_cstr(&b, "\"artifact\":") ||
!amduatd_strbuf_append_cstr(&b, tmp) ||
!amduatd_strbuf_append_cstr(&b, ",")) {
goto fed_push_cleanup;
}
snprintf(tmp, sizeof(tmp), "%zu", report.sent_per_count);
if (!amduatd_strbuf_append_cstr(&b, "\"per\":") ||
!amduatd_strbuf_append_cstr(&b, tmp) ||
!amduatd_strbuf_append_cstr(&b, ",")) {
goto fed_push_cleanup;
}
snprintf(tmp, sizeof(tmp), "%zu", report.sent_tgk_edge_count);
if (!amduatd_strbuf_append_cstr(&b, "\"tgk_edge\":") ||
!amduatd_strbuf_append_cstr(&b, tmp) ||
!amduatd_strbuf_append_cstr(&b, ",")) {
goto fed_push_cleanup;
}
snprintf(tmp, sizeof(tmp), "%zu", report.sent_tombstone_count);
if (!amduatd_strbuf_append_cstr(&b, "\"tombstone\":") ||
!amduatd_strbuf_append_cstr(&b, tmp) ||
!amduatd_strbuf_append_cstr(&b, "},")) {
goto fed_push_cleanup;
}
snprintf(tmp, sizeof(tmp), "%zu", report.peer_ok_count);
if (!amduatd_strbuf_append_cstr(&b, "\"peer_ok_count\":") ||
!amduatd_strbuf_append_cstr(&b, tmp) ||
!amduatd_strbuf_append_cstr(&b, ",")) {
goto fed_push_cleanup;
}
snprintf(tmp, sizeof(tmp), "%zu", report.peer_already_present_count);
if (!amduatd_strbuf_append_cstr(&b, "\"peer_already_present_count\":") ||
!amduatd_strbuf_append_cstr(&b, tmp) ||
!amduatd_strbuf_append_cstr(&b, "},")) {
goto fed_push_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, "\"cursor_after\":{")) {
goto fed_push_cleanup;
}
if (!amduatd_strbuf_append_cstr(&b, "\"advanced\":") ||
!amduatd_strbuf_append_cstr(&b,
report.cursor_advanced ? "true" : "false")) {
goto fed_push_cleanup;
}
if (!amduatd_strbuf_append_cstr(&b, ",\"last_logseq\":")) {
goto fed_push_cleanup;
}
if (report.cursor_advanced && report.cursor_after_has_logseq) {
char tmp[64];
snprintf(tmp, sizeof(tmp), "%llu",
(unsigned long long)report.cursor_after_logseq);
if (!amduatd_strbuf_append_cstr(&b, tmp)) {
goto fed_push_cleanup;
}
} else {
if (!amduatd_strbuf_append_cstr(&b, "null")) {
goto fed_push_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, ",\"ref\":")) {
goto fed_push_cleanup;
}
if (report.cursor_advanced && cursor_after_hex != NULL) {
if (!amduatd_strbuf_append_cstr(&b, "\"") ||
!amduatd_strbuf_append_cstr(&b, cursor_after_hex) ||
!amduatd_strbuf_append_cstr(&b, "\"")) {
goto fed_push_cleanup;
}
} else {
if (!amduatd_strbuf_append_cstr(&b, "null")) {
goto fed_push_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, "},")) {
goto fed_push_cleanup;
}
if (!amduatd_strbuf_append_cstr(&b, "\"errors\":[")) {
goto fed_push_cleanup;
}
if (status == AMDUATD_FED_PUSH_APPLY_OK) {
if (!amduatd_strbuf_append_cstr(&b, "]}\n")) {
goto fed_push_cleanup;
}
} else {
char tmp[64];
if (!amduatd_strbuf_append_cstr(&b, "{\"message\":\"") ||
!amduatd_strbuf_append_cstr(&b,
report.error[0] != '\0'
? report.error
: "error") ||
!amduatd_strbuf_append_cstr(&b, "\"")) {
goto fed_push_cleanup;
}
if (report.remote_status != 0) {
snprintf(tmp, sizeof(tmp), "%d", report.remote_status);
if (!amduatd_strbuf_append_cstr(&b, ",\"remote_status\":") ||
!amduatd_strbuf_append_cstr(&b, tmp)) {
goto fed_push_cleanup;
}
}
if (!amduatd_strbuf_append_cstr(&b, "}]}\n")) {
goto fed_push_cleanup;
}
}
ok = amduatd_http_send_json(fd, http_status, http_reason, b.data, false);
fed_push_cleanup:
amduatd_strbuf_free(&b);
free(cursor_ref_hex);
free(cursor_after_hex);
free(candidate_ref_hex);
amduatd_fed_push_apply_report_free(&report);
if (!ok) {
return amduatd_send_json_error(fd, 500, "Internal Server Error",
"encode error");
}
return true;
}
static bool amduatd_handle_post_pel_programs(int fd, static bool amduatd_handle_post_pel_programs(int fd,
amduat_asl_store_t *store, amduat_asl_store_t *store,
const amduatd_http_req_t *req) { const amduatd_http_req_t *req) {
@ -6078,6 +6646,17 @@ static bool amduatd_handle_conn(int fd,
root_path); root_path);
goto conn_cleanup; goto conn_cleanup;
} }
if (strcmp(req.method, "GET") == 0 &&
strcmp(no_query, "/v1/fed/push/plan") == 0) {
ok = amduatd_handle_get_fed_push_plan(fd,
store,
fed_cfg,
caps,
effective_cfg,
&req,
root_path);
goto conn_cleanup;
}
if (strcmp(req.method, "GET") == 0 && if (strcmp(req.method, "GET") == 0 &&
strcmp(no_query, "/v1/fed/status") == 0) { strcmp(no_query, "/v1/fed/status") == 0) {
ok = amduatd_handle_get_fed_status(fd, coord, fed_cfg, &req); ok = amduatd_handle_get_fed_status(fd, coord, fed_cfg, &req);
@ -6099,6 +6678,17 @@ static bool amduatd_handle_conn(int fd,
root_path); root_path);
goto conn_cleanup; goto conn_cleanup;
} }
if (strcmp(req.method, "POST") == 0 &&
strcmp(no_query, "/v1/fed/push") == 0) {
ok = amduatd_handle_post_fed_push(fd,
store,
fed_cfg,
caps,
effective_cfg,
&req,
root_path);
goto conn_cleanup;
}
if (strcmp(req.method, "POST") == 0 && if (strcmp(req.method, "POST") == 0 &&
strcmp(no_query, "/v1/fed/ingest") == 0) { strcmp(no_query, "/v1/fed/ingest") == 0) {
ok = amduatd_handle_post_fed_ingest(fd, ok = amduatd_handle_post_fed_ingest(fd,

View file

@ -19,6 +19,11 @@ typedef enum {
AMDUATD_FED_TRANSPORT_UNIX = 1 AMDUATD_FED_TRANSPORT_UNIX = 1
} amduatd_fed_transport_kind_t; } amduatd_fed_transport_kind_t;
enum {
AMDUATD_FED_LOG_KIND_ARTIFACT = 1u,
AMDUATD_FED_LOG_KIND_TOMBSTONE = 2u
};
typedef struct { typedef struct {
bool enabled; bool enabled;
bool require_space; bool require_space;

View file

@ -110,6 +110,45 @@ static bool amduatd_fed_cursor_peer_key_is_valid(const char *peer_key) {
return amduat_asl_pointer_name_is_valid(peer_key); return amduat_asl_pointer_name_is_valid(peer_key);
} }
static bool amduatd_fed_cursor_pointer_name_with_prefix(
const amduatd_space_t *space,
const char *peer_key,
const char *prefix,
amduat_octets_t *out_name) {
const char suffix[] = "/head";
size_t peer_len;
size_t total_len;
size_t prefix_len;
char *base = NULL;
bool ok;
if (out_name != NULL) {
*out_name = amduat_octets(NULL, 0u);
}
if (out_name == NULL || prefix == NULL ||
!amduatd_fed_cursor_peer_key_is_valid(peer_key)) {
return false;
}
peer_len = strlen(peer_key);
prefix_len = strlen(prefix);
if (peer_len > SIZE_MAX - prefix_len - (sizeof(suffix) - 1u)) {
return false;
}
total_len = prefix_len + peer_len + (sizeof(suffix) - 1u);
base = (char *)malloc(total_len + 1u);
if (base == NULL) {
return false;
}
memcpy(base, prefix, prefix_len);
memcpy(base + prefix_len, peer_key, peer_len);
memcpy(base + prefix_len + peer_len, suffix, sizeof(suffix) - 1u);
base[total_len] = '\0';
ok = amduatd_space_scope_name(space, base, out_name);
free(base);
return ok;
}
static bool amduatd_fed_cursor_record_encode( static bool amduatd_fed_cursor_record_encode(
const amduatd_fed_cursor_record_t *record, const amduatd_fed_cursor_record_t *record,
amduat_octets_t *out_payload) { amduat_octets_t *out_payload) {
@ -358,36 +397,19 @@ void amduatd_fed_cursor_record_free(amduatd_fed_cursor_record_t *record) {
bool amduatd_fed_cursor_pointer_name(const amduatd_space_t *space, bool amduatd_fed_cursor_pointer_name(const amduatd_space_t *space,
const char *peer_key, const char *peer_key,
amduat_octets_t *out_name) { amduat_octets_t *out_name) {
const char prefix[] = "fed/cursor/"; return amduatd_fed_cursor_pointer_name_with_prefix(space,
const char suffix[] = "/head"; peer_key,
size_t peer_len; "fed/cursor/",
size_t total_len; out_name);
char *base = NULL; }
bool ok;
if (out_name != NULL) { bool amduatd_fed_push_cursor_pointer_name(const amduatd_space_t *space,
*out_name = amduat_octets(NULL, 0u); const char *peer_key,
} amduat_octets_t *out_name) {
if (out_name == NULL || !amduatd_fed_cursor_peer_key_is_valid(peer_key)) { return amduatd_fed_cursor_pointer_name_with_prefix(space,
return false; peer_key,
} "fed/push_cursor/",
peer_len = strlen(peer_key); out_name);
if (peer_len > SIZE_MAX - (sizeof(prefix) - 1u) - (sizeof(suffix) - 1u)) {
return false;
}
total_len = (sizeof(prefix) - 1u) + peer_len + (sizeof(suffix) - 1u);
base = (char *)malloc(total_len + 1u);
if (base == NULL) {
return false;
}
memcpy(base, prefix, sizeof(prefix) - 1u);
memcpy(base + sizeof(prefix) - 1u, peer_key, peer_len);
memcpy(base + sizeof(prefix) - 1u + peer_len, suffix, sizeof(suffix) - 1u);
base[total_len] = '\0';
ok = amduatd_space_scope_name(space, base, out_name);
free(base);
return ok;
} }
amduatd_fed_cursor_status_t amduatd_fed_cursor_check_enabled( amduatd_fed_cursor_status_t amduatd_fed_cursor_check_enabled(
@ -401,11 +423,12 @@ amduatd_fed_cursor_status_t amduatd_fed_cursor_check_enabled(
return AMDUATD_FED_CURSOR_OK; return AMDUATD_FED_CURSOR_OK;
} }
amduatd_fed_cursor_status_t amduatd_fed_cursor_get( static amduatd_fed_cursor_status_t amduatd_fed_cursor_get_with_prefix(
amduat_asl_store_t *store, amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store, amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space, const amduatd_space_t *effective_space,
const char *peer_key, const char *peer_key,
const char *prefix,
amduatd_fed_cursor_record_t *out_cursor, amduatd_fed_cursor_record_t *out_cursor,
amduat_reference_t *out_ref) { amduat_reference_t *out_ref) {
amduat_octets_t pointer_name = amduat_octets(NULL, 0u); amduat_octets_t pointer_name = amduat_octets(NULL, 0u);
@ -423,8 +446,9 @@ amduatd_fed_cursor_status_t amduatd_fed_cursor_get(
return AMDUATD_FED_CURSOR_ERR_INVALID; return AMDUATD_FED_CURSOR_ERR_INVALID;
} }
if (!amduatd_fed_cursor_pointer_name(effective_space, if (!amduatd_fed_cursor_pointer_name_with_prefix(effective_space,
peer_key, peer_key,
prefix,
&pointer_name)) { &pointer_name)) {
return AMDUATD_FED_CURSOR_ERR_INVALID; return AMDUATD_FED_CURSOR_ERR_INVALID;
} }
@ -494,11 +518,44 @@ amduatd_fed_cursor_status_t amduatd_fed_cursor_get(
return AMDUATD_FED_CURSOR_OK; return AMDUATD_FED_CURSOR_OK;
} }
amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set( amduatd_fed_cursor_status_t amduatd_fed_cursor_get(
amduat_asl_store_t *store, amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store, amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space, const amduatd_space_t *effective_space,
const char *peer_key, const char *peer_key,
amduatd_fed_cursor_record_t *out_cursor,
amduat_reference_t *out_ref) {
return amduatd_fed_cursor_get_with_prefix(store,
pointer_store,
effective_space,
peer_key,
"fed/cursor/",
out_cursor,
out_ref);
}
amduatd_fed_cursor_status_t amduatd_fed_push_cursor_get(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
amduatd_fed_cursor_record_t *out_cursor,
amduat_reference_t *out_ref) {
return amduatd_fed_cursor_get_with_prefix(store,
pointer_store,
effective_space,
peer_key,
"fed/push_cursor/",
out_cursor,
out_ref);
}
static amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set_with_prefix(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
const char *prefix,
const amduat_reference_t *expected_ref, const amduat_reference_t *expected_ref,
const amduatd_fed_cursor_record_t *new_cursor, const amduatd_fed_cursor_record_t *new_cursor,
amduat_reference_t *out_new_ref) { amduat_reference_t *out_new_ref) {
@ -534,8 +591,9 @@ amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set(
return AMDUATD_FED_CURSOR_ERR_INVALID; return AMDUATD_FED_CURSOR_ERR_INVALID;
} }
if (!amduatd_fed_cursor_pointer_name(effective_space, if (!amduatd_fed_cursor_pointer_name_with_prefix(effective_space,
peer_key, peer_key,
prefix,
&pointer_name)) { &pointer_name)) {
return AMDUATD_FED_CURSOR_ERR_INVALID; return AMDUATD_FED_CURSOR_ERR_INVALID;
} }
@ -582,3 +640,39 @@ amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set(
amduat_reference_free(&record_ref); amduat_reference_free(&record_ref);
return AMDUATD_FED_CURSOR_OK; return AMDUATD_FED_CURSOR_OK;
} }
amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
const amduat_reference_t *expected_ref,
const amduatd_fed_cursor_record_t *new_cursor,
amduat_reference_t *out_new_ref) {
return amduatd_fed_cursor_cas_set_with_prefix(store,
pointer_store,
effective_space,
peer_key,
"fed/cursor/",
expected_ref,
new_cursor,
out_new_ref);
}
amduatd_fed_cursor_status_t amduatd_fed_push_cursor_cas_set(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
const amduat_reference_t *expected_ref,
const amduatd_fed_cursor_record_t *new_cursor,
amduat_reference_t *out_new_ref) {
return amduatd_fed_cursor_cas_set_with_prefix(store,
pointer_store,
effective_space,
peer_key,
"fed/push_cursor/",
expected_ref,
new_cursor,
out_new_ref);
}

View file

@ -42,6 +42,10 @@ bool amduatd_fed_cursor_pointer_name(const amduatd_space_t *space,
const char *peer_key, const char *peer_key,
amduat_octets_t *out_name); amduat_octets_t *out_name);
bool amduatd_fed_push_cursor_pointer_name(const amduatd_space_t *space,
const char *peer_key,
amduat_octets_t *out_name);
amduatd_fed_cursor_status_t amduatd_fed_cursor_check_enabled( amduatd_fed_cursor_status_t amduatd_fed_cursor_check_enabled(
const amduatd_fed_cfg_t *cfg); const amduatd_fed_cfg_t *cfg);
@ -53,6 +57,14 @@ amduatd_fed_cursor_status_t amduatd_fed_cursor_get(
amduatd_fed_cursor_record_t *out_cursor, amduatd_fed_cursor_record_t *out_cursor,
amduat_reference_t *out_ref); amduat_reference_t *out_ref);
amduatd_fed_cursor_status_t amduatd_fed_push_cursor_get(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
amduatd_fed_cursor_record_t *out_cursor,
amduat_reference_t *out_ref);
amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set( amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set(
amduat_asl_store_t *store, amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store, amduat_asl_pointer_store_t *pointer_store,
@ -62,6 +74,15 @@ amduatd_fed_cursor_status_t amduatd_fed_cursor_cas_set(
const amduatd_fed_cursor_record_t *new_cursor, const amduatd_fed_cursor_record_t *new_cursor,
amduat_reference_t *out_new_ref); amduat_reference_t *out_new_ref);
amduatd_fed_cursor_status_t amduatd_fed_push_cursor_cas_set(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
const amduat_reference_t *expected_ref,
const amduatd_fed_cursor_record_t *new_cursor,
amduat_reference_t *out_new_ref);
#ifdef __cplusplus #ifdef __cplusplus
} /* extern "C" */ } /* extern "C" */
#endif #endif

View file

@ -0,0 +1,342 @@
#include "amduatd_fed_push_apply.h"
#include "amduat/asl/artifact_io.h"
#include "amduat/asl/store.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
static bool amduatd_fed_push_parse_u32(const char *s, uint32_t *out) {
char *end = NULL;
unsigned long val;
if (s == NULL || out == NULL || s[0] == '\0') {
return false;
}
val = strtoul(s, &end, 10);
if (end == s || *end != '\0' || val > UINT32_MAX) {
return false;
}
*out = (uint32_t)val;
return true;
}
static bool amduatd_fed_push_strdup(const char *s, char **out) {
size_t len;
char *buf;
if (out == NULL) {
return false;
}
*out = NULL;
if (s == NULL) {
return false;
}
len = strlen(s);
if (len > SIZE_MAX - 1u) {
return false;
}
buf = (char *)malloc(len + 1u);
if (buf == NULL) {
return false;
}
if (len != 0u) {
memcpy(buf, s, len);
}
buf[len] = '\0';
*out = buf;
return true;
}
static void amduatd_fed_push_report_error(
amduatd_fed_push_apply_report_t *report,
const char *msg) {
if (report == NULL || msg == NULL) {
return;
}
memset(report->error, 0, sizeof(report->error));
strncpy(report->error, msg, sizeof(report->error) - 1u);
}
static bool amduatd_fed_push_body_is_already_present(const char *body) {
if (body == NULL) {
return false;
}
return strstr(body, "\"status\":\"already_present\"") != NULL;
}
void amduatd_fed_push_apply_report_init(
amduatd_fed_push_apply_report_t *report) {
if (report == NULL) {
return;
}
memset(report, 0, sizeof(*report));
report->cursor_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
report->cursor_after_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
amduatd_fed_push_plan_candidate_init(&report->plan_candidate);
}
void amduatd_fed_push_apply_report_free(
amduatd_fed_push_apply_report_t *report) {
if (report == NULL) {
return;
}
if (report->cursor_ref_set) {
amduat_reference_free(&report->cursor_ref);
}
if (report->cursor_after_ref_set) {
amduat_reference_free(&report->cursor_after_ref);
}
amduatd_fed_push_plan_candidate_free(&report->plan_candidate);
memset(report, 0, sizeof(*report));
}
amduatd_fed_push_apply_status_t amduatd_fed_push_apply(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
uint64_t limit,
const char *root_path,
const amduatd_fed_cfg_t *fed_cfg,
const amduatd_fed_push_transport_t *transport,
amduatd_fed_push_apply_report_t *out_report) {
amduatd_fed_push_plan_scan_t scan;
amduatd_fed_push_cursor_candidate_t candidate;
amduatd_fed_cursor_record_t next_cursor;
amduat_reference_t next_ref;
size_t i;
if (out_report == NULL) {
return AMDUATD_FED_PUSH_APPLY_ERR_INVALID;
}
amduatd_fed_push_apply_report_init(out_report);
out_report->peer_key = peer_key;
out_report->effective_space = effective_space;
out_report->limit = limit;
if (store == NULL || pointer_store == NULL || peer_key == NULL ||
fed_cfg == NULL || transport == NULL || root_path == NULL) {
amduatd_fed_push_report_error(out_report, "invalid inputs");
return AMDUATD_FED_PUSH_APPLY_ERR_INVALID;
}
if (!fed_cfg->enabled) {
amduatd_fed_push_report_error(out_report, "federation disabled");
return AMDUATD_FED_PUSH_APPLY_ERR_DISABLED;
}
if (store->ops.log_scan == NULL || store->ops.current_state == NULL) {
amduatd_fed_push_report_error(out_report, "requires index backend");
return AMDUATD_FED_PUSH_APPLY_ERR_UNSUPPORTED;
}
if (transport->post_ingest == NULL) {
amduatd_fed_push_report_error(out_report, "transport unavailable");
return AMDUATD_FED_PUSH_APPLY_ERR_UNSUPPORTED;
}
{
amduat_octets_t scoped = amduat_octets(NULL, 0u);
if (!amduatd_fed_push_cursor_pointer_name(effective_space,
peer_key,
&scoped)) {
amduatd_fed_push_report_error(out_report, "invalid peer");
return AMDUATD_FED_PUSH_APPLY_ERR_INVALID;
}
amduat_octets_free(&scoped);
}
{
uint32_t domain_id = 0u;
if (!amduatd_fed_push_parse_u32(peer_key, &domain_id)) {
amduatd_fed_push_report_error(out_report, "invalid peer");
return AMDUATD_FED_PUSH_APPLY_ERR_INVALID;
}
}
if (amduatd_fed_push_plan_scan(store,
pointer_store,
effective_space,
peer_key,
limit,
root_path,
&scan) != AMDUATD_FED_PUSH_PLAN_OK) {
amduatd_fed_push_plan_scan_free(&scan);
amduatd_fed_push_report_error(out_report, "plan scan failed");
return AMDUATD_FED_PUSH_APPLY_ERR_STORE;
}
out_report->cursor_present = scan.cursor_present;
if (scan.cursor_present) {
out_report->cursor_has_logseq = scan.cursor.has_logseq;
out_report->cursor_logseq = scan.cursor.last_logseq;
if (scan.cursor_ref.digest.data != NULL) {
if (!amduat_reference_clone(scan.cursor_ref, &out_report->cursor_ref)) {
amduatd_fed_push_plan_scan_free(&scan);
amduatd_fed_push_report_error(out_report, "oom");
return AMDUATD_FED_PUSH_APPLY_ERR_OOM;
}
out_report->cursor_ref_set = true;
}
}
if (!amduatd_fed_push_plan_next_cursor_candidate(scan.cursor_present
? &scan.cursor
: NULL,
scan.records,
scan.record_count,
&candidate)) {
amduatd_fed_push_plan_scan_free(&scan);
amduatd_fed_push_report_error(out_report, "oom");
return AMDUATD_FED_PUSH_APPLY_ERR_OOM;
}
out_report->plan_record_count = scan.record_count;
out_report->plan_candidate = candidate;
if (scan.record_count == 0u) {
amduatd_fed_push_plan_scan_free(&scan);
return AMDUATD_FED_PUSH_APPLY_OK;
}
for (i = 0; i < scan.record_count; ++i) {
const amduat_fed_record_t *rec = &scan.records[i];
amduat_octets_t bytes = amduat_octets(NULL, 0u);
amduat_artifact_t artifact;
amduat_asl_store_error_t store_err;
int status = 0;
char *body = NULL;
bool already_present = false;
if (rec->id.type != AMDUAT_FED_REC_TOMBSTONE) {
memset(&artifact, 0, sizeof(artifact));
store_err = amduat_asl_store_get(store, rec->id.ref, &artifact);
if (store_err != AMDUAT_ASL_STORE_OK) {
amduatd_fed_push_plan_scan_free(&scan);
amduatd_fed_push_report_error(out_report, "artifact missing");
return AMDUATD_FED_PUSH_APPLY_ERR_STORE;
}
bytes = artifact.bytes;
out_report->sent_bytes_total += bytes.len;
}
if (!transport->post_ingest(transport->ctx,
rec->id.type,
rec->id.ref,
bytes,
&status,
&body)) {
if (rec->id.type != AMDUAT_FED_REC_TOMBSTONE) {
amduat_asl_artifact_free(&artifact);
}
free(body);
amduatd_fed_push_plan_scan_free(&scan);
amduatd_fed_push_report_error(out_report, "ingest failed");
return AMDUATD_FED_PUSH_APPLY_ERR_REMOTE;
}
if (rec->id.type != AMDUAT_FED_REC_TOMBSTONE) {
amduat_asl_artifact_free(&artifact);
}
out_report->sent_record_count++;
if (rec->id.type == AMDUAT_FED_REC_ARTIFACT) {
out_report->sent_artifact_count++;
} else if (rec->id.type == AMDUAT_FED_REC_PER) {
out_report->sent_per_count++;
} else if (rec->id.type == AMDUAT_FED_REC_TGK_EDGE) {
out_report->sent_tgk_edge_count++;
} else if (rec->id.type == AMDUAT_FED_REC_TOMBSTONE) {
out_report->sent_tombstone_count++;
}
if (status != 200) {
out_report->remote_status = status;
free(body);
amduatd_fed_push_plan_scan_free(&scan);
amduatd_fed_push_report_error(out_report, "peer error");
return AMDUATD_FED_PUSH_APPLY_ERR_REMOTE;
}
already_present = amduatd_fed_push_body_is_already_present(body);
free(body);
if (already_present) {
out_report->peer_already_present_count++;
} else {
out_report->peer_ok_count++;
}
}
amduatd_fed_cursor_record_init(&next_cursor);
if (!amduatd_fed_push_strdup(peer_key, &next_cursor.peer_key)) {
amduatd_fed_cursor_record_free(&next_cursor);
amduatd_fed_push_plan_scan_free(&scan);
amduatd_fed_push_report_error(out_report, "oom");
return AMDUATD_FED_PUSH_APPLY_ERR_OOM;
}
if (effective_space != NULL && effective_space->enabled &&
effective_space->space_id.data != NULL) {
const char *space_id = (const char *)effective_space->space_id.data;
if (!amduatd_fed_push_strdup(space_id, &next_cursor.space_id)) {
amduatd_fed_cursor_record_free(&next_cursor);
amduatd_fed_push_plan_scan_free(&scan);
amduatd_fed_push_report_error(out_report, "oom");
return AMDUATD_FED_PUSH_APPLY_ERR_OOM;
}
} else {
next_cursor.space_id = NULL;
}
if (out_report->plan_candidate.has_logseq) {
next_cursor.has_logseq = true;
next_cursor.last_logseq = out_report->plan_candidate.logseq;
}
if (out_report->plan_candidate.has_ref) {
next_cursor.has_record_ref = true;
if (!amduat_reference_clone(out_report->plan_candidate.ref,
&next_cursor.last_record_ref)) {
amduatd_fed_cursor_record_free(&next_cursor);
amduatd_fed_push_plan_scan_free(&scan);
amduatd_fed_push_report_error(out_report, "oom");
return AMDUATD_FED_PUSH_APPLY_ERR_OOM;
}
}
if (!next_cursor.has_logseq && !next_cursor.has_record_ref) {
amduatd_fed_cursor_record_free(&next_cursor);
amduatd_fed_push_plan_scan_free(&scan);
amduatd_fed_push_report_error(out_report, "invalid cursor");
return AMDUATD_FED_PUSH_APPLY_ERR_INVALID;
}
memset(&next_ref, 0, sizeof(next_ref));
{
amduatd_fed_cursor_status_t st;
st = amduatd_fed_push_cursor_cas_set(store,
pointer_store,
effective_space,
peer_key,
scan.cursor_present
? &scan.cursor_ref
: NULL,
&next_cursor,
&next_ref);
amduatd_fed_cursor_record_free(&next_cursor);
if (st == AMDUATD_FED_CURSOR_ERR_CONFLICT) {
amduatd_fed_push_plan_scan_free(&scan);
amduatd_fed_push_report_error(out_report, "cursor conflict");
return AMDUATD_FED_PUSH_APPLY_ERR_CONFLICT;
}
if (st != AMDUATD_FED_CURSOR_OK) {
amduatd_fed_push_plan_scan_free(&scan);
amduatd_fed_push_report_error(out_report, "cursor update failed");
return AMDUATD_FED_PUSH_APPLY_ERR_STORE;
}
}
out_report->cursor_advanced = true;
if (out_report->plan_candidate.has_logseq) {
out_report->cursor_after_has_logseq = true;
out_report->cursor_after_logseq = out_report->plan_candidate.logseq;
}
if (next_ref.digest.data != NULL) {
out_report->cursor_after_ref_set = true;
out_report->cursor_after_ref = next_ref;
} else {
amduat_reference_free(&next_ref);
}
amduatd_fed_push_plan_scan_free(&scan);
return AMDUATD_FED_PUSH_APPLY_OK;
}

View file

@ -0,0 +1,89 @@
#ifndef AMDUATD_FED_PUSH_APPLY_H
#define AMDUATD_FED_PUSH_APPLY_H
#include "amduat/asl/core.h"
#include "amduat/fed/replay.h"
#include "amduatd_fed.h"
#include "amduatd_fed_cursor.h"
#include "amduatd_fed_push_plan.h"
#include "amduatd_space.h"
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
void *ctx;
bool (*post_ingest)(void *ctx,
amduat_fed_record_type_t record_type,
amduat_reference_t ref,
amduat_octets_t bytes,
int *out_status,
char **out_body);
} amduatd_fed_push_transport_t;
typedef enum {
AMDUATD_FED_PUSH_APPLY_OK = 0,
AMDUATD_FED_PUSH_APPLY_ERR_INVALID = 1,
AMDUATD_FED_PUSH_APPLY_ERR_DISABLED = 2,
AMDUATD_FED_PUSH_APPLY_ERR_UNSUPPORTED = 3,
AMDUATD_FED_PUSH_APPLY_ERR_REMOTE = 4,
AMDUATD_FED_PUSH_APPLY_ERR_STORE = 5,
AMDUATD_FED_PUSH_APPLY_ERR_CONFLICT = 6,
AMDUATD_FED_PUSH_APPLY_ERR_OOM = 7
} amduatd_fed_push_apply_status_t;
typedef struct {
const char *peer_key;
const amduatd_space_t *effective_space;
uint64_t limit;
bool cursor_present;
bool cursor_has_logseq;
uint64_t cursor_logseq;
bool cursor_ref_set;
amduat_reference_t cursor_ref;
size_t plan_record_count;
amduatd_fed_push_cursor_candidate_t plan_candidate;
size_t sent_record_count;
uint64_t sent_bytes_total;
size_t sent_artifact_count;
size_t sent_per_count;
size_t sent_tgk_edge_count;
size_t sent_tombstone_count;
size_t peer_ok_count;
size_t peer_already_present_count;
bool cursor_advanced;
bool cursor_after_has_logseq;
uint64_t cursor_after_logseq;
bool cursor_after_ref_set;
amduat_reference_t cursor_after_ref;
int remote_status;
char error[256];
} amduatd_fed_push_apply_report_t;
void amduatd_fed_push_apply_report_init(
amduatd_fed_push_apply_report_t *report);
void amduatd_fed_push_apply_report_free(
amduatd_fed_push_apply_report_t *report);
amduatd_fed_push_apply_status_t amduatd_fed_push_apply(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
uint64_t limit,
const char *root_path,
const amduatd_fed_cfg_t *fed_cfg,
const amduatd_fed_push_transport_t *transport,
amduatd_fed_push_apply_report_t *out_report);
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* AMDUATD_FED_PUSH_APPLY_H */

645
src/amduatd_fed_push_plan.c Normal file
View file

@ -0,0 +1,645 @@
#include "amduatd_fed_push_plan.h"
#include "amduat/asl/log_store.h"
#include "amduat/asl/artifact_io.h"
#include "amduat/asl/ref_text.h"
#include "amduat/asl/store.h"
#include "amduat/enc/fer1_receipt.h"
#include "amduat/enc/tgk1_edge.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct {
char *data;
size_t len;
size_t cap;
} amduatd_fed_push_plan_strbuf_t;
static void amduatd_fed_push_plan_strbuf_free(
amduatd_fed_push_plan_strbuf_t *b) {
if (b == NULL) {
return;
}
free(b->data);
b->data = NULL;
b->len = 0;
b->cap = 0;
}
static bool amduatd_fed_push_plan_strbuf_reserve(
amduatd_fed_push_plan_strbuf_t *b,
size_t extra) {
size_t need;
size_t next_cap;
char *next;
if (b == NULL) {
return false;
}
if (extra > (SIZE_MAX - b->len)) {
return false;
}
need = b->len + extra;
if (need <= b->cap) {
return true;
}
next_cap = b->cap != 0 ? b->cap : 256u;
while (next_cap < need) {
if (next_cap > (SIZE_MAX / 2u)) {
next_cap = need;
break;
}
next_cap *= 2u;
}
next = (char *)realloc(b->data, next_cap);
if (next == NULL) {
return false;
}
b->data = next;
b->cap = next_cap;
return true;
}
static bool amduatd_fed_push_plan_strbuf_append(
amduatd_fed_push_plan_strbuf_t *b,
const char *s,
size_t n) {
if (b == NULL) {
return false;
}
if (n == 0u) {
return true;
}
if (s == NULL) {
return false;
}
if (!amduatd_fed_push_plan_strbuf_reserve(b, n + 1u)) {
return false;
}
memcpy(b->data + b->len, s, n);
b->len += n;
b->data[b->len] = '\0';
return true;
}
static bool amduatd_fed_push_plan_strbuf_append_cstr(
amduatd_fed_push_plan_strbuf_t *b,
const char *s) {
return amduatd_fed_push_plan_strbuf_append(
b, s != NULL ? s : "", s != NULL ? strlen(s) : 0u);
}
static const char *amduatd_fed_push_plan_record_type_name(
amduat_fed_record_type_t type) {
switch (type) {
case AMDUAT_FED_REC_ARTIFACT:
return "artifact";
case AMDUAT_FED_REC_PER:
return "per";
case AMDUAT_FED_REC_TGK_EDGE:
return "tgk_edge";
case AMDUAT_FED_REC_TOMBSTONE:
return "tombstone";
default:
return "unknown";
}
}
void amduatd_fed_push_plan_candidate_init(
amduatd_fed_push_cursor_candidate_t *candidate) {
if (candidate == NULL) {
return;
}
memset(candidate, 0, sizeof(*candidate));
candidate->ref = amduat_reference(0u, amduat_octets(NULL, 0u));
}
void amduatd_fed_push_plan_candidate_free(
amduatd_fed_push_cursor_candidate_t *candidate) {
if (candidate == NULL) {
return;
}
if (candidate->has_ref) {
amduat_reference_free(&candidate->ref);
}
memset(candidate, 0, sizeof(*candidate));
}
bool amduatd_fed_push_plan_next_cursor_candidate(
const amduatd_fed_cursor_record_t *cursor,
const amduat_fed_record_t *records,
size_t record_count,
amduatd_fed_push_cursor_candidate_t *out_candidate) {
if (out_candidate == NULL) {
return false;
}
amduatd_fed_push_plan_candidate_init(out_candidate);
if (record_count > 0u && records != NULL) {
const amduat_fed_record_t *last = &records[record_count - 1u];
out_candidate->has_logseq = true;
out_candidate->logseq = last->logseq;
out_candidate->has_ref = true;
if (!amduat_reference_clone(last->id.ref, &out_candidate->ref)) {
amduatd_fed_push_plan_candidate_free(out_candidate);
return false;
}
return true;
}
(void)cursor;
return true;
}
amduatd_fed_push_plan_status_t amduatd_fed_push_plan_check(
const amduatd_fed_cfg_t *cfg,
const amduat_asl_store_t *store) {
if (cfg == NULL || store == NULL) {
return AMDUATD_FED_PUSH_PLAN_ERR_INVALID;
}
if (!cfg->enabled) {
return AMDUATD_FED_PUSH_PLAN_ERR_DISABLED;
}
if (store->ops.log_scan == NULL || store->ops.current_state == NULL) {
return AMDUATD_FED_PUSH_PLAN_ERR_UNSUPPORTED;
}
return AMDUATD_FED_PUSH_PLAN_OK;
}
static void amduatd_fed_push_plan_records_free(amduat_fed_record_t *records,
size_t record_count) {
size_t i;
if (records == NULL) {
return;
}
for (i = 0; i < record_count; ++i) {
amduat_reference_free(&records[i].id.ref);
}
free(records);
}
static bool amduatd_fed_push_entry_record_type(
amduat_asl_store_t *store,
const amduat_asl_log_entry_t *entry,
amduat_fed_record_type_t *out_type) {
amduat_fed_record_type_t rec_type = AMDUAT_FED_REC_ARTIFACT;
if (store == NULL || entry == NULL || out_type == NULL) {
return false;
}
if (entry->kind == AMDUATD_FED_LOG_KIND_ARTIFACT) {
amduat_artifact_t artifact;
amduat_asl_store_error_t store_err;
memset(&artifact, 0, sizeof(artifact));
store_err = amduat_asl_store_get(store, entry->payload_ref, &artifact);
if (store_err == AMDUAT_ASL_STORE_OK && artifact.has_type_tag) {
if (artifact.type_tag.tag_id == AMDUAT_TYPE_TAG_TGK1_EDGE_V1) {
rec_type = AMDUAT_FED_REC_TGK_EDGE;
} else if (artifact.type_tag.tag_id == AMDUAT_TYPE_TAG_FER1_RECEIPT_1) {
rec_type = AMDUAT_FED_REC_PER;
}
}
if (store_err == AMDUAT_ASL_STORE_OK) {
amduat_asl_artifact_free(&artifact);
}
} else if (entry->kind == AMDUATD_FED_LOG_KIND_TOMBSTONE) {
rec_type = AMDUAT_FED_REC_TOMBSTONE;
} else {
return false;
}
*out_type = rec_type;
return true;
}
void amduatd_fed_push_plan_scan_init(amduatd_fed_push_plan_scan_t *scan) {
if (scan == NULL) {
return;
}
memset(scan, 0, sizeof(*scan));
amduatd_fed_cursor_record_init(&scan->cursor);
scan->cursor_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
}
void amduatd_fed_push_plan_scan_free(amduatd_fed_push_plan_scan_t *scan) {
if (scan == NULL) {
return;
}
if (scan->cursor_present) {
amduatd_fed_cursor_record_free(&scan->cursor);
amduat_reference_free(&scan->cursor_ref);
}
amduatd_fed_push_plan_records_free(scan->records, scan->record_count);
memset(scan, 0, sizeof(*scan));
}
amduatd_fed_push_plan_status_t amduatd_fed_push_plan_scan(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
uint64_t limit,
const char *root_path,
amduatd_fed_push_plan_scan_t *out_scan) {
amduat_asl_log_store_t log_store;
amduat_asl_log_entry_t *entries = NULL;
size_t entry_count = 0u;
uint64_t next_offset = 0u;
bool end = false;
amduat_octets_t log_name = amduat_octets(NULL, 0u);
uint64_t from_logseq = 0u;
if (store == NULL || pointer_store == NULL || peer_key == NULL ||
root_path == NULL || out_scan == NULL) {
return AMDUATD_FED_PUSH_PLAN_ERR_INVALID;
}
amduatd_fed_push_plan_scan_init(out_scan);
{
amduatd_fed_cursor_status_t cursor_status;
cursor_status = amduatd_fed_push_cursor_get(store,
pointer_store,
effective_space,
peer_key,
&out_scan->cursor,
&out_scan->cursor_ref);
if (cursor_status == AMDUATD_FED_CURSOR_ERR_NOT_FOUND) {
out_scan->cursor_present = false;
} else if (cursor_status == AMDUATD_FED_CURSOR_OK) {
out_scan->cursor_present = true;
if (out_scan->cursor.has_logseq) {
if (out_scan->cursor.last_logseq == UINT64_MAX) {
amduatd_fed_push_plan_scan_free(out_scan);
return AMDUATD_FED_PUSH_PLAN_ERR_INVALID;
}
from_logseq = out_scan->cursor.last_logseq + 1u;
}
} else {
amduatd_fed_push_plan_scan_free(out_scan);
return AMDUATD_FED_PUSH_PLAN_ERR_INVALID;
}
}
if (!amduat_asl_log_store_init(&log_store, root_path, store,
pointer_store)) {
amduatd_fed_push_plan_scan_free(out_scan);
return AMDUATD_FED_PUSH_PLAN_ERR_INVALID;
}
if (!amduatd_space_scope_name(effective_space, "fed/records", &log_name)) {
amduatd_fed_push_plan_scan_free(out_scan);
return AMDUATD_FED_PUSH_PLAN_ERR_INVALID;
}
{
amduat_asl_store_error_t read_err =
amduat_asl_log_read(&log_store,
(const char *)log_name.data,
from_logseq,
(size_t)limit,
&entries,
&entry_count,
&next_offset,
&end);
amduat_octets_free(&log_name);
if (read_err != AMDUAT_ASL_STORE_OK) {
amduatd_fed_push_plan_scan_free(out_scan);
return AMDUATD_FED_PUSH_PLAN_ERR_INVALID;
}
}
(void)next_offset;
(void)end;
if (entry_count != 0u) {
out_scan->records =
(amduat_fed_record_t *)calloc(entry_count, sizeof(*out_scan->records));
if (out_scan->records == NULL) {
amduat_asl_log_entries_free(entries, entry_count);
amduatd_fed_push_plan_scan_free(out_scan);
return AMDUATD_FED_PUSH_PLAN_ERR_OOM;
}
}
{
size_t i;
for (i = 0; i < entry_count; ++i) {
const amduat_asl_log_entry_t *entry = &entries[i];
amduat_fed_record_type_t rec_type = AMDUAT_FED_REC_ARTIFACT;
uint64_t logseq;
if (entry->payload_ref.digest.data == NULL ||
entry->payload_ref.digest.len == 0u) {
continue;
}
if (from_logseq > UINT64_MAX - (uint64_t)i) {
amduat_asl_log_entries_free(entries, entry_count);
amduatd_fed_push_plan_scan_free(out_scan);
return AMDUATD_FED_PUSH_PLAN_ERR_INVALID;
}
logseq = from_logseq + (uint64_t)i;
if (!amduatd_fed_push_entry_record_type(store, entry, &rec_type)) {
continue;
}
memset(&out_scan->records[out_scan->record_count], 0,
sizeof(out_scan->records[out_scan->record_count]));
out_scan->records[out_scan->record_count].id.type = rec_type;
out_scan->records[out_scan->record_count].logseq = logseq;
if (!amduat_reference_clone(entry->payload_ref,
&out_scan->records[out_scan->record_count]
.id.ref)) {
amduat_asl_log_entries_free(entries, entry_count);
amduatd_fed_push_plan_scan_free(out_scan);
return AMDUATD_FED_PUSH_PLAN_ERR_OOM;
}
out_scan->record_count++;
}
}
amduat_asl_log_entries_free(entries, entry_count);
return AMDUATD_FED_PUSH_PLAN_OK;
}
amduatd_fed_push_plan_status_t amduatd_fed_push_plan_json(
const amduatd_fed_push_plan_input_t *input,
char **out_json) {
amduatd_fed_push_plan_strbuf_t b;
size_t i;
const amduat_fed_record_t *first = NULL;
const amduat_fed_record_t *last = NULL;
amduatd_fed_push_cursor_candidate_t candidate;
char *ref_hex = NULL;
char *cursor_ref_hex = NULL;
char tmp[64];
if (out_json != NULL) {
*out_json = NULL;
}
if (input == NULL || out_json == NULL || input->peer_key == NULL) {
return AMDUATD_FED_PUSH_PLAN_ERR_INVALID;
}
if (input->record_count > 0u && input->records == NULL) {
return AMDUATD_FED_PUSH_PLAN_ERR_INVALID;
}
if (input->cursor_present && input->cursor == NULL) {
return AMDUATD_FED_PUSH_PLAN_ERR_INVALID;
}
if (input->record_count > 0u && input->records != NULL) {
first = &input->records[0];
last = &input->records[input->record_count - 1u];
}
if (!amduatd_fed_push_plan_next_cursor_candidate(
input->cursor_present ? input->cursor : NULL,
input->records,
input->record_count,
&candidate)) {
return AMDUATD_FED_PUSH_PLAN_ERR_OOM;
}
if (input->cursor_present &&
input->cursor_ref != NULL &&
input->cursor_ref->digest.data != NULL) {
if (!amduat_asl_ref_encode_hex(*input->cursor_ref, &cursor_ref_hex)) {
amduatd_fed_push_plan_candidate_free(&candidate);
return AMDUATD_FED_PUSH_PLAN_ERR_OOM;
}
}
memset(&b, 0, sizeof(b));
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "{")) {
goto plan_oom;
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"peer\":\"") ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, input->peer_key) ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\",")) {
goto plan_oom;
}
snprintf(tmp, sizeof(tmp), "%u", (unsigned int)input->domain_id);
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"domain_id\":") ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, tmp) ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",")) {
goto plan_oom;
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"effective_space\":{")) {
goto plan_oom;
}
if (input->effective_space != NULL &&
input->effective_space->enabled &&
input->effective_space->space_id.data != NULL) {
const char *space_id = (const char *)input->effective_space->space_id.data;
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"mode\":\"scoped\",") ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"space_id\":\"") ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, space_id) ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"")) {
goto plan_oom;
}
} else {
if (!amduatd_fed_push_plan_strbuf_append_cstr(
&b, "\"mode\":\"unscoped\",") ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"space_id\":null")) {
goto plan_oom;
}
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "},")) {
goto plan_oom;
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"cursor\":{")) {
goto plan_oom;
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"present\":") ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b,
input->cursor_present ? "true"
: "false")) {
goto plan_oom;
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",\"last_logseq\":")) {
goto plan_oom;
}
if (input->cursor_present && input->cursor != NULL &&
input->cursor->has_logseq) {
snprintf(tmp, sizeof(tmp), "%llu",
(unsigned long long)input->cursor->last_logseq);
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, tmp)) {
goto plan_oom;
}
} else {
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "null")) {
goto plan_oom;
}
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",\"ref\":")) {
goto plan_oom;
}
if (cursor_ref_hex != NULL) {
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"") ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, cursor_ref_hex) ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"")) {
goto plan_oom;
}
} else {
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "null")) {
goto plan_oom;
}
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "},")) {
goto plan_oom;
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"scan\":{")) {
goto plan_oom;
}
snprintf(tmp, sizeof(tmp), "%zu", input->record_count);
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"record_count\":") ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, tmp)) {
goto plan_oom;
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",\"first_logseq\":")) {
goto plan_oom;
}
if (first != NULL) {
snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)first->logseq);
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, tmp)) {
goto plan_oom;
}
} else {
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "null")) {
goto plan_oom;
}
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",\"last_logseq\":")) {
goto plan_oom;
}
if (last != NULL) {
snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)last->logseq);
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, tmp)) {
goto plan_oom;
}
} else {
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "null")) {
goto plan_oom;
}
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "},")) {
goto plan_oom;
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"records\":[")) {
goto plan_oom;
}
for (i = 0; i < input->record_count; ++i) {
const amduat_fed_record_t *rec = &input->records[i];
const char *type_name = amduatd_fed_push_plan_record_type_name(rec->id.type);
if (i > 0) {
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",")) {
goto plan_oom;
}
}
if (!amduat_asl_ref_encode_hex(rec->id.ref, &ref_hex)) {
goto plan_oom;
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "{\"logseq\":")) {
goto plan_oom;
}
snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)rec->logseq);
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, tmp) ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",\"record_type\":\"") ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, type_name) ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\",\"ref\":\"") ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, ref_hex) ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"}")) {
goto plan_oom;
}
free(ref_hex);
ref_hex = NULL;
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "],")) {
goto plan_oom;
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b,
"\"required_artifacts\":[")) {
goto plan_oom;
}
{
bool first_artifact = true;
for (i = 0; i < input->record_count; ++i) {
const amduat_fed_record_t *rec = &input->records[i];
if (rec->id.type == AMDUAT_FED_REC_TOMBSTONE) {
continue;
}
if (!amduat_asl_ref_encode_hex(rec->id.ref, &ref_hex)) {
goto plan_oom;
}
if (!first_artifact) {
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",")) {
goto plan_oom;
}
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"") ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, ref_hex) ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"")) {
goto plan_oom;
}
first_artifact = false;
free(ref_hex);
ref_hex = NULL;
}
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "],")) {
goto plan_oom;
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(
&b, "\"next_cursor_candidate\":{")) {
goto plan_oom;
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"last_logseq\":")) {
goto plan_oom;
}
if (candidate.has_logseq) {
snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)candidate.logseq);
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, tmp)) {
goto plan_oom;
}
} else {
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "null")) {
goto plan_oom;
}
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, ",\"ref\":")) {
goto plan_oom;
}
if (candidate.has_ref) {
if (!amduat_asl_ref_encode_hex(candidate.ref, &ref_hex)) {
goto plan_oom;
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"") ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, ref_hex) ||
!amduatd_fed_push_plan_strbuf_append_cstr(&b, "\"")) {
goto plan_oom;
}
free(ref_hex);
ref_hex = NULL;
} else {
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "null")) {
goto plan_oom;
}
}
if (!amduatd_fed_push_plan_strbuf_append_cstr(&b, "}}\n")) {
goto plan_oom;
}
amduatd_fed_push_plan_candidate_free(&candidate);
free(cursor_ref_hex);
*out_json = b.data;
return AMDUATD_FED_PUSH_PLAN_OK;
plan_oom:
free(ref_hex);
amduatd_fed_push_plan_candidate_free(&candidate);
free(cursor_ref_hex);
amduatd_fed_push_plan_strbuf_free(&b);
return AMDUATD_FED_PUSH_PLAN_ERR_OOM;
}

View file

@ -0,0 +1,88 @@
#ifndef AMDUATD_FED_PUSH_PLAN_H
#define AMDUATD_FED_PUSH_PLAN_H
#include "amduat/fed/replay.h"
#include "amduatd_fed.h"
#include "amduatd_fed_cursor.h"
#include "amduatd_space.h"
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef enum {
AMDUATD_FED_PUSH_PLAN_OK = 0,
AMDUATD_FED_PUSH_PLAN_ERR_INVALID = 1,
AMDUATD_FED_PUSH_PLAN_ERR_DISABLED = 2,
AMDUATD_FED_PUSH_PLAN_ERR_UNSUPPORTED = 3,
AMDUATD_FED_PUSH_PLAN_ERR_OOM = 4
} amduatd_fed_push_plan_status_t;
typedef struct {
const char *peer_key;
uint32_t domain_id;
const amduatd_space_t *effective_space;
bool cursor_present;
const amduatd_fed_cursor_record_t *cursor;
const amduat_reference_t *cursor_ref;
const amduat_fed_record_t *records;
size_t record_count;
} amduatd_fed_push_plan_input_t;
typedef struct {
bool has_logseq;
uint64_t logseq;
bool has_ref;
amduat_reference_t ref;
} amduatd_fed_push_cursor_candidate_t;
typedef struct {
bool cursor_present;
amduatd_fed_cursor_record_t cursor;
amduat_reference_t cursor_ref;
amduat_fed_record_t *records;
size_t record_count;
} amduatd_fed_push_plan_scan_t;
amduatd_fed_push_plan_status_t amduatd_fed_push_plan_check(
const amduatd_fed_cfg_t *cfg,
const amduat_asl_store_t *store);
void amduatd_fed_push_plan_scan_init(amduatd_fed_push_plan_scan_t *scan);
void amduatd_fed_push_plan_scan_free(amduatd_fed_push_plan_scan_t *scan);
amduatd_fed_push_plan_status_t amduatd_fed_push_plan_scan(
amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *effective_space,
const char *peer_key,
uint64_t limit,
const char *root_path,
amduatd_fed_push_plan_scan_t *out_scan);
void amduatd_fed_push_plan_candidate_init(
amduatd_fed_push_cursor_candidate_t *candidate);
void amduatd_fed_push_plan_candidate_free(
amduatd_fed_push_cursor_candidate_t *candidate);
bool amduatd_fed_push_plan_next_cursor_candidate(
const amduatd_fed_cursor_record_t *cursor,
const amduat_fed_record_t *records,
size_t record_count,
amduatd_fed_push_cursor_candidate_t *out_candidate);
amduatd_fed_push_plan_status_t amduatd_fed_push_plan_json(
const amduatd_fed_push_plan_input_t *input,
char **out_json);
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* AMDUATD_FED_PUSH_PLAN_H */

View file

@ -0,0 +1,277 @@
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include "amduatd_fed_push_apply.h"
#include "amduatd_fed_cursor.h"
#include "amduatd_space.h"
#include "amduatd_store.h"
#include "amduat/asl/asl_store_fs_meta.h"
#include "amduat/asl/artifact_io.h"
#include "amduat/asl/log_store.h"
#include "amduat/hash/asl1.h"
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct {
size_t call_count;
size_t already_present_at;
} amduatd_test_push_transport_t;
static int failures = 0;
static void expect(bool cond, const char *msg) {
if (!cond) {
fprintf(stderr, "FAIL: %s\n", msg);
failures++;
}
}
static char *amduatd_test_make_temp_dir(void) {
char tmpl[] = "/tmp/amduatd-fed-push-XXXXXX";
char *dir = mkdtemp(tmpl);
size_t len;
char *copy;
if (dir == NULL) {
perror("mkdtemp");
return NULL;
}
len = strlen(dir);
copy = (char *)malloc(len + 1u);
if (copy == NULL) {
fprintf(stderr, "failed to allocate temp dir copy\n");
return NULL;
}
memcpy(copy, dir, len + 1u);
return copy;
}
static bool amduatd_test_store_artifact(amduat_asl_store_t *store,
const char *payload,
amduat_reference_t *out_ref) {
amduat_artifact_t artifact;
amduat_octets_t payload_bytes = amduat_octets(NULL, 0u);
amduat_asl_index_state_t state;
amduat_asl_store_error_t err;
if (store == NULL || payload == NULL || out_ref == NULL) {
return false;
}
if (!amduat_octets_clone(amduat_octets(payload, strlen(payload)),
&payload_bytes)) {
return false;
}
if (!amduat_asl_artifact_from_bytes(payload_bytes,
AMDUAT_ASL_IO_RAW,
false,
amduat_type_tag(0u),
&artifact)) {
amduat_octets_free(&payload_bytes);
return false;
}
err = amduat_asl_store_put_indexed(store, artifact, out_ref, &state);
amduat_asl_artifact_free(&artifact);
return err == AMDUAT_ASL_STORE_OK;
}
static bool amduatd_test_append_fed_log(amduat_asl_store_t *store,
amduat_asl_pointer_store_t *pointer_store,
const amduatd_space_t *space,
const char *root_path,
amduat_reference_t ref) {
amduat_asl_log_store_t log_store;
amduat_octets_t log_name = amduat_octets(NULL, 0u);
amduat_asl_log_entry_t entry;
uint64_t offset = 0u;
amduat_asl_store_error_t err;
if (!amduat_asl_log_store_init(&log_store,
root_path,
store,
pointer_store)) {
return false;
}
if (!amduatd_space_scope_name(space, "fed/records", &log_name)) {
return false;
}
memset(&entry, 0, sizeof(entry));
entry.kind = AMDUATD_FED_LOG_KIND_ARTIFACT;
entry.has_timestamp = false;
entry.timestamp = 0u;
entry.payload_ref = ref;
entry.has_actor = false;
entry.actor = amduat_octets(NULL, 0u);
err = amduat_asl_log_append(&log_store,
(const char *)log_name.data,
&entry,
1u,
&offset);
amduat_octets_free(&log_name);
return err == AMDUAT_ASL_STORE_OK;
}
static bool amduatd_test_push_post_ingest(void *ctx,
amduat_fed_record_type_t record_type,
amduat_reference_t ref,
amduat_octets_t bytes,
int *out_status,
char **out_body) {
amduatd_test_push_transport_t *t = (amduatd_test_push_transport_t *)ctx;
const char *status = "ok";
const char *applied = "true";
char buf[256];
int n;
(void)record_type;
(void)ref;
(void)bytes;
if (out_status == NULL || out_body == NULL || t == NULL) {
return false;
}
t->call_count++;
if (t->already_present_at != 0u && t->call_count == t->already_present_at) {
status = "already_present";
applied = "false";
}
n = snprintf(buf,
sizeof(buf),
"{\"status\":\"%s\",\"applied\":%s,"
"\"ref\":null,\"effective_space\":{"
"\"mode\":\"unscoped\",\"space_id\":null}}",
status,
applied);
if (n <= 0 || (size_t)n >= sizeof(buf)) {
return false;
}
*out_status = 200;
*out_body = strdup(buf);
return *out_body != NULL;
}
int main(void) {
char *root = amduatd_test_make_temp_dir();
amduat_asl_store_fs_config_t cfg;
amduatd_store_ctx_t store_ctx;
amduat_asl_store_t store;
amduat_asl_pointer_store_t pointer_store;
amduatd_space_t space;
amduatd_fed_cfg_t fed_cfg;
amduat_reference_t ref0;
amduat_reference_t ref1;
amduatd_fed_push_transport_t transport;
amduatd_test_push_transport_t stub;
amduatd_fed_push_apply_report_t report;
amduatd_fed_push_apply_status_t status;
amduatd_fed_cursor_record_t cursor;
amduat_reference_t cursor_ref;
if (root == NULL) {
return 1;
}
memset(&cfg, 0, sizeof(cfg));
if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) {
fprintf(stderr, "failed to init store root\n");
free(root);
return 1;
}
memset(&store_ctx, 0, sizeof(store_ctx));
memset(&store, 0, sizeof(store));
if (!amduatd_store_init(&store,
&cfg,
&store_ctx,
root,
AMDUATD_STORE_BACKEND_INDEX)) {
fprintf(stderr, "failed to init store\n");
free(root);
return 1;
}
if (!amduat_asl_pointer_store_init(&pointer_store, root)) {
fprintf(stderr, "failed to init pointer store\n");
free(root);
return 1;
}
if (!amduatd_space_init(&space, "demo", false)) {
fprintf(stderr, "failed to init space\n");
free(root);
return 1;
}
if (!amduatd_test_store_artifact(&store, "alpha", &ref0) ||
!amduatd_test_store_artifact(&store, "beta", &ref1)) {
fprintf(stderr, "failed to store artifacts\n");
free(root);
return 1;
}
if (!amduatd_test_append_fed_log(&store,
&pointer_store,
&space,
root,
ref0) ||
!amduatd_test_append_fed_log(&store,
&pointer_store,
&space,
root,
ref1)) {
fprintf(stderr, "failed to append fed log\n");
amduat_reference_free(&ref0);
amduat_reference_free(&ref1);
free(root);
return 1;
}
amduatd_fed_cfg_init(&fed_cfg);
fed_cfg.enabled = true;
memset(&stub, 0, sizeof(stub));
stub.already_present_at = 1u;
memset(&transport, 0, sizeof(transport));
transport.ctx = &stub;
transport.post_ingest = amduatd_test_push_post_ingest;
status = amduatd_fed_push_apply(&store,
&pointer_store,
&space,
"2",
16u,
root,
&fed_cfg,
&transport,
&report);
expect(status == AMDUATD_FED_PUSH_APPLY_OK, "push apply ok");
expect(report.sent_record_count == 2u, "sent record count");
expect(report.peer_ok_count == 1u, "peer ok count");
expect(report.peer_already_present_count == 1u, "peer already present");
expect(report.cursor_advanced, "cursor advanced");
expect(report.cursor_after_has_logseq &&
report.cursor_after_logseq == 1u,
"cursor after logseq");
amduatd_fed_cursor_record_init(&cursor);
memset(&cursor_ref, 0, sizeof(cursor_ref));
{
amduatd_fed_cursor_status_t st;
st = amduatd_fed_push_cursor_get(&store,
&pointer_store,
&space,
"2",
&cursor,
&cursor_ref);
expect(st == AMDUATD_FED_CURSOR_OK, "push cursor stored");
expect(cursor.has_logseq && cursor.last_logseq == 1u,
"push cursor logseq");
}
amduatd_fed_cursor_record_free(&cursor);
amduat_reference_free(&cursor_ref);
amduatd_fed_push_apply_report_free(&report);
amduat_reference_free(&ref0);
amduat_reference_free(&ref1);
free(root);
return failures == 0 ? 0 : 1;
}

View file

@ -0,0 +1,175 @@
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include "amduatd_fed_push_plan.h"
#include "amduatd_fed_cursor.h"
#include "amduat/asl/ref_text.h"
#include "amduat/hash/asl1.h"
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
static int failures = 0;
static void expect(bool cond, const char *msg) {
if (!cond) {
fprintf(stderr, "FAIL: %s\n", msg);
failures++;
}
}
static bool amduatd_make_test_ref(uint8_t fill, amduat_reference_t *out_ref) {
uint8_t digest_bytes[32];
amduat_octets_t digest;
if (out_ref == NULL) {
return false;
}
memset(digest_bytes, fill, sizeof(digest_bytes));
if (!amduat_octets_clone(amduat_octets(digest_bytes, sizeof(digest_bytes)),
&digest)) {
return false;
}
*out_ref = amduat_reference(AMDUAT_HASH_ASL1_ID_SHA256, digest);
return true;
}
int main(void) {
amduatd_fed_cfg_t cfg;
amduat_asl_store_t store;
amduatd_fed_push_plan_status_t status;
amduatd_fed_cfg_init(&cfg);
memset(&store, 0, sizeof(store));
status = amduatd_fed_push_plan_check(&cfg, &store);
expect(status == AMDUATD_FED_PUSH_PLAN_ERR_DISABLED,
"disabled federation check");
cfg.enabled = true;
status = amduatd_fed_push_plan_check(&cfg, &store);
expect(status == AMDUATD_FED_PUSH_PLAN_ERR_UNSUPPORTED,
"unsupported backend check");
{
amduatd_fed_push_plan_input_t input;
amduat_fed_record_t records[2];
amduat_reference_t ref0;
amduat_reference_t ref1;
char *json = NULL;
char *ref0_hex = NULL;
if (!amduatd_make_test_ref(0x01, &ref0) ||
!amduatd_make_test_ref(0x02, &ref1)) {
fprintf(stderr, "FAIL: make refs\n");
return 1;
}
memset(records, 0, sizeof(records));
records[0].id.type = AMDUAT_FED_REC_ARTIFACT;
records[0].id.ref = ref0;
records[0].logseq = 1u;
records[1].id.type = AMDUAT_FED_REC_PER;
records[1].id.ref = ref1;
records[1].logseq = 2u;
memset(&input, 0, sizeof(input));
input.peer_key = "1";
input.domain_id = 42u;
input.cursor_present = false;
input.records = records;
input.record_count = 2u;
status = amduatd_fed_push_plan_json(&input, &json);
expect(status == AMDUATD_FED_PUSH_PLAN_OK, "plan json missing cursor");
expect(json != NULL && strstr(json, "\"present\":false") != NULL,
"cursor present false");
expect(json != NULL && strstr(json, "\"record_count\":2") != NULL,
"record count");
expect(json != NULL && strstr(json, "\"last_logseq\":2") != NULL,
"next cursor candidate");
expect(json != NULL && strstr(json, "\"domain_id\":42") != NULL,
"domain id");
expect(json != NULL && strstr(json, "\"record_type\":\"per\"") != NULL,
"record type per");
if (amduat_asl_ref_encode_hex(ref0, &ref0_hex)) {
expect(json != NULL && strstr(json, ref0_hex) != NULL,
"required artifacts include ref");
} else {
fprintf(stderr, "FAIL: encode ref\n");
failures++;
}
free(ref0_hex);
free(json);
amduat_reference_free(&ref0);
amduat_reference_free(&ref1);
}
{
amduatd_fed_push_plan_input_t input;
amduatd_fed_cursor_record_t cursor;
amduat_reference_t cursor_ref;
amduat_reference_t record_ref;
char *json = NULL;
char *cursor_ref_hex = NULL;
amduatd_fed_cursor_record_init(&cursor);
cursor.peer_key = strdup("7");
cursor.space_id = NULL;
if (cursor.peer_key == NULL) {
fprintf(stderr, "FAIL: cursor peer allocation\n");
return 1;
}
cursor.has_logseq = true;
cursor.last_logseq = 5u;
if (!amduatd_make_test_ref(0x03, &record_ref)) {
fprintf(stderr, "FAIL: make cursor record ref\n");
return 1;
}
cursor.has_record_ref = true;
cursor.last_record_ref = record_ref;
if (!amduatd_make_test_ref(0x04, &cursor_ref)) {
fprintf(stderr, "FAIL: make cursor ref\n");
amduatd_fed_cursor_record_free(&cursor);
return 1;
}
if (!amduat_asl_ref_encode_hex(cursor_ref, &cursor_ref_hex)) {
fprintf(stderr, "FAIL: encode cursor ref\n");
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&cursor);
return 1;
}
memset(&input, 0, sizeof(input));
input.peer_key = "7";
input.domain_id = 7u;
input.cursor_present = true;
input.cursor = &cursor;
input.cursor_ref = &cursor_ref;
input.records = NULL;
input.record_count = 0u;
status = amduatd_fed_push_plan_json(&input, &json);
expect(status == AMDUATD_FED_PUSH_PLAN_OK, "plan json with cursor");
expect(json != NULL && strstr(json, "\"present\":true") != NULL,
"cursor present true");
expect(json != NULL && strstr(json, "\"last_logseq\":5") != NULL,
"cursor logseq echoed");
expect(json != NULL &&
strstr(json, "\"next_cursor_candidate\":{\"last_logseq\":null") !=
NULL,
"next cursor candidate empty");
expect(json != NULL && strstr(json, cursor_ref_hex) != NULL,
"cursor ref echoed");
free(cursor_ref_hex);
free(json);
amduat_reference_free(&cursor_ref);
amduatd_fed_cursor_record_free(&cursor);
}
return failures == 0 ? 0 : 1;
}