/*
 * amduat-api/src/amduatd_fed_pull_apply.c
 * 2026-01-24 14:03:26 +01:00
 *
 * 519 lines
 * 18 KiB
 * C
 */

#include "amduatd_fed_pull_apply.h"
#include "amduat/asl/artifact_io.h"
#include "amduat/asl/store.h"
#include "amduat/enc/fer1_receipt.h"
#include "amduat/enc/tgk1_edge.h"
#include "amduat/fed/ingest.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/*
 * Parses a base-10 unsigned 32-bit integer written using ASCII digits only.
 *
 * The whole string must consist of one or more characters in '0'..'9'.
 * Leading whitespace, a leading '+' or '-', trailing characters and values
 * above UINT32_MAX are all rejected. This is deliberately stricter than
 * strtoul(), which skips whitespace, accepts a sign (negating the value in
 * unsigned arithmetic) and reports overflow only through errno.
 *
 * s   - NUL-terminated input text; NULL or "" is rejected.
 * out - receives the parsed value on success; NULL is rejected. Left
 *       untouched on failure.
 *
 * Returns true when the entire string was a valid value, false otherwise.
 */
static bool amduatd_fed_pull_parse_u32(const char *s, uint32_t *out) {
  uint64_t acc = 0u;
  const char *p;

  if (s == NULL || out == NULL || s[0] == '\0') {
    return false;
  }
  for (p = s; *p != '\0'; ++p) {
    if (*p < '0' || *p > '9') {
      return false;
    }
    /* acc <= UINT32_MAX on entry, so acc * 10 + 9 cannot wrap 64 bits. */
    acc = acc * 10u + (uint64_t)(*p - '0');
    if (acc > UINT32_MAX) {
      return false;
    }
  }
  *out = (uint32_t)acc;
  return true;
}
/*
 * Duplicates a NUL-terminated string into freshly malloc'ed storage.
 *
 * s   - string to copy; NULL is treated as a failure.
 * out - receives the copy, which the caller must free(). It is reset to NULL
 *       before anything else happens, so it is NULL on every failure path.
 *       A NULL `out` is itself a failure.
 *
 * Returns true when the copy was made, false on bad arguments or when the
 * allocation fails.
 */
static bool amduatd_fed_pull_strdup(const char *s, char **out) {
  size_t n;
  char *copy;

  if (out == NULL) {
    return false;
  }
  *out = NULL;

  if (s == NULL) {
    return false;
  }

  n = strlen(s);
  /* Guard the "+ 1" below against wrapping. */
  if (n > SIZE_MAX - 1u) {
    return false;
  }

  copy = (char *)malloc(n + 1u);
  if (copy == NULL) {
    return false;
  }

  if (n != 0u) {
    memcpy(copy, s, n);
  }
  copy[n] = '\0';

  *out = copy;
  return true;
}
/*
 * Puts a report into its initial state.
 *
 * The structure is zero-filled, both cursor references are set to the
 * reference built from (0, empty octets), and the embedded plan candidate is
 * initialised through amduatd_fed_pull_plan_candidate_init(). A NULL report
 * is ignored.
 */
void amduatd_fed_pull_apply_report_init(
    amduatd_fed_pull_apply_report_t *report) {
  if (report != NULL) {
    memset(report, 0, sizeof(*report));
    report->cursor_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
    report->cursor_after_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
    amduatd_fed_pull_plan_candidate_init(&report->plan_candidate);
  }
}
/*
 * Releases what a report owns and zero-fills it.
 *
 * Each cursor reference is freed only when its corresponding *_set flag is
 * true; the plan candidate is always passed to
 * amduatd_fed_pull_plan_candidate_free(). A NULL report is ignored.
 */
