#include "amduatd_fed_pull_apply.h" #include "amduat/asl/artifact_io.h" #include "amduat/asl/store.h" #include "amduat/enc/fer1_receipt.h" #include "amduat/enc/tgk1_edge.h" #include "amduat/fed/ingest.h" #include #include #include static bool amduatd_fed_pull_parse_u32(const char *s, uint32_t *out) { char *end = NULL; unsigned long val; if (s == NULL || out == NULL || s[0] == '\0') { return false; } val = strtoul(s, &end, 10); if (end == s || *end != '\0' || val > UINT32_MAX) { return false; } *out = (uint32_t)val; return true; } static bool amduatd_fed_pull_strdup(const char *s, char **out) { size_t len; char *buf; if (out == NULL) { return false; } *out = NULL; if (s == NULL) { return false; } len = strlen(s); if (len > SIZE_MAX - 1u) { return false; } buf = (char *)malloc(len + 1u); if (buf == NULL) { return false; } if (len != 0u) { memcpy(buf, s, len); } buf[len] = '\0'; *out = buf; return true; } void amduatd_fed_pull_apply_report_init( amduatd_fed_pull_apply_report_t *report) { if (report == NULL) { return; } memset(report, 0, sizeof(*report)); report->cursor_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); report->cursor_after_ref = amduat_reference(0u, amduat_octets(NULL, 0u)); amduatd_fed_pull_plan_candidate_init(&report->plan_candidate); } void amduatd_fed_pull_apply_report_free( amduatd_fed_pull_apply_report_t *report) { if (report == NULL) { return; } if (report->cursor_ref_set) { amduat_reference_free(&report->cursor_ref); } if (report->cursor_after_ref_set) { amduat_reference_free(&report->cursor_after_ref); } amduatd_fed_pull_plan_candidate_free(&report->plan_candidate); memset(report, 0, sizeof(*report)); } static void amduatd_fed_pull_report_error( amduatd_fed_pull_apply_report_t *report, const char *msg) { if (report == NULL || msg == NULL) { return; } memset(report->error, 0, sizeof(report->error)); strncpy(report->error, msg, sizeof(report->error) - 1u); } static bool amduatd_fed_pull_apply_record( amduat_asl_store_t *store, const amduatd_fed_pull_transport_t *transport, const amduat_fed_record_t *record, size_t *io_artifact_count, int *out_remote_status, char *err_buf, size_t err_cap) { int status = 0; amduat_octets_t bytes = amduat_octets(NULL, 0u); amduat_artifact_t artifact; amduat_reference_t stored_ref; amduat_asl_index_state_t state; amduat_asl_store_error_t store_err; amduat_type_tag_t type_tag = amduat_type_tag(0u); bool has_tag = false; char *body = NULL; if (out_remote_status != NULL) { *out_remote_status = 0; } if (record->id.type == AMDUAT_FED_REC_TOMBSTONE) { store_err = amduat_asl_store_tombstone(store, record->id.ref, 0u, 0u, &state); if (store_err != AMDUAT_ASL_STORE_OK) { snprintf(err_buf, err_cap, "tombstone failed"); return false; } return true; } if (transport == NULL || transport->get_artifact == NULL) { snprintf(err_buf, err_cap, "missing artifact transport"); return false; } if (!transport->get_artifact(transport->ctx, record->id.ref, &status, &bytes, &body)) { snprintf(err_buf, err_cap, "artifact fetch failed"); free(body); return false; } if (status != 200) { if (out_remote_status != NULL) { *out_remote_status = status; } snprintf(err_buf, err_cap, "artifact fetch status %d", status); free(body); amduat_octets_free(&bytes); return false; } free(body); if (record->id.type == AMDUAT_FED_REC_TGK_EDGE) { type_tag = amduat_type_tag(AMDUAT_TYPE_TAG_TGK1_EDGE_V1); has_tag = true; } else if (record->id.type == AMDUAT_FED_REC_PER) { type_tag = amduat_type_tag(AMDUAT_TYPE_TAG_FER1_RECEIPT_1); has_tag = true; } if (!amduat_asl_artifact_from_bytes(bytes, AMDUAT_ASL_IO_RAW, has_tag, type_tag, &artifact)) { amduat_octets_free(&bytes); snprintf(err_buf, err_cap, "artifact decode failed"); return false; } bytes = amduat_octets(NULL, 0u); store_err = amduat_asl_store_put_indexed(store, artifact, &stored_ref, &state); amduat_asl_artifact_free(&artifact); amduat_octets_free(&bytes); if (store_err != AMDUAT_ASL_STORE_OK) { snprintf(err_buf, err_cap, "artifact store failed"); return false; } amduat_reference_free(&stored_ref); if (io_artifact_count != NULL) { *io_artifact_count += 1u; } return true; } amduatd_fed_pull_apply_status_t amduatd_fed_pull_apply( amduat_asl_store_t *store, amduat_asl_pointer_store_t *pointer_store, const amduatd_space_t *effective_space, const char *peer_key, const char *remote_space_id, uint64_t limit, const amduatd_fed_cfg_t *fed_cfg, const amduatd_fed_pull_transport_t *transport, amduatd_fed_pull_apply_report_t *out_report) { uint32_t domain_id = 0u; amduatd_fed_cursor_record_t cursor; amduat_reference_t cursor_ref; bool cursor_present = false; amduat_fed_record_t *records = NULL; size_t record_len = 0; size_t record_len_total = 0; int remote_status = 0; char *remote_body = NULL; amduatd_fed_cursor_candidate_t candidate; amduatd_fed_cursor_record_t next_cursor; amduat_reference_t next_ref; size_t i; size_t applied_records = 0u; size_t applied_artifacts = 0u; char err_buf[128]; if (out_report == NULL) { return AMDUATD_FED_PULL_APPLY_ERR_INVALID; } amduatd_fed_pull_apply_report_init(out_report); out_report->peer_key = peer_key; out_report->effective_space = effective_space; out_report->limit = limit; if (store == NULL || pointer_store == NULL || peer_key == NULL || fed_cfg == NULL || transport == NULL) { amduatd_fed_pull_report_error(out_report, "invalid inputs"); return AMDUATD_FED_PULL_APPLY_ERR_INVALID; } if (!fed_cfg->enabled) { amduatd_fed_pull_report_error(out_report, "federation disabled"); return AMDUATD_FED_PULL_APPLY_ERR_DISABLED; } if (store->ops.log_scan == NULL || store->ops.current_state == NULL || store->ops.put_indexed == NULL || store->ops.tombstone == NULL) { amduatd_fed_pull_report_error(out_report, "requires index backend"); return AMDUATD_FED_PULL_APPLY_ERR_UNSUPPORTED; } { amduat_octets_t scoped = amduat_octets(NULL, 0u); if (remote_space_id != NULL && remote_space_id[0] != '\0') { if (!amduatd_fed_cursor_pointer_name_v2(effective_space, peer_key, remote_space_id, &scoped)) { amduatd_fed_pull_report_error(out_report, "invalid peer"); return AMDUATD_FED_PULL_APPLY_ERR_INVALID; } } else if (!amduatd_fed_cursor_pointer_name(effective_space, peer_key, &scoped)) { amduatd_fed_pull_report_error(out_report, "invalid peer"); return AMDUATD_FED_PULL_APPLY_ERR_INVALID; } amduat_octets_free(&scoped); } if (!amduatd_fed_pull_parse_u32(peer_key, &domain_id)) { amduatd_fed_pull_report_error(out_report, "invalid peer"); return AMDUATD_FED_PULL_APPLY_ERR_INVALID; } if (transport->get_records == NULL || transport->free_records == NULL || transport->get_artifact == NULL) { amduatd_fed_pull_report_error(out_report, "transport unavailable"); return AMDUATD_FED_PULL_APPLY_ERR_UNSUPPORTED; } amduatd_fed_cursor_record_init(&cursor); memset(&cursor_ref, 0, sizeof(cursor_ref)); { amduatd_fed_cursor_status_t cursor_status; cursor_status = amduatd_fed_cursor_get_remote(store, pointer_store, effective_space, peer_key, remote_space_id, &cursor, &cursor_ref); if (cursor_status == AMDUATD_FED_CURSOR_ERR_NOT_FOUND) { cursor_present = false; } else if (cursor_status == AMDUATD_FED_CURSOR_OK) { cursor_present = true; } else { amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); amduatd_fed_pull_report_error(out_report, "cursor read failed"); return AMDUATD_FED_PULL_APPLY_ERR_STORE; } } if (cursor_present && cursor.has_logseq && cursor.last_logseq == UINT64_MAX) { amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); amduatd_fed_pull_report_error(out_report, "cursor overflow"); return AMDUATD_FED_PULL_APPLY_ERR_INVALID; } if (!transport->get_records(transport->ctx, domain_id, cursor_present && cursor.has_logseq ? cursor.last_logseq + 1u : 0u, limit, &remote_status, &records, &record_len, &remote_body)) { amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); amduatd_fed_pull_report_error(out_report, "remote fetch failed"); return AMDUATD_FED_PULL_APPLY_ERR_REMOTE; } out_report->remote_status = remote_status; if (remote_status != 200) { amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); if (remote_body != NULL && remote_body[0] != '\0') { amduatd_fed_pull_report_error(out_report, remote_body); } else { amduatd_fed_pull_report_error(out_report, "remote error"); } free(remote_body); return AMDUATD_FED_PULL_APPLY_ERR_REMOTE; } free(remote_body); remote_body = NULL; record_len_total = record_len; if (record_len > limit) { record_len = (size_t)limit; } out_report->cursor_present = cursor_present; if (cursor_present && cursor.has_logseq) { out_report->cursor_has_logseq = true; out_report->cursor_logseq = cursor.last_logseq; } if (cursor_present) { if (amduat_reference_clone(cursor_ref, &out_report->cursor_ref)) { out_report->cursor_ref_set = true; } } if (!amduatd_fed_pull_plan_next_cursor_candidate(cursor_present ? &cursor : NULL, records, record_len, &candidate)) { if (records != NULL) { transport->free_records(transport->ctx, records, record_len_total); } amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); amduatd_fed_pull_report_error(out_report, "plan candidate failed"); return AMDUATD_FED_PULL_APPLY_ERR_INVALID; } out_report->plan_record_count = record_len; out_report->plan_candidate = candidate; if (record_len == 0u) { if (records != NULL) { transport->free_records(transport->ctx, records, record_len_total); } amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); return AMDUATD_FED_PULL_APPLY_OK; } { size_t err_index = 0; size_t conflict_index = 0; amduat_fed_ingest_error_t ingest_rc; ingest_rc = amduat_fed_ingest_validate(records, record_len, &err_index, &conflict_index); if (ingest_rc != AMDUAT_FED_INGEST_OK) { transport->free_records(transport->ctx, records, record_len_total); amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); amduatd_fed_pull_report_error(out_report, "invalid record batch"); return AMDUATD_FED_PULL_APPLY_ERR_INVALID; } } if (cursor_present && cursor.has_logseq && records[0].logseq <= cursor.last_logseq) { transport->free_records(transport->ctx, records, record_len_total); amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); amduatd_fed_pull_report_error(out_report, "cursor would move backwards"); return AMDUATD_FED_PULL_APPLY_ERR_INVALID; } for (i = 0; i < record_len; ++i) { int artifact_status = 0; if (i > 0 && records[i].logseq < records[i - 1].logseq) { transport->free_records(transport->ctx, records, record_len_total); amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); amduatd_fed_pull_report_error(out_report, "record order invalid"); return AMDUATD_FED_PULL_APPLY_ERR_INVALID; } memset(err_buf, 0, sizeof(err_buf)); if (!amduatd_fed_pull_apply_record(store, transport, &records[i], &applied_artifacts, &artifact_status, err_buf, sizeof(err_buf))) { transport->free_records(transport->ctx, records, record_len_total); amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); applied_records = i; out_report->applied_record_count = applied_records; out_report->applied_artifact_count = applied_artifacts; if (artifact_status != 0) { out_report->remote_status = artifact_status; } amduatd_fed_pull_report_error(out_report, err_buf); return AMDUATD_FED_PULL_APPLY_ERR_STORE; } applied_records++; } out_report->applied_record_count = applied_records; out_report->applied_artifact_count = applied_artifacts; amduatd_fed_cursor_record_init(&next_cursor); if (!amduatd_fed_pull_strdup(peer_key, &next_cursor.peer_key)) { transport->free_records(transport->ctx, records, record_len_total); amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); amduatd_fed_cursor_record_free(&next_cursor); amduatd_fed_pull_report_error(out_report, "oom"); return AMDUATD_FED_PULL_APPLY_ERR_OOM; } if (next_cursor.peer_key == NULL) { transport->free_records(transport->ctx, records, record_len_total); amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); amduatd_fed_pull_report_error(out_report, "oom"); return AMDUATD_FED_PULL_APPLY_ERR_OOM; } if (effective_space != NULL && effective_space->enabled && effective_space->space_id.data != NULL) { if (!amduatd_fed_pull_strdup( (const char *)effective_space->space_id.data, &next_cursor.space_id)) { transport->free_records(transport->ctx, records, record_len_total); amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); amduatd_fed_cursor_record_free(&next_cursor); amduatd_fed_pull_report_error(out_report, "oom"); return AMDUATD_FED_PULL_APPLY_ERR_OOM; } if (next_cursor.space_id == NULL) { transport->free_records(transport->ctx, records, record_len_total); amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); amduatd_fed_cursor_record_free(&next_cursor); amduatd_fed_pull_report_error(out_report, "oom"); return AMDUATD_FED_PULL_APPLY_ERR_OOM; } } if (candidate.has_logseq) { next_cursor.has_logseq = true; next_cursor.last_logseq = candidate.logseq; } if (candidate.has_ref) { next_cursor.has_record_ref = true; if (!amduat_reference_clone(candidate.ref, &next_cursor.last_record_ref)) { transport->free_records(transport->ctx, records, record_len_total); amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); amduatd_fed_cursor_record_free(&next_cursor); amduatd_fed_pull_report_error(out_report, "oom"); return AMDUATD_FED_PULL_APPLY_ERR_OOM; } } memset(&next_ref, 0, sizeof(next_ref)); { amduatd_fed_cursor_status_t cursor_status; cursor_status = amduatd_fed_cursor_cas_set_remote(store, pointer_store, effective_space, peer_key, remote_space_id, cursor_present ? &cursor_ref : NULL, &next_cursor, &next_ref); if (cursor_status == AMDUATD_FED_CURSOR_ERR_CONFLICT) { transport->free_records(transport->ctx, records, record_len_total); amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); amduatd_fed_cursor_record_free(&next_cursor); amduat_reference_free(&next_ref); amduatd_fed_pull_report_error(out_report, "cursor conflict"); return AMDUATD_FED_PULL_APPLY_ERR_CONFLICT; } if (cursor_status != AMDUATD_FED_CURSOR_OK) { transport->free_records(transport->ctx, records, record_len_total); amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); amduatd_fed_cursor_record_free(&next_cursor); amduat_reference_free(&next_ref); amduatd_fed_pull_report_error(out_report, "cursor update failed"); return AMDUATD_FED_PULL_APPLY_ERR_STORE; } } out_report->cursor_advanced = true; if (candidate.has_logseq) { out_report->cursor_after_has_logseq = true; out_report->cursor_after_logseq = candidate.logseq; } if (amduat_reference_clone(next_ref, &out_report->cursor_after_ref)) { out_report->cursor_after_ref_set = true; } transport->free_records(transport->ctx, records, record_len_total); amduatd_fed_cursor_record_free(&cursor); amduat_reference_free(&cursor_ref); amduatd_fed_cursor_record_free(&next_cursor); amduat_reference_free(&next_ref); return AMDUATD_FED_PULL_APPLY_OK; }