354 lines
12 KiB
C
354 lines
12 KiB
C
#include "amduatd_fed_push_apply.h"
|
|
|
|
#include "amduat/asl/artifact_io.h"
|
|
#include "amduat/asl/store.h"
|
|
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
|
|
static bool amduatd_fed_push_parse_u32(const char *s, uint32_t *out) {
|
|
char *end = NULL;
|
|
unsigned long val;
|
|
if (s == NULL || out == NULL || s[0] == '\0') {
|
|
return false;
|
|
}
|
|
val = strtoul(s, &end, 10);
|
|
if (end == s || *end != '\0' || val > UINT32_MAX) {
|
|
return false;
|
|
}
|
|
*out = (uint32_t)val;
|
|
return true;
|
|
}
|
|
|
|
static bool amduatd_fed_push_strdup(const char *s, char **out) {
|
|
size_t len;
|
|
char *buf;
|
|
if (out == NULL) {
|
|
return false;
|
|
}
|
|
*out = NULL;
|
|
if (s == NULL) {
|
|
return false;
|
|
}
|
|
len = strlen(s);
|
|
if (len > SIZE_MAX - 1u) {
|
|
return false;
|
|
}
|
|
buf = (char *)malloc(len + 1u);
|
|
if (buf == NULL) {
|
|
return false;
|
|
}
|
|
if (len != 0u) {
|
|
memcpy(buf, s, len);
|
|
}
|
|
buf[len] = '\0';
|
|
*out = buf;
|
|
return true;
|
|
}
|
|
|
|
static void amduatd_fed_push_report_error(
|
|
amduatd_fed_push_apply_report_t *report,
|
|
const char *msg) {
|
|
if (report == NULL || msg == NULL) {
|
|
return;
|
|
}
|
|
memset(report->error, 0, sizeof(report->error));
|
|
strncpy(report->error, msg, sizeof(report->error) - 1u);
|
|
}
|
|
|
|
static bool amduatd_fed_push_body_is_already_present(const char *body) {
|
|
if (body == NULL) {
|
|
return false;
|
|
}
|
|
return strstr(body, "\"status\":\"already_present\"") != NULL;
|
|
}
|
|
|
|
void amduatd_fed_push_apply_report_init(
|
|
amduatd_fed_push_apply_report_t *report) {
|
|
if (report == NULL) {
|
|
return;
|
|
}
|
|
memset(report, 0, sizeof(*report));
|
|
report->cursor_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
|
|
report->cursor_after_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
|
|
amduatd_fed_push_plan_candidate_init(&report->plan_candidate);
|
|
}
|
|
|
|
void amduatd_fed_push_apply_report_free(
|
|
amduatd_fed_push_apply_report_t *report) {
|
|
if (report == NULL) {
|
|
return;
|
|
}
|
|
if (report->cursor_ref_set) {
|
|
amduat_reference_free(&report->cursor_ref);
|
|
}
|
|
if (report->cursor_after_ref_set) {
|
|
amduat_reference_free(&report->cursor_after_ref);
|
|
}
|
|
amduatd_fed_push_plan_candidate_free(&report->plan_candidate);
|
|
memset(report, 0, sizeof(*report));
|
|
}
|
|
|
|
amduatd_fed_push_apply_status_t amduatd_fed_push_apply(
|
|
amduat_asl_store_t *store,
|
|
amduat_asl_pointer_store_t *pointer_store,
|
|
const amduatd_space_t *effective_space,
|
|
const char *peer_key,
|
|
const char *remote_space_id,
|
|
uint64_t limit,
|
|
const char *root_path,
|
|
const amduatd_fed_cfg_t *fed_cfg,
|
|
const amduatd_fed_push_transport_t *transport,
|
|
amduatd_fed_push_apply_report_t *out_report) {
|
|
amduatd_fed_push_plan_scan_t scan;
|
|
amduatd_fed_push_cursor_candidate_t candidate;
|
|
amduatd_fed_cursor_record_t next_cursor;
|
|
amduat_reference_t next_ref;
|
|
size_t i;
|
|
|
|
if (out_report == NULL) {
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_INVALID;
|
|
}
|
|
amduatd_fed_push_apply_report_init(out_report);
|
|
out_report->peer_key = peer_key;
|
|
out_report->effective_space = effective_space;
|
|
out_report->limit = limit;
|
|
|
|
if (store == NULL || pointer_store == NULL || peer_key == NULL ||
|
|
fed_cfg == NULL || transport == NULL || root_path == NULL) {
|
|
amduatd_fed_push_report_error(out_report, "invalid inputs");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_INVALID;
|
|
}
|
|
if (!fed_cfg->enabled) {
|
|
amduatd_fed_push_report_error(out_report, "federation disabled");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_DISABLED;
|
|
}
|
|
if (store->ops.log_scan == NULL || store->ops.current_state == NULL) {
|
|
amduatd_fed_push_report_error(out_report, "requires index backend");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_UNSUPPORTED;
|
|
}
|
|
if (transport->post_ingest == NULL) {
|
|
amduatd_fed_push_report_error(out_report, "transport unavailable");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_UNSUPPORTED;
|
|
}
|
|
{
|
|
amduat_octets_t scoped = amduat_octets(NULL, 0u);
|
|
if (remote_space_id != NULL && remote_space_id[0] != '\0') {
|
|
if (!amduatd_fed_push_cursor_pointer_name_v2(effective_space,
|
|
peer_key,
|
|
remote_space_id,
|
|
&scoped)) {
|
|
amduatd_fed_push_report_error(out_report, "invalid peer");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_INVALID;
|
|
}
|
|
} else if (!amduatd_fed_push_cursor_pointer_name(effective_space,
|
|
peer_key,
|
|
&scoped)) {
|
|
amduatd_fed_push_report_error(out_report, "invalid peer");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_INVALID;
|
|
}
|
|
amduat_octets_free(&scoped);
|
|
}
|
|
{
|
|
uint32_t domain_id = 0u;
|
|
if (!amduatd_fed_push_parse_u32(peer_key, &domain_id)) {
|
|
amduatd_fed_push_report_error(out_report, "invalid peer");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_INVALID;
|
|
}
|
|
}
|
|
|
|
if (amduatd_fed_push_plan_scan(store,
|
|
pointer_store,
|
|
effective_space,
|
|
peer_key,
|
|
remote_space_id,
|
|
limit,
|
|
root_path,
|
|
&scan) != AMDUATD_FED_PUSH_PLAN_OK) {
|
|
amduatd_fed_push_plan_scan_free(&scan);
|
|
amduatd_fed_push_report_error(out_report, "plan scan failed");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_STORE;
|
|
}
|
|
|
|
out_report->cursor_present = scan.cursor_present;
|
|
if (scan.cursor_present) {
|
|
out_report->cursor_has_logseq = scan.cursor.has_logseq;
|
|
out_report->cursor_logseq = scan.cursor.last_logseq;
|
|
if (scan.cursor_ref.digest.data != NULL) {
|
|
if (!amduat_reference_clone(scan.cursor_ref, &out_report->cursor_ref)) {
|
|
amduatd_fed_push_plan_scan_free(&scan);
|
|
amduatd_fed_push_report_error(out_report, "oom");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_OOM;
|
|
}
|
|
out_report->cursor_ref_set = true;
|
|
}
|
|
}
|
|
|
|
if (!amduatd_fed_push_plan_next_cursor_candidate(scan.cursor_present
|
|
? &scan.cursor
|
|
: NULL,
|
|
scan.records,
|
|
scan.record_count,
|
|
&candidate)) {
|
|
amduatd_fed_push_plan_scan_free(&scan);
|
|
amduatd_fed_push_report_error(out_report, "oom");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_OOM;
|
|
}
|
|
out_report->plan_record_count = scan.record_count;
|
|
out_report->plan_candidate = candidate;
|
|
|
|
if (scan.record_count == 0u) {
|
|
amduatd_fed_push_plan_scan_free(&scan);
|
|
return AMDUATD_FED_PUSH_APPLY_OK;
|
|
}
|
|
|
|
for (i = 0; i < scan.record_count; ++i) {
|
|
const amduat_fed_record_t *rec = &scan.records[i];
|
|
amduat_octets_t bytes = amduat_octets(NULL, 0u);
|
|
amduat_artifact_t artifact;
|
|
amduat_asl_store_error_t store_err;
|
|
int status = 0;
|
|
char *body = NULL;
|
|
bool already_present = false;
|
|
|
|
if (rec->id.type != AMDUAT_FED_REC_TOMBSTONE) {
|
|
memset(&artifact, 0, sizeof(artifact));
|
|
store_err = amduat_asl_store_get(store, rec->id.ref, &artifact);
|
|
if (store_err != AMDUAT_ASL_STORE_OK) {
|
|
amduatd_fed_push_plan_scan_free(&scan);
|
|
amduatd_fed_push_report_error(out_report, "artifact missing");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_STORE;
|
|
}
|
|
bytes = artifact.bytes;
|
|
out_report->sent_bytes_total += bytes.len;
|
|
}
|
|
|
|
if (!transport->post_ingest(transport->ctx,
|
|
rec->id.type,
|
|
rec->id.ref,
|
|
bytes,
|
|
&status,
|
|
&body)) {
|
|
if (rec->id.type != AMDUAT_FED_REC_TOMBSTONE) {
|
|
amduat_asl_artifact_free(&artifact);
|
|
}
|
|
free(body);
|
|
amduatd_fed_push_plan_scan_free(&scan);
|
|
amduatd_fed_push_report_error(out_report, "ingest failed");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_REMOTE;
|
|
}
|
|
if (rec->id.type != AMDUAT_FED_REC_TOMBSTONE) {
|
|
amduat_asl_artifact_free(&artifact);
|
|
}
|
|
|
|
out_report->sent_record_count++;
|
|
if (rec->id.type == AMDUAT_FED_REC_ARTIFACT) {
|
|
out_report->sent_artifact_count++;
|
|
} else if (rec->id.type == AMDUAT_FED_REC_PER) {
|
|
out_report->sent_per_count++;
|
|
} else if (rec->id.type == AMDUAT_FED_REC_TGK_EDGE) {
|
|
out_report->sent_tgk_edge_count++;
|
|
} else if (rec->id.type == AMDUAT_FED_REC_TOMBSTONE) {
|
|
out_report->sent_tombstone_count++;
|
|
}
|
|
|
|
if (status != 200) {
|
|
out_report->remote_status = status;
|
|
free(body);
|
|
amduatd_fed_push_plan_scan_free(&scan);
|
|
amduatd_fed_push_report_error(out_report, "peer error");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_REMOTE;
|
|
}
|
|
|
|
already_present = amduatd_fed_push_body_is_already_present(body);
|
|
free(body);
|
|
if (already_present) {
|
|
out_report->peer_already_present_count++;
|
|
} else {
|
|
out_report->peer_ok_count++;
|
|
}
|
|
}
|
|
|
|
amduatd_fed_cursor_record_init(&next_cursor);
|
|
if (!amduatd_fed_push_strdup(peer_key, &next_cursor.peer_key)) {
|
|
amduatd_fed_cursor_record_free(&next_cursor);
|
|
amduatd_fed_push_plan_scan_free(&scan);
|
|
amduatd_fed_push_report_error(out_report, "oom");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_OOM;
|
|
}
|
|
if (effective_space != NULL && effective_space->enabled &&
|
|
effective_space->space_id.data != NULL) {
|
|
const char *space_id = (const char *)effective_space->space_id.data;
|
|
if (!amduatd_fed_push_strdup(space_id, &next_cursor.space_id)) {
|
|
amduatd_fed_cursor_record_free(&next_cursor);
|
|
amduatd_fed_push_plan_scan_free(&scan);
|
|
amduatd_fed_push_report_error(out_report, "oom");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_OOM;
|
|
}
|
|
} else {
|
|
next_cursor.space_id = NULL;
|
|
}
|
|
if (out_report->plan_candidate.has_logseq) {
|
|
next_cursor.has_logseq = true;
|
|
next_cursor.last_logseq = out_report->plan_candidate.logseq;
|
|
}
|
|
if (out_report->plan_candidate.has_ref) {
|
|
next_cursor.has_record_ref = true;
|
|
if (!amduat_reference_clone(out_report->plan_candidate.ref,
|
|
&next_cursor.last_record_ref)) {
|
|
amduatd_fed_cursor_record_free(&next_cursor);
|
|
amduatd_fed_push_plan_scan_free(&scan);
|
|
amduatd_fed_push_report_error(out_report, "oom");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_OOM;
|
|
}
|
|
}
|
|
if (!next_cursor.has_logseq && !next_cursor.has_record_ref) {
|
|
amduatd_fed_cursor_record_free(&next_cursor);
|
|
amduatd_fed_push_plan_scan_free(&scan);
|
|
amduatd_fed_push_report_error(out_report, "invalid cursor");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_INVALID;
|
|
}
|
|
|
|
memset(&next_ref, 0, sizeof(next_ref));
|
|
{
|
|
amduatd_fed_cursor_status_t st;
|
|
st = amduatd_fed_push_cursor_cas_set_remote(store,
|
|
pointer_store,
|
|
effective_space,
|
|
peer_key,
|
|
remote_space_id,
|
|
scan.cursor_present
|
|
? &scan.cursor_ref
|
|
: NULL,
|
|
&next_cursor,
|
|
&next_ref);
|
|
amduatd_fed_cursor_record_free(&next_cursor);
|
|
if (st == AMDUATD_FED_CURSOR_ERR_CONFLICT) {
|
|
amduatd_fed_push_plan_scan_free(&scan);
|
|
amduatd_fed_push_report_error(out_report, "cursor conflict");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_CONFLICT;
|
|
}
|
|
if (st != AMDUATD_FED_CURSOR_OK) {
|
|
amduatd_fed_push_plan_scan_free(&scan);
|
|
amduatd_fed_push_report_error(out_report, "cursor update failed");
|
|
return AMDUATD_FED_PUSH_APPLY_ERR_STORE;
|
|
}
|
|
}
|
|
|
|
out_report->cursor_advanced = true;
|
|
if (out_report->plan_candidate.has_logseq) {
|
|
out_report->cursor_after_has_logseq = true;
|
|
out_report->cursor_after_logseq = out_report->plan_candidate.logseq;
|
|
}
|
|
if (next_ref.digest.data != NULL) {
|
|
out_report->cursor_after_ref_set = true;
|
|
out_report->cursor_after_ref = next_ref;
|
|
} else {
|
|
amduat_reference_free(&next_ref);
|
|
}
|
|
|
|
amduatd_fed_push_plan_scan_free(&scan);
|
|
return AMDUATD_FED_PUSH_APPLY_OK;
|
|
}
|