void amduatd_fed_pull_apply_report_free(
    amduatd_fed_pull_apply_report_t *report) {
  if (report != NULL) {
    if (report->cursor_ref_set) {
      amduat_reference_free(&report->cursor_ref);
    }
    if (report->cursor_after_ref_set) {
      amduat_reference_free(&report->cursor_after_ref);
    }
    amduatd_fed_pull_plan_candidate_free(&report->plan_candidate);
    memset(report, 0, sizeof(*report));
  }
}
/*
 * Stores an error message in report->error.
 *
 * The destination is cleared first, then at most sizeof(report->error) - 1
 * bytes of `msg` are copied in, so the stored text is always NUL-terminated
 * and longer messages are truncated. Does nothing when either argument is
 * NULL.
 */
static void amduatd_fed_pull_report_error(
    amduatd_fed_pull_apply_report_t *report,
    const char *msg) {
  size_t room;
  size_t n;

  if (report == NULL || msg == NULL) {
    return;
  }

  memset(report->error, 0, sizeof(report->error));

  /* Keep the final byte as the terminator left behind by the memset. */
  room = sizeof(report->error) - 1u;
  n = strlen(msg);
  if (n > room) {
    n = room;
  }
  if (n != 0u) {
    memcpy(report->error, msg, n);
  }
}
/*
 * Applies one federation record to the local store.
 *
 * Tombstone records are handed to amduat_asl_store_tombstone() and need no
 * transport. Every other record is fetched with transport->get_artifact(),
 * decoded with amduat_asl_artifact_from_bytes() and written with
 * amduat_asl_store_put_indexed(). TGK edge and PER records are decoded with
 * an explicit type tag; all other record types are decoded untagged.
 *
 * store             - destination store.
 * transport         - supplies get_artifact(); may be NULL only for
 *                     tombstone records.
 * record            - record to apply; dereferenced unconditionally.
 * io_artifact_count - if non-NULL, incremented by one after an artifact has
 *                     been stored (tombstones do not count).
 * out_remote_status - if non-NULL, reset to 0 on entry and set to the fetch
 *                     status when that status is not 200.
 * err_buf/err_cap   - buffer that receives a short message on failure.
 *
 * Returns true on success, false on any failure.
 */
static bool amduatd_fed_pull_apply_record(
    amduat_asl_store_t *store,
    const amduatd_fed_pull_transport_t *transport,
    const amduat_fed_record_t *record,
    size_t *io_artifact_count,
    int *out_remote_status,
    char *err_buf,
    size_t err_cap) {
  int fetch_status = 0;
  amduat_octets_t payload = amduat_octets(NULL, 0u);
  amduat_artifact_t decoded;
  amduat_reference_t put_ref;
  amduat_asl_index_state_t index_state;
  amduat_asl_store_error_t put_rc;
  amduat_type_tag_t tag = amduat_type_tag(0u);
  bool tagged = false;
  char *resp_body = NULL;
  bool fetched;

  if (out_remote_status != NULL) {
    *out_remote_status = 0;
  }

  /* Tombstones carry no payload, so nothing is fetched for them. */
  if (record->id.type == AMDUAT_FED_REC_TOMBSTONE) {
    if (amduat_asl_store_tombstone(store,
                                   record->id.ref,
                                   0u,
                                   0u,
                                   &index_state) == AMDUAT_ASL_STORE_OK) {
      return true;
    }
    snprintf(err_buf, err_cap, "tombstone failed");
    return false;
  }

  if (transport == NULL || transport->get_artifact == NULL) {
    snprintf(err_buf, err_cap, "missing artifact transport");
    return false;
  }

  fetched = transport->get_artifact(transport->ctx,
                                    record->id.ref,
                                    &fetch_status,
                                    &payload,
                                    &resp_body);
  /* The response body is never inspected here; release it on every path. */
  free(resp_body);

  if (!fetched) {
    snprintf(err_buf, err_cap, "artifact fetch failed");
    return false;
  }

  if (fetch_status != 200) {
    if (out_remote_status != NULL) {
      *out_remote_status = fetch_status;
    }
    snprintf(err_buf,
             err_cap,
             "artifact fetch status %d",
             fetch_status);
    amduat_octets_free(&payload);
    return false;
  }

  if (record->id.type == AMDUAT_FED_REC_TGK_EDGE) {
    tag = amduat_type_tag(AMDUAT_TYPE_TAG_TGK1_EDGE_V1);
    tagged = true;
  } else if (record->id.type == AMDUAT_FED_REC_PER) {
    tag = amduat_type_tag(AMDUAT_TYPE_TAG_FER1_RECEIPT_1);
    tagged = true;
  }

  if (!amduat_asl_artifact_from_bytes(payload,
                                      AMDUAT_ASL_IO_RAW,
                                      tagged,
                                      tag,
                                      &decoded)) {
    amduat_octets_free(&payload);
    snprintf(err_buf, err_cap, "artifact decode failed");
    return false;
  }

  /*
   * After a successful decode the payload handle is reset to empty, so the
   * amduat_octets_free() below does not release the fetched bytes a second
   * time; they are released through amduat_asl_artifact_free().
   */
  payload = amduat_octets(NULL, 0u);

  put_rc = amduat_asl_store_put_indexed(store,
                                        decoded,
                                        &put_ref,
                                        &index_state);
  amduat_asl_artifact_free(&decoded);
  amduat_octets_free(&payload);

  if (put_rc != AMDUAT_ASL_STORE_OK) {
    snprintf(err_buf, err_cap, "artifact store failed");
    return false;
  }

  amduat_reference_free(&put_ref);
  if (io_artifact_count != NULL) {
    *io_artifact_count += 1u;
  }
  return true;
}
/*
 * Pulls one batch of federation records from a peer, applies them to the
 * local store and advances the peer's cursor with a compare-and-set.
 *
 * Steps, in order:
 *   1. Validate arguments, configuration, store capabilities and transport.
 *   2. Read the current cursor for (effective_space, peer_key); "not found"
 *      means start from log sequence 0.
 *   3. Fetch up to `limit` records after the cursor via
 *      transport->get_records(); any surplus beyond `limit` is ignored.
 *   4. Compute the next-cursor candidate and validate the whole batch
 *      (ingest validation, not moving backwards, non-decreasing logseq)
 *      BEFORE anything is written, so a malformed batch never leaves the
 *      store partially updated.
 *   5. Apply each record with amduatd_fed_pull_apply_record().
 *   6. Build the new cursor record and publish it with
 *      amduatd_fed_cursor_cas_set(), expecting the cursor read in step 2.
 *
 * store           - local store; must provide log_scan, current_state,
 *                   put_indexed and tombstone operations.
 * pointer_store   - pointer store used for the cursor.
 * effective_space - space scoping the cursor; may be NULL.
 * peer_key        - peer identifier; must parse as a decimal uint32 domain id.
 * limit           - maximum number of records to apply.
 * fed_cfg         - federation configuration; must have `enabled` set.
 * transport       - must provide get_records, free_records and get_artifact.
 * out_report      - required. Re-initialised on entry and filled as the run
 *                   progresses. peer_key and effective_space are stored as
 *                   borrowed pointers, so they must outlive the report. The
 *                   caller releases it with
 *                   amduatd_fed_pull_apply_report_free().
 *
 * Returns AMDUATD_FED_PULL_APPLY_OK on success (including an empty batch, in
 * which case the cursor is left untouched), otherwise one of the
 * AMDUATD_FED_PULL_APPLY_ERR_* codes with a message in out_report->error.
 * When applying a record fails, applied_record_count and
 * applied_artifact_count describe what was written before the failure.
 */
amduatd_fed_pull_apply_status_t amduatd_fed_pull_apply(
    amduat_asl_store_t *store,
    amduat_asl_pointer_store_t *pointer_store,
    const amduatd_space_t *effective_space,
    const char *peer_key,
    uint64_t limit,
    const amduatd_fed_cfg_t *fed_cfg,
    const amduatd_fed_pull_transport_t *transport,
    amduatd_fed_pull_apply_report_t *out_report) {
  amduatd_fed_pull_apply_status_t rc = AMDUATD_FED_PULL_APPLY_ERR_INVALID;
  uint32_t domain_id = 0u;
  amduatd_fed_cursor_record_t cursor;
  amduat_reference_t cursor_ref;
  bool cursor_present = false;
  amduatd_fed_cursor_status_t cursor_status;
  uint64_t from_logseq = 0u;
  amduat_fed_record_t *records = NULL;
  size_t record_len = 0;
  size_t record_len_total = 0;
  int remote_status = 0;
  char *remote_body = NULL;
  amduatd_fed_cursor_candidate_t candidate;
  amduatd_fed_cursor_record_t next_cursor;
  amduat_reference_t next_ref;
  size_t i;
  size_t applied_artifacts = 0u;
  char err_buf[128];

  /* ---- Argument and capability checks: nothing to clean up yet. ---- */
  if (out_report == NULL) {
    return AMDUATD_FED_PULL_APPLY_ERR_INVALID;
  }
  amduatd_fed_pull_apply_report_init(out_report);
  out_report->peer_key = peer_key;
  out_report->effective_space = effective_space;
  out_report->limit = limit;
  if (store == NULL || pointer_store == NULL || peer_key == NULL ||
      fed_cfg == NULL || transport == NULL) {
    amduatd_fed_pull_report_error(out_report, "invalid inputs");
    return AMDUATD_FED_PULL_APPLY_ERR_INVALID;
  }
  if (!fed_cfg->enabled) {
    amduatd_fed_pull_report_error(out_report, "federation disabled");
    return AMDUATD_FED_PULL_APPLY_ERR_DISABLED;
  }
  if (store->ops.log_scan == NULL || store->ops.current_state == NULL ||
      store->ops.put_indexed == NULL || store->ops.tombstone == NULL) {
    amduatd_fed_pull_report_error(out_report, "requires index backend");
    return AMDUATD_FED_PULL_APPLY_ERR_UNSUPPORTED;
  }
  {
    /* Only used to check that a cursor pointer name can be formed. */
    amduat_octets_t scoped = amduat_octets(NULL, 0u);
    if (!amduatd_fed_cursor_pointer_name(effective_space,
                                         peer_key,
                                         &scoped)) {
      amduatd_fed_pull_report_error(out_report, "invalid peer");
      return AMDUATD_FED_PULL_APPLY_ERR_INVALID;
    }
    amduat_octets_free(&scoped);
  }
  if (!amduatd_fed_pull_parse_u32(peer_key, &domain_id)) {
    amduatd_fed_pull_report_error(out_report, "invalid peer");
    return AMDUATD_FED_PULL_APPLY_ERR_INVALID;
  }
  if (transport->get_records == NULL || transport->free_records == NULL ||
      transport->get_artifact == NULL) {
    amduatd_fed_pull_report_error(out_report, "transport unavailable");
    return AMDUATD_FED_PULL_APPLY_ERR_UNSUPPORTED;
  }

  /*
   * From here on every exit goes through the cleanup labels at the bottom.
   * All four owned objects are put into their empty state up front so the
   * cleanup code can release them unconditionally.
   */
  amduatd_fed_cursor_record_init(&cursor);
  memset(&cursor_ref, 0, sizeof(cursor_ref));
  amduatd_fed_cursor_record_init(&next_cursor);
  memset(&next_ref, 0, sizeof(next_ref));

  /* ---- Read the current cursor. ---- */
  cursor_status = amduatd_fed_cursor_get(store,
                                         pointer_store,
                                         effective_space,
                                         peer_key,
                                         &cursor,
                                         &cursor_ref);
  if (cursor_status == AMDUATD_FED_CURSOR_OK) {
    cursor_present = true;
  } else if (cursor_status != AMDUATD_FED_CURSOR_ERR_NOT_FOUND) {
    amduatd_fed_pull_report_error(out_report, "cursor read failed");
    rc = AMDUATD_FED_PULL_APPLY_ERR_STORE;
    goto out;
  }

  if (cursor_present && cursor.has_logseq) {
    /* last_logseq + 1 below must not wrap. */
    if (cursor.last_logseq == UINT64_MAX) {
      amduatd_fed_pull_report_error(out_report, "cursor overflow");
      rc = AMDUATD_FED_PULL_APPLY_ERR_INVALID;
      goto out;
    }
    from_logseq = cursor.last_logseq + 1u;
  }

  /* ---- Fetch the batch. ---- */
  if (!transport->get_records(transport->ctx,
                              domain_id,
                              from_logseq,
                              limit,
                              &remote_status,
                              &records,
                              &record_len,
                              &remote_body)) {
    /* remote_body, if the transport set it, is released at `out`. */
    amduatd_fed_pull_report_error(out_report, "remote fetch failed");
    rc = AMDUATD_FED_PULL_APPLY_ERR_REMOTE;
    goto out;
  }
  out_report->remote_status = remote_status;
  if (remote_status != 200) {
    if (remote_body != NULL && remote_body[0] != '\0') {
      amduatd_fed_pull_report_error(out_report, remote_body);
    } else {
      amduatd_fed_pull_report_error(out_report, "remote error");
    }
    rc = AMDUATD_FED_PULL_APPLY_ERR_REMOTE;
    goto out;
  }
  free(remote_body);
  remote_body = NULL;

  /* Keep the full length for free_records(); apply at most `limit`. */
  record_len_total = record_len;
  if (record_len > limit) {
    record_len = (size_t)limit;
  }

  out_report->cursor_present = cursor_present;
  if (cursor_present && cursor.has_logseq) {
    out_report->cursor_has_logseq = true;
    out_report->cursor_logseq = cursor.last_logseq;
  }
  if (cursor_present) {
    if (amduat_reference_clone(cursor_ref, &out_report->cursor_ref)) {
      out_report->cursor_ref_set = true;
    }
  }

  /* ---- Plan and validate before touching the store. ---- */
  if (!amduatd_fed_pull_plan_next_cursor_candidate(
          cursor_present ? &cursor : NULL,
          records,
          record_len,
          &candidate)) {
    amduatd_fed_pull_report_error(out_report, "plan candidate failed");
    rc = AMDUATD_FED_PULL_APPLY_ERR_INVALID;
    goto out_records;
  }
  out_report->plan_record_count = record_len;
  out_report->plan_candidate = candidate;

  if (record_len == 0u) {
    /* Nothing new: success, cursor left where it was. */
    rc = AMDUATD_FED_PULL_APPLY_OK;
    goto out_records;
  }

  {
    size_t err_index = 0;
    size_t conflict_index = 0;
    if (amduat_fed_ingest_validate(records,
                                   record_len,
                                   &err_index,
                                   &conflict_index) != AMDUAT_FED_INGEST_OK) {
      amduatd_fed_pull_report_error(out_report, "invalid record batch");
      rc = AMDUATD_FED_PULL_APPLY_ERR_INVALID;
      goto out_records;
    }
  }

  if (cursor_present && cursor.has_logseq &&
      records[0].logseq <= cursor.last_logseq) {
    amduatd_fed_pull_report_error(out_report, "cursor would move backwards");
    rc = AMDUATD_FED_PULL_APPLY_ERR_INVALID;
    goto out_records;
  }

  /*
   * Check ordering for the whole batch up front. Doing this inside the apply
   * loop would reject a mis-ordered batch only after its leading records had
   * already been written.
   */
  for (i = 1u; i < record_len; ++i) {
    if (records[i].logseq < records[i - 1u].logseq) {
      amduatd_fed_pull_report_error(out_report, "record order invalid");
      rc = AMDUATD_FED_PULL_APPLY_ERR_INVALID;
      goto out_records;
    }
  }

  /* ---- Apply. ---- */
  for (i = 0u; i < record_len; ++i) {
    int artifact_status = 0;
    memset(err_buf, 0, sizeof(err_buf));
    if (!amduatd_fed_pull_apply_record(store,
                                       transport,
                                       &records[i],
                                       &applied_artifacts,
                                       &artifact_status,
                                       err_buf,
                                       sizeof(err_buf))) {
      /* Records [0, i) were applied; the cursor is not advanced. */
      out_report->applied_record_count = i;
      out_report->applied_artifact_count = applied_artifacts;
      if (artifact_status != 0) {
        out_report->remote_status = artifact_status;
      }
      amduatd_fed_pull_report_error(out_report, err_buf);
      rc = AMDUATD_FED_PULL_APPLY_ERR_STORE;
      goto out_records;
    }
  }
  out_report->applied_record_count = record_len;
  out_report->applied_artifact_count = applied_artifacts;

  /* ---- Build the next cursor record. ---- */
  if (!amduatd_fed_pull_strdup(peer_key, &next_cursor.peer_key)) {
    amduatd_fed_pull_report_error(out_report, "oom");
    rc = AMDUATD_FED_PULL_APPLY_ERR_OOM;
    goto out_records;
  }
  if (effective_space != NULL && effective_space->enabled &&
      effective_space->space_id.data != NULL) {
    if (!amduatd_fed_pull_strdup(
            (const char *)effective_space->space_id.data,
            &next_cursor.space_id)) {
      amduatd_fed_pull_report_error(out_report, "oom");
      rc = AMDUATD_FED_PULL_APPLY_ERR_OOM;
      goto out_records;
    }
  }
  if (candidate.has_logseq) {
    next_cursor.has_logseq = true;
    next_cursor.last_logseq = candidate.logseq;
  }
  if (candidate.has_ref) {
    next_cursor.has_record_ref = true;
    if (!amduat_reference_clone(candidate.ref,
                                &next_cursor.last_record_ref)) {
      amduatd_fed_pull_report_error(out_report, "oom");
      rc = AMDUATD_FED_PULL_APPLY_ERR_OOM;
      goto out_records;
    }
  }

  /* ---- Publish: the expected value is the cursor read earlier. ---- */
  cursor_status = amduatd_fed_cursor_cas_set(store,
                                             pointer_store,
                                             effective_space,
                                             peer_key,
                                             cursor_present ? &cursor_ref
                                                            : NULL,
                                             &next_cursor,
                                             &next_ref);
  if (cursor_status == AMDUATD_FED_CURSOR_ERR_CONFLICT) {
    amduatd_fed_pull_report_error(out_report, "cursor conflict");
    rc = AMDUATD_FED_PULL_APPLY_ERR_CONFLICT;
    goto out_records;
  }
  if (cursor_status != AMDUATD_FED_CURSOR_OK) {
    amduatd_fed_pull_report_error(out_report, "cursor update failed");
    rc = AMDUATD_FED_PULL_APPLY_ERR_STORE;
    goto out_records;
  }

  out_report->cursor_advanced = true;
  if (candidate.has_logseq) {
    out_report->cursor_after_has_logseq = true;
    out_report->cursor_after_logseq = candidate.logseq;
  }
  if (amduat_reference_clone(next_ref, &out_report->cursor_after_ref)) {
    out_report->cursor_after_ref_set = true;
  }
  rc = AMDUATD_FED_PULL_APPLY_OK;

out_records:
  if (records != NULL) {
    transport->free_records(transport->ctx, records, record_len_total);
  }
out:
  free(remote_body);
  amduatd_fed_cursor_record_free(&cursor);
  amduat_reference_free(&cursor_ref);
  amduatd_fed_cursor_record_free(&next_cursor);
  amduat_reference_free(&next_ref);
  return rc;
}