Add bounded fed pull/push until endpoints with tests
This commit is contained in:
parent
67c837be3c
commit
f99ec3ee89
|
|
@ -28,6 +28,7 @@ set(amduatd_sources src/amduatd.c src/amduatd_http.c src/amduatd_caps.c
|
||||||
src/amduatd_space.c src/amduatd_concepts.c
|
src/amduatd_space.c src/amduatd_concepts.c
|
||||||
src/amduatd_store.c src/amduatd_derivation_index.c
|
src/amduatd_store.c src/amduatd_derivation_index.c
|
||||||
src/amduatd_fed.c src/amduatd_fed_cursor.c
|
src/amduatd_fed.c src/amduatd_fed_cursor.c
|
||||||
|
src/amduatd_fed_until.c
|
||||||
src/amduatd_fed_pull_plan.c src/amduatd_fed_push_plan.c
|
src/amduatd_fed_pull_plan.c src/amduatd_fed_push_plan.c
|
||||||
src/amduatd_fed_pull_apply.c src/amduatd_fed_push_apply.c
|
src/amduatd_fed_pull_apply.c src/amduatd_fed_push_apply.c
|
||||||
src/amduatd_space_doctor.c src/amduatd_space_roots.c)
|
src/amduatd_space_doctor.c src/amduatd_space_roots.c)
|
||||||
|
|
@ -228,6 +229,62 @@ target_link_libraries(amduatd_test_fed_push_apply
|
||||||
|
|
||||||
add_test(NAME amduatd_fed_push_apply COMMAND amduatd_test_fed_push_apply)
|
add_test(NAME amduatd_fed_push_apply COMMAND amduatd_test_fed_push_apply)
|
||||||
|
|
||||||
|
add_executable(amduatd_test_fed_pull_until
|
||||||
|
tests/test_amduatd_fed_pull_until.c
|
||||||
|
src/amduatd_fed_until.c
|
||||||
|
src/amduatd_fed_pull_apply.c
|
||||||
|
src/amduatd_fed_pull_plan.c
|
||||||
|
src/amduatd_fed_push_apply.c
|
||||||
|
src/amduatd_fed_push_plan.c
|
||||||
|
src/amduatd_fed_cursor.c
|
||||||
|
src/amduatd_fed.c
|
||||||
|
src/amduatd_space.c
|
||||||
|
src/amduatd_store.c
|
||||||
|
)
|
||||||
|
|
||||||
|
target_include_directories(amduatd_test_fed_pull_until
|
||||||
|
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
|
||||||
|
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src
|
||||||
|
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/vendor/amduat/include
|
||||||
|
)
|
||||||
|
|
||||||
|
target_link_libraries(amduatd_test_fed_pull_until
|
||||||
|
PRIVATE amduat_asl_store_fs amduat_asl_store_index_fs amduat_asl_record
|
||||||
|
amduat_asl amduat_enc amduat_asl_pointer_fs amduat_util
|
||||||
|
amduat_asl_log_store
|
||||||
|
amduat_hash_asl1 amduat_fed
|
||||||
|
)
|
||||||
|
|
||||||
|
add_test(NAME amduatd_fed_pull_until COMMAND amduatd_test_fed_pull_until)
|
||||||
|
|
||||||
|
add_executable(amduatd_test_fed_push_until
|
||||||
|
tests/test_amduatd_fed_push_until.c
|
||||||
|
src/amduatd_fed_until.c
|
||||||
|
src/amduatd_fed_pull_apply.c
|
||||||
|
src/amduatd_fed_pull_plan.c
|
||||||
|
src/amduatd_fed_push_apply.c
|
||||||
|
src/amduatd_fed_push_plan.c
|
||||||
|
src/amduatd_fed_cursor.c
|
||||||
|
src/amduatd_fed.c
|
||||||
|
src/amduatd_space.c
|
||||||
|
src/amduatd_store.c
|
||||||
|
)
|
||||||
|
|
||||||
|
target_include_directories(amduatd_test_fed_push_until
|
||||||
|
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
|
||||||
|
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src
|
||||||
|
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/vendor/amduat/include
|
||||||
|
)
|
||||||
|
|
||||||
|
target_link_libraries(amduatd_test_fed_push_until
|
||||||
|
PRIVATE amduat_asl_store_fs amduat_asl_store_index_fs amduat_asl_record
|
||||||
|
amduat_asl amduat_enc amduat_asl_pointer_fs amduat_util
|
||||||
|
amduat_asl_log_store
|
||||||
|
amduat_hash_asl1 amduat_fed
|
||||||
|
)
|
||||||
|
|
||||||
|
add_test(NAME amduatd_fed_push_until COMMAND amduatd_test_fed_push_until)
|
||||||
|
|
||||||
add_executable(amduatd_test_fed_pull_apply
|
add_executable(amduatd_test_fed_pull_apply
|
||||||
tests/test_amduatd_fed_pull_apply.c
|
tests/test_amduatd_fed_pull_apply.c
|
||||||
src/amduatd_fed_pull_apply.c
|
src/amduatd_fed_pull_apply.c
|
||||||
|
|
|
||||||
17
README.md
17
README.md
|
|
@ -160,6 +160,23 @@ curl --unix-socket amduatd.sock -X POST \
|
||||||
`/v1/fed/push` uses `/v1/fed/ingest` on the peer and only advances the push
|
`/v1/fed/push` uses `/v1/fed/ingest` on the peer and only advances the push
|
||||||
cursor after the batch completes. It requires the index backend.
|
cursor after the batch completes. It requires the index backend.
|
||||||
|
|
||||||
|
### Federation sync until caught up
|
||||||
|
|
||||||
|
Use the bounded helpers to repeatedly apply pull/push until no work remains,
|
||||||
|
with a hard `max_rounds` cap to keep requests bounded:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
curl --unix-socket amduatd.sock -X POST \
|
||||||
|
'http://localhost/v1/fed/pull/until?peer=2&limit=128&max_rounds=10' \
|
||||||
|
-H 'X-Amduat-Space: demo'
|
||||||
|
```
|
||||||
|
|
||||||
|
```sh
|
||||||
|
curl --unix-socket amduatd.sock -X POST \
|
||||||
|
'http://localhost/v1/fed/push/until?peer=2&limit=128&max_rounds=10' \
|
||||||
|
-H 'X-Amduat-Space: demo'
|
||||||
|
```
|
||||||
|
|
||||||
### Federation ingest (receiver)
|
### Federation ingest (receiver)
|
||||||
|
|
||||||
`/v1/fed/ingest` applies a single incoming record (push receiver). The request
|
`/v1/fed/ingest` applies a single incoming record (push receiver). The request
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,8 @@
|
||||||
{"method": "GET", "path": "/v1/fed/records"},
|
{"method": "GET", "path": "/v1/fed/records"},
|
||||||
{"method": "GET", "path": "/v1/fed/artifacts/{ref}"},
|
{"method": "GET", "path": "/v1/fed/artifacts/{ref}"},
|
||||||
{"method": "GET", "path": "/v1/fed/status"},
|
{"method": "GET", "path": "/v1/fed/status"},
|
||||||
|
{"method": "POST", "path": "/v1/fed/pull/until"},
|
||||||
|
{"method": "POST", "path": "/v1/fed/push/until"},
|
||||||
{"method": "POST", "path": "/v1/concepts"},
|
{"method": "POST", "path": "/v1/concepts"},
|
||||||
{"method": "GET", "path": "/v1/concepts"},
|
{"method": "GET", "path": "/v1/concepts"},
|
||||||
{"method": "GET", "path": "/v1/concepts/{name}"},
|
{"method": "GET", "path": "/v1/concepts/{name}"},
|
||||||
|
|
|
||||||
630
src/amduatd.c
630
src/amduatd.c
|
|
@ -46,6 +46,7 @@
|
||||||
#include "amduatd_fed_push_plan.h"
|
#include "amduatd_fed_push_plan.h"
|
||||||
#include "amduatd_fed_push_apply.h"
|
#include "amduatd_fed_push_apply.h"
|
||||||
#include "amduatd_fed_pull_apply.h"
|
#include "amduatd_fed_pull_apply.h"
|
||||||
|
#include "amduatd_fed_until.h"
|
||||||
#include "amduatd_store.h"
|
#include "amduatd_store.h"
|
||||||
#include "amduatd_derivation_index.h"
|
#include "amduatd_derivation_index.h"
|
||||||
#include "amduatd_space_doctor.h"
|
#include "amduatd_space_doctor.h"
|
||||||
|
|
@ -133,7 +134,9 @@ static const char k_amduatd_contract_v1_json[] =
|
||||||
"{\"method\":\"GET\",\"path\":\"/v1/fed/pull/plan\"},"
|
"{\"method\":\"GET\",\"path\":\"/v1/fed/pull/plan\"},"
|
||||||
"{\"method\":\"GET\",\"path\":\"/v1/fed/push/plan\"},"
|
"{\"method\":\"GET\",\"path\":\"/v1/fed/push/plan\"},"
|
||||||
"{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"},"
|
"{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"},"
|
||||||
|
"{\"method\":\"POST\",\"path\":\"/v1/fed/pull/until\"},"
|
||||||
"{\"method\":\"POST\",\"path\":\"/v1/fed/push\"},"
|
"{\"method\":\"POST\",\"path\":\"/v1/fed/push\"},"
|
||||||
|
"{\"method\":\"POST\",\"path\":\"/v1/fed/push/until\"},"
|
||||||
"{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"},"
|
"{\"method\":\"POST\",\"path\":\"/v1/fed/pull\"},"
|
||||||
"{\"method\":\"POST\",\"path\":\"/v1/fed/ingest\"},"
|
"{\"method\":\"POST\",\"path\":\"/v1/fed/ingest\"},"
|
||||||
"{\"method\":\"GET\",\"path\":\"/v1/fed/artifacts/{ref}\"},"
|
"{\"method\":\"GET\",\"path\":\"/v1/fed/artifacts/{ref}\"},"
|
||||||
|
|
@ -5250,6 +5253,345 @@ fed_pull_cleanup:
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void amduatd_fed_pull_apply_status_http(
|
||||||
|
amduatd_fed_pull_apply_status_t status,
|
||||||
|
int *out_status,
|
||||||
|
const char **out_reason) {
|
||||||
|
int http_status = 200;
|
||||||
|
const char *http_reason = "OK";
|
||||||
|
|
||||||
|
if (status == AMDUATD_FED_PULL_APPLY_ERR_DISABLED) {
|
||||||
|
http_status = 503;
|
||||||
|
http_reason = "Service Unavailable";
|
||||||
|
} else if (status == AMDUATD_FED_PULL_APPLY_ERR_UNSUPPORTED) {
|
||||||
|
http_status = 501;
|
||||||
|
http_reason = "Not Implemented";
|
||||||
|
} else if (status == AMDUATD_FED_PULL_APPLY_ERR_INVALID) {
|
||||||
|
http_status = 400;
|
||||||
|
http_reason = "Bad Request";
|
||||||
|
} else if (status == AMDUATD_FED_PULL_APPLY_ERR_REMOTE) {
|
||||||
|
http_status = 502;
|
||||||
|
http_reason = "Bad Gateway";
|
||||||
|
} else if (status == AMDUATD_FED_PULL_APPLY_ERR_CONFLICT) {
|
||||||
|
http_status = 409;
|
||||||
|
http_reason = "Conflict";
|
||||||
|
} else if (status != AMDUATD_FED_PULL_APPLY_OK) {
|
||||||
|
http_status = 500;
|
||||||
|
http_reason = "Internal Server Error";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (out_status != NULL) {
|
||||||
|
*out_status = http_status;
|
||||||
|
}
|
||||||
|
if (out_reason != NULL) {
|
||||||
|
*out_reason = http_reason;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void amduatd_fed_push_apply_status_http(
|
||||||
|
amduatd_fed_push_apply_status_t status,
|
||||||
|
int *out_status,
|
||||||
|
const char **out_reason) {
|
||||||
|
int http_status = 200;
|
||||||
|
const char *http_reason = "OK";
|
||||||
|
|
||||||
|
if (status == AMDUATD_FED_PUSH_APPLY_ERR_DISABLED) {
|
||||||
|
http_status = 503;
|
||||||
|
http_reason = "Service Unavailable";
|
||||||
|
} else if (status == AMDUATD_FED_PUSH_APPLY_ERR_UNSUPPORTED) {
|
||||||
|
http_status = 501;
|
||||||
|
http_reason = "Not Implemented";
|
||||||
|
} else if (status == AMDUATD_FED_PUSH_APPLY_ERR_INVALID) {
|
||||||
|
http_status = 400;
|
||||||
|
http_reason = "Bad Request";
|
||||||
|
} else if (status == AMDUATD_FED_PUSH_APPLY_ERR_REMOTE) {
|
||||||
|
http_status = 502;
|
||||||
|
http_reason = "Bad Gateway";
|
||||||
|
} else if (status == AMDUATD_FED_PUSH_APPLY_ERR_CONFLICT) {
|
||||||
|
http_status = 409;
|
||||||
|
http_reason = "Conflict";
|
||||||
|
} else if (status != AMDUATD_FED_PUSH_APPLY_OK) {
|
||||||
|
http_status = 500;
|
||||||
|
http_reason = "Internal Server Error";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (out_status != NULL) {
|
||||||
|
*out_status = http_status;
|
||||||
|
}
|
||||||
|
if (out_reason != NULL) {
|
||||||
|
*out_reason = http_reason;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool amduatd_handle_post_fed_pull_until(
|
||||||
|
int fd,
|
||||||
|
amduat_asl_store_t *store,
|
||||||
|
const amduatd_fed_cfg_t *fed_cfg,
|
||||||
|
const amduatd_caps_t *caps,
|
||||||
|
const amduatd_cfg_t *dcfg,
|
||||||
|
const amduatd_http_req_t *req,
|
||||||
|
const char *root_path) {
|
||||||
|
char peer_buf[AMDUAT_ASL_POINTER_NAME_MAX + 1u];
|
||||||
|
char limit_buf[32];
|
||||||
|
char rounds_buf[32];
|
||||||
|
uint64_t limit = 128u;
|
||||||
|
uint64_t max_rounds = 10u;
|
||||||
|
amduat_asl_pointer_store_t pointer_store;
|
||||||
|
amduat_fed_transport_unix_t transport;
|
||||||
|
amduat_fed_transport_t ops;
|
||||||
|
amduatd_fed_pull_transport_t pull_transport;
|
||||||
|
amduatd_fed_until_report_t report;
|
||||||
|
amduatd_fed_pull_apply_status_t status;
|
||||||
|
amduatd_strbuf_t b;
|
||||||
|
char *cursor_ref_hex = NULL;
|
||||||
|
bool ok = false;
|
||||||
|
int http_status = 200;
|
||||||
|
const char *http_reason = "OK";
|
||||||
|
|
||||||
|
if (store == NULL || fed_cfg == NULL || req == NULL || root_path == NULL ||
|
||||||
|
dcfg == NULL) {
|
||||||
|
return amduatd_send_json_error(fd, 500, "Internal Server Error",
|
||||||
|
"internal error");
|
||||||
|
}
|
||||||
|
if (!amduatd_fed_require_space(fd, fed_cfg, req)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
amduatd_fed_pull_plan_status_t check =
|
||||||
|
amduatd_fed_pull_plan_check(fed_cfg, store);
|
||||||
|
if (check == AMDUATD_FED_PULL_PLAN_ERR_DISABLED) {
|
||||||
|
return amduatd_send_json_error(fd, 503, "Service Unavailable",
|
||||||
|
"federation disabled");
|
||||||
|
}
|
||||||
|
if (check == AMDUATD_FED_PULL_PLAN_ERR_UNSUPPORTED) {
|
||||||
|
return amduatd_send_json_error(fd, 501, "Not Implemented",
|
||||||
|
"requires index backend");
|
||||||
|
}
|
||||||
|
if (check != AMDUATD_FED_PULL_PLAN_OK) {
|
||||||
|
return amduatd_send_json_error(fd, 500, "Internal Server Error",
|
||||||
|
"plan unavailable");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (caps != NULL && caps->enabled && req->x_capability[0] != '\0') {
|
||||||
|
const char *reason = NULL;
|
||||||
|
if (!amduatd_caps_check_space(caps, dcfg, req, &reason)) {
|
||||||
|
if (reason != NULL && strcmp(reason, "wrong-space") == 0) {
|
||||||
|
return amduatd_send_json_error(fd, 403, "Forbidden",
|
||||||
|
"space not permitted by capability");
|
||||||
|
}
|
||||||
|
return amduatd_send_json_error(fd, 403, "Forbidden",
|
||||||
|
"invalid capability");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (amduatd_query_param(req->path, "peer",
|
||||||
|
peer_buf, sizeof(peer_buf)) == NULL ||
|
||||||
|
peer_buf[0] == '\0') {
|
||||||
|
return amduatd_send_json_error(fd, 400, "Bad Request",
|
||||||
|
"missing peer");
|
||||||
|
}
|
||||||
|
{
|
||||||
|
amduat_octets_t scoped = amduat_octets(NULL, 0u);
|
||||||
|
if (!amduatd_fed_cursor_pointer_name(req->effective_space,
|
||||||
|
peer_buf,
|
||||||
|
&scoped)) {
|
||||||
|
return amduatd_send_json_error(fd, 400, "Bad Request", "invalid peer");
|
||||||
|
}
|
||||||
|
amduat_octets_free(&scoped);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (amduatd_query_param(req->path, "limit",
|
||||||
|
limit_buf, sizeof(limit_buf)) != NULL) {
|
||||||
|
if (!amduatd_parse_u64_str(limit_buf, &limit) || limit == 0u ||
|
||||||
|
limit > 10000u) {
|
||||||
|
return amduatd_send_json_error(fd, 400, "Bad Request", "invalid limit");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (amduatd_query_param(req->path, "max_rounds",
|
||||||
|
rounds_buf, sizeof(rounds_buf)) != NULL) {
|
||||||
|
if (!amduatd_parse_u64_str(rounds_buf, &max_rounds) || max_rounds == 0u ||
|
||||||
|
max_rounds > 100u) {
|
||||||
|
return amduatd_send_json_error(fd, 400, "Bad Request",
|
||||||
|
"invalid max_rounds");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!amduat_asl_pointer_store_init(&pointer_store, root_path)) {
|
||||||
|
return amduatd_send_json_error(fd, 500, "Internal Server Error",
|
||||||
|
"pointer store error");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fed_cfg->transport_kind != AMDUATD_FED_TRANSPORT_UNIX ||
|
||||||
|
!fed_cfg->unix_socket_set) {
|
||||||
|
return amduatd_send_json_error(fd, 501, "Not Implemented",
|
||||||
|
"federation transport unavailable");
|
||||||
|
}
|
||||||
|
if (!amduat_fed_transport_unix_init(&transport, fed_cfg->unix_socket_path)) {
|
||||||
|
return amduatd_send_json_error(fd, 500, "Internal Server Error",
|
||||||
|
"transport init failed");
|
||||||
|
}
|
||||||
|
if (req->effective_space != NULL && req->effective_space->enabled &&
|
||||||
|
req->effective_space->space_id.data != NULL) {
|
||||||
|
(void)amduat_fed_transport_unix_set_space(
|
||||||
|
&transport, (const char *)req->effective_space->space_id.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
ops = amduat_fed_transport_unix_ops(&transport);
|
||||||
|
memset(&pull_transport, 0, sizeof(pull_transport));
|
||||||
|
pull_transport.ctx = &transport;
|
||||||
|
pull_transport.get_records = amduatd_fed_pull_unix_get_records;
|
||||||
|
pull_transport.free_records = ops.free_records;
|
||||||
|
pull_transport.get_artifact = amduatd_fed_pull_unix_get_artifact;
|
||||||
|
|
||||||
|
status = amduatd_fed_pull_until(store,
|
||||||
|
&pointer_store,
|
||||||
|
req->effective_space,
|
||||||
|
peer_buf,
|
||||||
|
limit,
|
||||||
|
max_rounds,
|
||||||
|
fed_cfg,
|
||||||
|
&pull_transport,
|
||||||
|
&report);
|
||||||
|
amduatd_fed_pull_apply_status_http(status, &http_status, &http_reason);
|
||||||
|
if (status != AMDUATD_FED_PULL_APPLY_OK) {
|
||||||
|
const char *msg = report.error[0] != '\0' ? report.error : "error";
|
||||||
|
amduatd_fed_until_report_free(&report);
|
||||||
|
return amduatd_send_json_error(fd, http_status, http_reason, msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (report.cursor_ref_set) {
|
||||||
|
(void)amduat_asl_ref_encode_hex(report.cursor_ref, &cursor_ref_hex);
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&b, 0, sizeof(b));
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "{")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"peer\":\"") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, peer_buf) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, "\",")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"effective_space\":{")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
if (req->effective_space != NULL && req->effective_space->enabled &&
|
||||||
|
req->effective_space->space_id.data != NULL) {
|
||||||
|
const char *space_id = (const char *)req->effective_space->space_id.data;
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"scoped\",") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, "\"space_id\":\"") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, space_id) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, "\"")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"unscoped\",") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, "\"space_id\":null")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "},")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"operation\":\"pull\",")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
char tmp[64];
|
||||||
|
snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)limit);
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"limit\":") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, tmp) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, ",")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)max_rounds);
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"max_rounds\":") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, tmp) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, ",")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
snprintf(tmp, sizeof(tmp), "%llu",
|
||||||
|
(unsigned long long)report.rounds_executed);
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"rounds_executed\":") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, tmp) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, ",")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"caught_up\":") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, report.caught_up ? "true" : "false") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, ",")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"applied\":{")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
char tmp[64];
|
||||||
|
snprintf(tmp, sizeof(tmp), "%zu", report.total_records);
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"records\":") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, tmp) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, ",")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
snprintf(tmp, sizeof(tmp), "%zu", report.total_artifacts);
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"artifacts\":") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, tmp) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, "},")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"cursor\":{")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"last_logseq\":")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
if (report.cursor_has_logseq) {
|
||||||
|
char tmp[64];
|
||||||
|
snprintf(tmp, sizeof(tmp), "%llu",
|
||||||
|
(unsigned long long)report.cursor_logseq);
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, tmp)) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "null")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, ",\"ref\":")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
if (cursor_ref_hex != NULL) {
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, cursor_ref_hex) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, "\"")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "null")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "}}\n")) {
|
||||||
|
goto fed_pull_until_cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
ok = amduatd_http_send_json(fd, 200, "OK", b.data, false);
|
||||||
|
|
||||||
|
fed_pull_until_cleanup:
|
||||||
|
amduatd_strbuf_free(&b);
|
||||||
|
free(cursor_ref_hex);
|
||||||
|
amduatd_fed_until_report_free(&report);
|
||||||
|
if (!ok) {
|
||||||
|
return amduatd_send_json_error(fd, 500, "Internal Server Error",
|
||||||
|
"encode error");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
static bool amduatd_handle_post_fed_push(int fd,
|
static bool amduatd_handle_post_fed_push(int fd,
|
||||||
amduat_asl_store_t *store,
|
amduat_asl_store_t *store,
|
||||||
const amduatd_fed_cfg_t *fed_cfg,
|
const amduatd_fed_cfg_t *fed_cfg,
|
||||||
|
|
@ -5675,6 +6017,272 @@ fed_push_cleanup:
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool amduatd_handle_post_fed_push_until(
|
||||||
|
int fd,
|
||||||
|
amduat_asl_store_t *store,
|
||||||
|
const amduatd_fed_cfg_t *fed_cfg,
|
||||||
|
const amduatd_caps_t *caps,
|
||||||
|
const amduatd_cfg_t *dcfg,
|
||||||
|
const amduatd_http_req_t *req,
|
||||||
|
const char *root_path) {
|
||||||
|
char peer_buf[AMDUAT_ASL_POINTER_NAME_MAX + 1u];
|
||||||
|
char limit_buf[32];
|
||||||
|
char rounds_buf[32];
|
||||||
|
uint64_t limit = 128u;
|
||||||
|
uint64_t max_rounds = 10u;
|
||||||
|
amduat_asl_pointer_store_t pointer_store;
|
||||||
|
amduat_fed_transport_unix_t transport;
|
||||||
|
amduatd_fed_push_transport_t push_transport;
|
||||||
|
amduatd_fed_until_report_t report;
|
||||||
|
amduatd_fed_push_apply_status_t status;
|
||||||
|
amduatd_strbuf_t b;
|
||||||
|
char *cursor_ref_hex = NULL;
|
||||||
|
bool ok = false;
|
||||||
|
int http_status = 200;
|
||||||
|
const char *http_reason = "OK";
|
||||||
|
|
||||||
|
if (store == NULL || fed_cfg == NULL || req == NULL || root_path == NULL ||
|
||||||
|
dcfg == NULL) {
|
||||||
|
return amduatd_send_json_error(fd, 500, "Internal Server Error",
|
||||||
|
"internal error");
|
||||||
|
}
|
||||||
|
if (!amduatd_fed_require_space(fd, fed_cfg, req)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
amduatd_fed_push_plan_status_t check =
|
||||||
|
amduatd_fed_push_plan_check(fed_cfg, store);
|
||||||
|
if (check == AMDUATD_FED_PUSH_PLAN_ERR_DISABLED) {
|
||||||
|
return amduatd_send_json_error(fd, 503, "Service Unavailable",
|
||||||
|
"federation disabled");
|
||||||
|
}
|
||||||
|
if (check == AMDUATD_FED_PUSH_PLAN_ERR_UNSUPPORTED) {
|
||||||
|
return amduatd_send_json_error(fd, 501, "Not Implemented",
|
||||||
|
"requires index backend");
|
||||||
|
}
|
||||||
|
if (check != AMDUATD_FED_PUSH_PLAN_OK) {
|
||||||
|
return amduatd_send_json_error(fd, 500, "Internal Server Error",
|
||||||
|
"plan unavailable");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (caps != NULL && caps->enabled && req->x_capability[0] != '\0') {
|
||||||
|
const char *reason = NULL;
|
||||||
|
if (!amduatd_caps_check_space(caps, dcfg, req, &reason)) {
|
||||||
|
if (reason != NULL && strcmp(reason, "wrong-space") == 0) {
|
||||||
|
return amduatd_send_json_error(fd, 403, "Forbidden",
|
||||||
|
"space not permitted by capability");
|
||||||
|
}
|
||||||
|
return amduatd_send_json_error(fd, 403, "Forbidden",
|
||||||
|
"invalid capability");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (amduatd_query_param(req->path, "peer",
|
||||||
|
peer_buf, sizeof(peer_buf)) == NULL ||
|
||||||
|
peer_buf[0] == '\0') {
|
||||||
|
return amduatd_send_json_error(fd, 400, "Bad Request",
|
||||||
|
"missing peer");
|
||||||
|
}
|
||||||
|
{
|
||||||
|
amduat_octets_t scoped = amduat_octets(NULL, 0u);
|
||||||
|
if (!amduatd_fed_push_cursor_pointer_name(req->effective_space,
|
||||||
|
peer_buf,
|
||||||
|
&scoped)) {
|
||||||
|
return amduatd_send_json_error(fd, 400, "Bad Request", "invalid peer");
|
||||||
|
}
|
||||||
|
amduat_octets_free(&scoped);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (amduatd_query_param(req->path, "limit",
|
||||||
|
limit_buf, sizeof(limit_buf)) != NULL) {
|
||||||
|
if (!amduatd_parse_u64_str(limit_buf, &limit) || limit == 0u ||
|
||||||
|
limit > 2048u) {
|
||||||
|
return amduatd_send_json_error(fd, 400, "Bad Request", "invalid limit");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (amduatd_query_param(req->path, "max_rounds",
|
||||||
|
rounds_buf, sizeof(rounds_buf)) != NULL) {
|
||||||
|
if (!amduatd_parse_u64_str(rounds_buf, &max_rounds) || max_rounds == 0u ||
|
||||||
|
max_rounds > 100u) {
|
||||||
|
return amduatd_send_json_error(fd, 400, "Bad Request",
|
||||||
|
"invalid max_rounds");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!amduat_asl_pointer_store_init(&pointer_store, root_path)) {
|
||||||
|
return amduatd_send_json_error(fd, 500, "Internal Server Error",
|
||||||
|
"pointer store error");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fed_cfg->transport_kind != AMDUATD_FED_TRANSPORT_UNIX ||
|
||||||
|
!fed_cfg->unix_socket_set) {
|
||||||
|
return amduatd_send_json_error(fd, 501, "Not Implemented",
|
||||||
|
"federation transport unavailable");
|
||||||
|
}
|
||||||
|
if (!amduat_fed_transport_unix_init(&transport, fed_cfg->unix_socket_path)) {
|
||||||
|
return amduatd_send_json_error(fd, 500, "Internal Server Error",
|
||||||
|
"transport init failed");
|
||||||
|
}
|
||||||
|
if (req->effective_space != NULL && req->effective_space->enabled &&
|
||||||
|
req->effective_space->space_id.data != NULL) {
|
||||||
|
(void)amduat_fed_transport_unix_set_space(
|
||||||
|
&transport, (const char *)req->effective_space->space_id.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&push_transport, 0, sizeof(push_transport));
|
||||||
|
push_transport.ctx = &transport;
|
||||||
|
push_transport.post_ingest = amduatd_fed_push_unix_post_ingest;
|
||||||
|
|
||||||
|
status = amduatd_fed_push_until(store,
|
||||||
|
&pointer_store,
|
||||||
|
req->effective_space,
|
||||||
|
peer_buf,
|
||||||
|
limit,
|
||||||
|
max_rounds,
|
||||||
|
root_path,
|
||||||
|
fed_cfg,
|
||||||
|
&push_transport,
|
||||||
|
&report);
|
||||||
|
amduatd_fed_push_apply_status_http(status, &http_status, &http_reason);
|
||||||
|
if (status != AMDUATD_FED_PUSH_APPLY_OK) {
|
||||||
|
const char *msg = report.error[0] != '\0' ? report.error : "error";
|
||||||
|
amduatd_fed_until_report_free(&report);
|
||||||
|
return amduatd_send_json_error(fd, http_status, http_reason, msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (report.cursor_ref_set) {
|
||||||
|
(void)amduat_asl_ref_encode_hex(report.cursor_ref, &cursor_ref_hex);
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&b, 0, sizeof(b));
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "{")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"peer\":\"") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, peer_buf) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, "\",")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"effective_space\":{")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
if (req->effective_space != NULL && req->effective_space->enabled &&
|
||||||
|
req->effective_space->space_id.data != NULL) {
|
||||||
|
const char *space_id = (const char *)req->effective_space->space_id.data;
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"scoped\",") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, "\"space_id\":\"") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, space_id) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, "\"")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"mode\":\"unscoped\",") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, "\"space_id\":null")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "},")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"operation\":\"push\",")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
char tmp[64];
|
||||||
|
snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)limit);
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"limit\":") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, tmp) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, ",")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
snprintf(tmp, sizeof(tmp), "%llu", (unsigned long long)max_rounds);
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"max_rounds\":") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, tmp) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, ",")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
snprintf(tmp, sizeof(tmp), "%llu",
|
||||||
|
(unsigned long long)report.rounds_executed);
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"rounds_executed\":") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, tmp) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, ",")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"caught_up\":") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, report.caught_up ? "true" : "false") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, ",")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"applied\":{")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
char tmp[64];
|
||||||
|
snprintf(tmp, sizeof(tmp), "%zu", report.total_records);
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"records\":") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, tmp) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, ",")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
snprintf(tmp, sizeof(tmp), "%zu", report.total_artifacts);
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"artifacts\":") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, tmp) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, "},")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"cursor\":{")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"last_logseq\":")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
if (report.cursor_has_logseq) {
|
||||||
|
char tmp[64];
|
||||||
|
snprintf(tmp, sizeof(tmp), "%llu",
|
||||||
|
(unsigned long long)report.cursor_logseq);
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, tmp)) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "null")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, ",\"ref\":")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
if (cursor_ref_hex != NULL) {
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "\"") ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, cursor_ref_hex) ||
|
||||||
|
!amduatd_strbuf_append_cstr(&b, "\"")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "null")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!amduatd_strbuf_append_cstr(&b, "}}\n")) {
|
||||||
|
goto fed_push_until_cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
ok = amduatd_http_send_json(fd, 200, "OK", b.data, false);
|
||||||
|
|
||||||
|
fed_push_until_cleanup:
|
||||||
|
amduatd_strbuf_free(&b);
|
||||||
|
free(cursor_ref_hex);
|
||||||
|
amduatd_fed_until_report_free(&report);
|
||||||
|
if (!ok) {
|
||||||
|
return amduatd_send_json_error(fd, 500, "Internal Server Error",
|
||||||
|
"encode error");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
static bool amduatd_handle_post_pel_programs(int fd,
|
static bool amduatd_handle_post_pel_programs(int fd,
|
||||||
amduat_asl_store_t *store,
|
amduat_asl_store_t *store,
|
||||||
const amduatd_http_req_t *req) {
|
const amduatd_http_req_t *req) {
|
||||||
|
|
@ -7077,6 +7685,17 @@ static bool amduatd_handle_conn(int fd,
|
||||||
root_path);
|
root_path);
|
||||||
goto conn_cleanup;
|
goto conn_cleanup;
|
||||||
}
|
}
|
||||||
|
if (strcmp(req.method, "POST") == 0 &&
|
||||||
|
strcmp(no_query, "/v1/fed/pull/until") == 0) {
|
||||||
|
ok = amduatd_handle_post_fed_pull_until(fd,
|
||||||
|
store,
|
||||||
|
fed_cfg,
|
||||||
|
caps,
|
||||||
|
effective_cfg,
|
||||||
|
&req,
|
||||||
|
root_path);
|
||||||
|
goto conn_cleanup;
|
||||||
|
}
|
||||||
if (strcmp(req.method, "POST") == 0 &&
|
if (strcmp(req.method, "POST") == 0 &&
|
||||||
strcmp(no_query, "/v1/fed/push") == 0) {
|
strcmp(no_query, "/v1/fed/push") == 0) {
|
||||||
ok = amduatd_handle_post_fed_push(fd,
|
ok = amduatd_handle_post_fed_push(fd,
|
||||||
|
|
@ -7088,6 +7707,17 @@ static bool amduatd_handle_conn(int fd,
|
||||||
root_path);
|
root_path);
|
||||||
goto conn_cleanup;
|
goto conn_cleanup;
|
||||||
}
|
}
|
||||||
|
if (strcmp(req.method, "POST") == 0 &&
|
||||||
|
strcmp(no_query, "/v1/fed/push/until") == 0) {
|
||||||
|
ok = amduatd_handle_post_fed_push_until(fd,
|
||||||
|
store,
|
||||||
|
fed_cfg,
|
||||||
|
caps,
|
||||||
|
effective_cfg,
|
||||||
|
&req,
|
||||||
|
root_path);
|
||||||
|
goto conn_cleanup;
|
||||||
|
}
|
||||||
if (strcmp(req.method, "POST") == 0 &&
|
if (strcmp(req.method, "POST") == 0 &&
|
||||||
strcmp(no_query, "/v1/fed/ingest") == 0) {
|
strcmp(no_query, "/v1/fed/ingest") == 0) {
|
||||||
ok = amduatd_handle_post_fed_ingest(fd,
|
ok = amduatd_handle_post_fed_ingest(fd,
|
||||||
|
|
|
||||||
236
src/amduatd_fed_until.c
Normal file
236
src/amduatd_fed_until.c
Normal file
|
|
@ -0,0 +1,236 @@
|
||||||
|
#include "amduatd_fed_until.h"
|
||||||
|
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
|
static void amduatd_fed_until_report_clear_cursor(
|
||||||
|
amduatd_fed_until_report_t *report) {
|
||||||
|
if (report == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (report->cursor_ref_set) {
|
||||||
|
amduat_reference_free(&report->cursor_ref);
|
||||||
|
report->cursor_ref_set = false;
|
||||||
|
}
|
||||||
|
report->cursor_has_logseq = false;
|
||||||
|
report->cursor_logseq = 0u;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void amduatd_fed_until_report_set_cursor(
|
||||||
|
amduatd_fed_until_report_t *report,
|
||||||
|
bool has_logseq,
|
||||||
|
uint64_t logseq,
|
||||||
|
bool has_ref,
|
||||||
|
amduat_reference_t ref) {
|
||||||
|
if (report == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
amduatd_fed_until_report_clear_cursor(report);
|
||||||
|
if (has_logseq) {
|
||||||
|
report->cursor_has_logseq = true;
|
||||||
|
report->cursor_logseq = logseq;
|
||||||
|
}
|
||||||
|
if (has_ref) {
|
||||||
|
if (amduat_reference_clone(ref, &report->cursor_ref)) {
|
||||||
|
report->cursor_ref_set = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void amduatd_fed_until_report_error(amduatd_fed_until_report_t *report,
|
||||||
|
const char *msg,
|
||||||
|
int remote_status) {
|
||||||
|
if (report == NULL || msg == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
memset(report->error, 0, sizeof(report->error));
|
||||||
|
strncpy(report->error, msg, sizeof(report->error) - 1u);
|
||||||
|
report->remote_status = remote_status;
|
||||||
|
}
|
||||||
|
|
||||||
|
void amduatd_fed_until_report_init(amduatd_fed_until_report_t *report) {
|
||||||
|
if (report == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
memset(report, 0, sizeof(*report));
|
||||||
|
report->cursor_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
|
||||||
|
}
|
||||||
|
|
||||||
|
void amduatd_fed_until_report_free(amduatd_fed_until_report_t *report) {
|
||||||
|
if (report == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (report->cursor_ref_set) {
|
||||||
|
amduat_reference_free(&report->cursor_ref);
|
||||||
|
}
|
||||||
|
memset(report, 0, sizeof(*report));
|
||||||
|
}
|
||||||
|
|
||||||
|
amduatd_fed_pull_apply_status_t amduatd_fed_pull_until(
|
||||||
|
amduat_asl_store_t *store,
|
||||||
|
amduat_asl_pointer_store_t *pointer_store,
|
||||||
|
const amduatd_space_t *effective_space,
|
||||||
|
const char *peer_key,
|
||||||
|
uint64_t limit,
|
||||||
|
uint64_t max_rounds,
|
||||||
|
const amduatd_fed_cfg_t *fed_cfg,
|
||||||
|
const amduatd_fed_pull_transport_t *transport,
|
||||||
|
amduatd_fed_until_report_t *out_report) {
|
||||||
|
amduatd_fed_pull_apply_status_t status = AMDUATD_FED_PULL_APPLY_OK;
|
||||||
|
amduatd_fed_pull_apply_report_t round_report;
|
||||||
|
|
||||||
|
if (out_report == NULL) {
|
||||||
|
return AMDUATD_FED_PULL_APPLY_ERR_INVALID;
|
||||||
|
}
|
||||||
|
amduatd_fed_until_report_init(out_report);
|
||||||
|
out_report->peer_key = peer_key;
|
||||||
|
out_report->effective_space = effective_space;
|
||||||
|
out_report->limit = limit;
|
||||||
|
out_report->max_rounds = max_rounds;
|
||||||
|
|
||||||
|
if (store == NULL || pointer_store == NULL || peer_key == NULL ||
|
||||||
|
fed_cfg == NULL || transport == NULL || max_rounds == 0u) {
|
||||||
|
amduatd_fed_until_report_error(out_report, "invalid inputs", 0);
|
||||||
|
return AMDUATD_FED_PULL_APPLY_ERR_INVALID;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (uint64_t round = 0u; round < max_rounds; ++round) {
|
||||||
|
amduatd_fed_pull_apply_report_init(&round_report);
|
||||||
|
status = amduatd_fed_pull_apply(store,
|
||||||
|
pointer_store,
|
||||||
|
effective_space,
|
||||||
|
peer_key,
|
||||||
|
limit,
|
||||||
|
fed_cfg,
|
||||||
|
transport,
|
||||||
|
&round_report);
|
||||||
|
out_report->rounds_executed++;
|
||||||
|
|
||||||
|
if (status != AMDUATD_FED_PULL_APPLY_OK) {
|
||||||
|
amduatd_fed_until_report_error(out_report,
|
||||||
|
round_report.error[0] != '\0'
|
||||||
|
? round_report.error
|
||||||
|
: "error",
|
||||||
|
round_report.remote_status);
|
||||||
|
amduatd_fed_pull_apply_report_free(&round_report);
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
out_report->total_records += round_report.applied_record_count;
|
||||||
|
out_report->total_artifacts += round_report.applied_artifact_count;
|
||||||
|
if (round_report.cursor_advanced) {
|
||||||
|
amduatd_fed_until_report_set_cursor(
|
||||||
|
out_report,
|
||||||
|
round_report.cursor_after_has_logseq,
|
||||||
|
round_report.cursor_after_logseq,
|
||||||
|
round_report.cursor_after_ref_set,
|
||||||
|
round_report.cursor_after_ref);
|
||||||
|
} else if (round_report.cursor_present) {
|
||||||
|
amduatd_fed_until_report_set_cursor(
|
||||||
|
out_report,
|
||||||
|
round_report.cursor_has_logseq,
|
||||||
|
round_report.cursor_logseq,
|
||||||
|
round_report.cursor_ref_set,
|
||||||
|
round_report.cursor_ref);
|
||||||
|
} else {
|
||||||
|
amduatd_fed_until_report_clear_cursor(out_report);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (round_report.plan_record_count == 0u) {
|
||||||
|
out_report->caught_up = true;
|
||||||
|
amduatd_fed_pull_apply_report_free(&round_report);
|
||||||
|
return AMDUATD_FED_PULL_APPLY_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
amduatd_fed_pull_apply_report_free(&round_report);
|
||||||
|
}
|
||||||
|
|
||||||
|
out_report->caught_up = false;
|
||||||
|
return AMDUATD_FED_PULL_APPLY_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
amduatd_fed_push_apply_status_t amduatd_fed_push_until(
|
||||||
|
amduat_asl_store_t *store,
|
||||||
|
amduat_asl_pointer_store_t *pointer_store,
|
||||||
|
const amduatd_space_t *effective_space,
|
||||||
|
const char *peer_key,
|
||||||
|
uint64_t limit,
|
||||||
|
uint64_t max_rounds,
|
||||||
|
const char *root_path,
|
||||||
|
const amduatd_fed_cfg_t *fed_cfg,
|
||||||
|
const amduatd_fed_push_transport_t *transport,
|
||||||
|
amduatd_fed_until_report_t *out_report) {
|
||||||
|
amduatd_fed_push_apply_status_t status = AMDUATD_FED_PUSH_APPLY_OK;
|
||||||
|
amduatd_fed_push_apply_report_t round_report;
|
||||||
|
|
||||||
|
if (out_report == NULL) {
|
||||||
|
return AMDUATD_FED_PUSH_APPLY_ERR_INVALID;
|
||||||
|
}
|
||||||
|
amduatd_fed_until_report_init(out_report);
|
||||||
|
out_report->peer_key = peer_key;
|
||||||
|
out_report->effective_space = effective_space;
|
||||||
|
out_report->limit = limit;
|
||||||
|
out_report->max_rounds = max_rounds;
|
||||||
|
|
||||||
|
if (store == NULL || pointer_store == NULL || peer_key == NULL ||
|
||||||
|
root_path == NULL || fed_cfg == NULL || transport == NULL ||
|
||||||
|
max_rounds == 0u) {
|
||||||
|
amduatd_fed_until_report_error(out_report, "invalid inputs", 0);
|
||||||
|
return AMDUATD_FED_PUSH_APPLY_ERR_INVALID;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (uint64_t round = 0u; round < max_rounds; ++round) {
|
||||||
|
amduatd_fed_push_apply_report_init(&round_report);
|
||||||
|
status = amduatd_fed_push_apply(store,
|
||||||
|
pointer_store,
|
||||||
|
effective_space,
|
||||||
|
peer_key,
|
||||||
|
limit,
|
||||||
|
root_path,
|
||||||
|
fed_cfg,
|
||||||
|
transport,
|
||||||
|
&round_report);
|
||||||
|
out_report->rounds_executed++;
|
||||||
|
|
||||||
|
if (status != AMDUATD_FED_PUSH_APPLY_OK) {
|
||||||
|
amduatd_fed_until_report_error(out_report,
|
||||||
|
round_report.error[0] != '\0'
|
||||||
|
? round_report.error
|
||||||
|
: "error",
|
||||||
|
round_report.remote_status);
|
||||||
|
amduatd_fed_push_apply_report_free(&round_report);
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
out_report->total_records += round_report.sent_record_count;
|
||||||
|
out_report->total_artifacts += round_report.sent_artifact_count;
|
||||||
|
if (round_report.cursor_advanced) {
|
||||||
|
amduatd_fed_until_report_set_cursor(
|
||||||
|
out_report,
|
||||||
|
round_report.cursor_after_has_logseq,
|
||||||
|
round_report.cursor_after_logseq,
|
||||||
|
round_report.cursor_after_ref_set,
|
||||||
|
round_report.cursor_after_ref);
|
||||||
|
} else if (round_report.cursor_present) {
|
||||||
|
amduatd_fed_until_report_set_cursor(
|
||||||
|
out_report,
|
||||||
|
round_report.cursor_has_logseq,
|
||||||
|
round_report.cursor_logseq,
|
||||||
|
round_report.cursor_ref_set,
|
||||||
|
round_report.cursor_ref);
|
||||||
|
} else {
|
||||||
|
amduatd_fed_until_report_clear_cursor(out_report);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (round_report.plan_record_count == 0u) {
|
||||||
|
out_report->caught_up = true;
|
||||||
|
amduatd_fed_push_apply_report_free(&round_report);
|
||||||
|
return AMDUATD_FED_PUSH_APPLY_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
amduatd_fed_push_apply_report_free(&round_report);
|
||||||
|
}
|
||||||
|
|
||||||
|
out_report->caught_up = false;
|
||||||
|
return AMDUATD_FED_PUSH_APPLY_OK;
|
||||||
|
}
|
||||||
63
src/amduatd_fed_until.h
Normal file
63
src/amduatd_fed_until.h
Normal file
|
|
@ -0,0 +1,63 @@
|
||||||
|
#ifndef AMDUATD_FED_UNTIL_H
|
||||||
|
#define AMDUATD_FED_UNTIL_H
|
||||||
|
|
||||||
|
#include "amduatd_fed_pull_apply.h"
|
||||||
|
#include "amduatd_fed_push_apply.h"
|
||||||
|
|
||||||
|
#include <stdbool.h>
|
||||||
|
#include <stddef.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
const char *peer_key;
|
||||||
|
const amduatd_space_t *effective_space;
|
||||||
|
uint64_t limit;
|
||||||
|
uint64_t max_rounds;
|
||||||
|
uint64_t rounds_executed;
|
||||||
|
bool caught_up;
|
||||||
|
size_t total_records;
|
||||||
|
size_t total_artifacts;
|
||||||
|
bool cursor_has_logseq;
|
||||||
|
uint64_t cursor_logseq;
|
||||||
|
bool cursor_ref_set;
|
||||||
|
amduat_reference_t cursor_ref;
|
||||||
|
int remote_status;
|
||||||
|
char error[256];
|
||||||
|
} amduatd_fed_until_report_t;
|
||||||
|
|
||||||
|
void amduatd_fed_until_report_init(amduatd_fed_until_report_t *report);
|
||||||
|
|
||||||
|
void amduatd_fed_until_report_free(amduatd_fed_until_report_t *report);
|
||||||
|
|
||||||
|
amduatd_fed_pull_apply_status_t amduatd_fed_pull_until(
|
||||||
|
amduat_asl_store_t *store,
|
||||||
|
amduat_asl_pointer_store_t *pointer_store,
|
||||||
|
const amduatd_space_t *effective_space,
|
||||||
|
const char *peer_key,
|
||||||
|
uint64_t limit,
|
||||||
|
uint64_t max_rounds,
|
||||||
|
const amduatd_fed_cfg_t *fed_cfg,
|
||||||
|
const amduatd_fed_pull_transport_t *transport,
|
||||||
|
amduatd_fed_until_report_t *out_report);
|
||||||
|
|
||||||
|
amduatd_fed_push_apply_status_t amduatd_fed_push_until(
|
||||||
|
amduat_asl_store_t *store,
|
||||||
|
amduat_asl_pointer_store_t *pointer_store,
|
||||||
|
const amduatd_space_t *effective_space,
|
||||||
|
const char *peer_key,
|
||||||
|
uint64_t limit,
|
||||||
|
uint64_t max_rounds,
|
||||||
|
const char *root_path,
|
||||||
|
const amduatd_fed_cfg_t *fed_cfg,
|
||||||
|
const amduatd_fed_push_transport_t *transport,
|
||||||
|
amduatd_fed_until_report_t *out_report);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
} /* extern "C" */
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* AMDUATD_FED_UNTIL_H */
|
||||||
512
tests/test_amduatd_fed_pull_until.c
Normal file
512
tests/test_amduatd_fed_pull_until.c
Normal file
|
|
@ -0,0 +1,512 @@
|
||||||
|
#ifndef _POSIX_C_SOURCE
|
||||||
|
#define _POSIX_C_SOURCE 200809L
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "amduatd_fed_until.h"
|
||||||
|
#include "amduatd_fed_cursor.h"
|
||||||
|
#include "amduatd_store.h"
|
||||||
|
|
||||||
|
#include "amduat/asl/artifact_io.h"
|
||||||
|
#include "amduat/asl/core.h"
|
||||||
|
#include "amduat/asl/asl_store_fs_meta.h"
|
||||||
|
#include "amduat/asl/ref_derive.h"
|
||||||
|
|
||||||
|
#include <stdbool.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
amduat_fed_record_t *records;
|
||||||
|
size_t record_count;
|
||||||
|
amduat_octets_t *artifact_bytes;
|
||||||
|
} amduatd_test_pull_round_t;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
amduatd_test_pull_round_t *rounds;
|
||||||
|
size_t round_count;
|
||||||
|
size_t call_index;
|
||||||
|
size_t fail_round;
|
||||||
|
int fail_status;
|
||||||
|
} amduatd_test_pull_transport_t;
|
||||||
|
|
||||||
|
static int failures = 0;
|
||||||
|
|
||||||
|
static void expect(bool cond, const char *msg) {
|
||||||
|
if (!cond) {
|
||||||
|
fprintf(stderr, "FAIL: %s\n", msg);
|
||||||
|
failures++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static char *amduatd_test_make_temp_dir(void) {
|
||||||
|
char tmpl[] = "/tmp/amduatd-fed-pull-until-XXXXXX";
|
||||||
|
char *dir = mkdtemp(tmpl);
|
||||||
|
size_t len;
|
||||||
|
char *copy;
|
||||||
|
if (dir == NULL) {
|
||||||
|
perror("mkdtemp");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
len = strlen(dir);
|
||||||
|
copy = (char *)malloc(len + 1u);
|
||||||
|
if (copy == NULL) {
|
||||||
|
fprintf(stderr, "failed to allocate temp dir copy\n");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
memcpy(copy, dir, len + 1u);
|
||||||
|
return copy;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool amduatd_test_make_record(amduat_asl_store_t *store,
|
||||||
|
const char *payload,
|
||||||
|
uint64_t logseq,
|
||||||
|
amduat_fed_record_t *out_record,
|
||||||
|
amduat_octets_t *out_bytes) {
|
||||||
|
amduat_artifact_t artifact;
|
||||||
|
amduat_reference_t ref;
|
||||||
|
amduat_octets_t artifact_bytes = amduat_octets(NULL, 0u);
|
||||||
|
amduat_octets_t payload_bytes = amduat_octets(NULL, 0u);
|
||||||
|
|
||||||
|
if (store == NULL || payload == NULL || out_record == NULL ||
|
||||||
|
out_bytes == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!amduat_octets_clone(amduat_octets(payload, strlen(payload)),
|
||||||
|
&payload_bytes)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_artifact_from_bytes(payload_bytes,
|
||||||
|
AMDUAT_ASL_IO_RAW,
|
||||||
|
false,
|
||||||
|
amduat_type_tag(0u),
|
||||||
|
&artifact)) {
|
||||||
|
amduat_octets_free(&payload_bytes);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_ref_derive(artifact,
|
||||||
|
store->config.encoding_profile_id,
|
||||||
|
store->config.hash_id,
|
||||||
|
&ref,
|
||||||
|
&artifact_bytes)) {
|
||||||
|
amduat_asl_artifact_free(&artifact);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
amduat_asl_artifact_free(&artifact);
|
||||||
|
|
||||||
|
memset(out_record, 0, sizeof(*out_record));
|
||||||
|
out_record->logseq = logseq;
|
||||||
|
out_record->id.type = AMDUAT_FED_REC_ARTIFACT;
|
||||||
|
out_record->id.ref = ref;
|
||||||
|
|
||||||
|
*out_bytes = artifact_bytes;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool amduatd_test_pull_get_records(void *ctx,
|
||||||
|
uint32_t domain_id,
|
||||||
|
uint64_t from_logseq,
|
||||||
|
uint64_t limit,
|
||||||
|
int *out_status,
|
||||||
|
amduat_fed_record_t **out_records,
|
||||||
|
size_t *out_len,
|
||||||
|
char **out_body) {
|
||||||
|
amduatd_test_pull_transport_t *t = (amduatd_test_pull_transport_t *)ctx;
|
||||||
|
(void)domain_id;
|
||||||
|
(void)from_logseq;
|
||||||
|
(void)limit;
|
||||||
|
if (out_status == NULL || out_records == NULL || out_len == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
*out_status = 200;
|
||||||
|
*out_records = NULL;
|
||||||
|
*out_len = 0;
|
||||||
|
if (out_body != NULL) {
|
||||||
|
*out_body = NULL;
|
||||||
|
}
|
||||||
|
if (t == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (t->fail_round != 0u && t->call_index + 1u == t->fail_round) {
|
||||||
|
*out_status = t->fail_status;
|
||||||
|
if (out_body != NULL) {
|
||||||
|
*out_body = strdup("fail");
|
||||||
|
}
|
||||||
|
t->call_index++;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (t->call_index >= t->round_count) {
|
||||||
|
t->call_index++;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
*out_records = t->rounds[t->call_index].records;
|
||||||
|
*out_len = t->rounds[t->call_index].record_count;
|
||||||
|
t->call_index++;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void amduatd_test_pull_free_records(void *ctx,
|
||||||
|
amduat_fed_record_t *records,
|
||||||
|
size_t len) {
|
||||||
|
(void)ctx;
|
||||||
|
(void)records;
|
||||||
|
(void)len;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool amduatd_test_pull_get_artifact(void *ctx,
|
||||||
|
amduat_reference_t ref,
|
||||||
|
int *out_status,
|
||||||
|
amduat_octets_t *out_bytes,
|
||||||
|
char **out_body) {
|
||||||
|
amduatd_test_pull_transport_t *t = (amduatd_test_pull_transport_t *)ctx;
|
||||||
|
size_t i;
|
||||||
|
if (out_status == NULL || out_bytes == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
*out_status = 404;
|
||||||
|
*out_bytes = amduat_octets(NULL, 0u);
|
||||||
|
if (out_body != NULL) {
|
||||||
|
*out_body = NULL;
|
||||||
|
}
|
||||||
|
if (t == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
for (i = 0; i < t->round_count; ++i) {
|
||||||
|
size_t j;
|
||||||
|
for (j = 0; j < t->rounds[i].record_count; ++j) {
|
||||||
|
if (!amduat_reference_eq(ref, t->rounds[i].records[j].id.ref)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!amduat_octets_clone(t->rounds[i].artifact_bytes[j], out_bytes)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
*out_status = 200;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void amduatd_test_pull_round_free(amduatd_test_pull_round_t *round) {
|
||||||
|
size_t i;
|
||||||
|
if (round == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (i = 0; i < round->record_count; ++i) {
|
||||||
|
amduat_reference_free(&round->records[i].id.ref);
|
||||||
|
amduat_octets_free(&round->artifact_bytes[i]);
|
||||||
|
}
|
||||||
|
free(round->records);
|
||||||
|
free(round->artifact_bytes);
|
||||||
|
round->records = NULL;
|
||||||
|
round->artifact_bytes = NULL;
|
||||||
|
round->record_count = 0u;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool amduatd_test_pull_round_init(amduat_asl_store_t *store,
|
||||||
|
amduatd_test_pull_round_t *round,
|
||||||
|
const char **payloads,
|
||||||
|
size_t payloads_len,
|
||||||
|
uint64_t base_logseq) {
|
||||||
|
size_t i;
|
||||||
|
if (store == NULL || round == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
memset(round, 0, sizeof(*round));
|
||||||
|
if (payloads_len == 0u) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
round->records = (amduat_fed_record_t *)calloc(payloads_len,
|
||||||
|
sizeof(*round->records));
|
||||||
|
round->artifact_bytes = (amduat_octets_t *)calloc(payloads_len,
|
||||||
|
sizeof(*round->artifact_bytes));
|
||||||
|
if (round->records == NULL || round->artifact_bytes == NULL) {
|
||||||
|
amduatd_test_pull_round_free(round);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
for (i = 0; i < payloads_len; ++i) {
|
||||||
|
if (!amduatd_test_make_record(store,
|
||||||
|
payloads[i],
|
||||||
|
base_logseq + i,
|
||||||
|
&round->records[i],
|
||||||
|
&round->artifact_bytes[i])) {
|
||||||
|
amduatd_test_pull_round_free(round);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
round->record_count = payloads_len;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int amduatd_test_pull_until_zero(void) {
|
||||||
|
char *root = amduatd_test_make_temp_dir();
|
||||||
|
amduat_asl_store_fs_config_t cfg;
|
||||||
|
amduatd_store_ctx_t store_ctx;
|
||||||
|
amduat_asl_store_t store;
|
||||||
|
amduat_asl_pointer_store_t pointer_store;
|
||||||
|
amduatd_space_t space;
|
||||||
|
amduatd_fed_cfg_t fed_cfg;
|
||||||
|
amduatd_test_pull_transport_t stub;
|
||||||
|
amduatd_fed_pull_transport_t transport;
|
||||||
|
amduatd_fed_until_report_t report;
|
||||||
|
amduatd_fed_pull_apply_status_t status;
|
||||||
|
|
||||||
|
if (root == NULL) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&cfg, 0, sizeof(cfg));
|
||||||
|
if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) {
|
||||||
|
fprintf(stderr, "failed to init store root\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
memset(&store_ctx, 0, sizeof(store_ctx));
|
||||||
|
memset(&store, 0, sizeof(store));
|
||||||
|
if (!amduatd_store_init(&store,
|
||||||
|
&cfg,
|
||||||
|
&store_ctx,
|
||||||
|
root,
|
||||||
|
AMDUATD_STORE_BACKEND_INDEX)) {
|
||||||
|
fprintf(stderr, "failed to init store\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_pointer_store_init(&pointer_store, root)) {
|
||||||
|
fprintf(stderr, "failed to init pointer store\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!amduatd_space_init(&space, "demo", false)) {
|
||||||
|
fprintf(stderr, "failed to init space\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
amduatd_fed_cfg_init(&fed_cfg);
|
||||||
|
fed_cfg.enabled = true;
|
||||||
|
|
||||||
|
memset(&stub, 0, sizeof(stub));
|
||||||
|
memset(&transport, 0, sizeof(transport));
|
||||||
|
transport.ctx = &stub;
|
||||||
|
transport.get_records = amduatd_test_pull_get_records;
|
||||||
|
transport.free_records = amduatd_test_pull_free_records;
|
||||||
|
transport.get_artifact = amduatd_test_pull_get_artifact;
|
||||||
|
|
||||||
|
status = amduatd_fed_pull_until(&store,
|
||||||
|
&pointer_store,
|
||||||
|
&space,
|
||||||
|
"1",
|
||||||
|
16u,
|
||||||
|
3u,
|
||||||
|
&fed_cfg,
|
||||||
|
&transport,
|
||||||
|
&report);
|
||||||
|
expect(status == AMDUATD_FED_PULL_APPLY_OK, "pull until zero ok");
|
||||||
|
expect(report.caught_up, "pull until caught up");
|
||||||
|
expect(report.rounds_executed == 1u, "pull until rounds executed");
|
||||||
|
expect(report.total_records == 0u, "pull until records");
|
||||||
|
expect(report.total_artifacts == 0u, "pull until artifacts");
|
||||||
|
|
||||||
|
amduatd_fed_until_report_free(&report);
|
||||||
|
free(root);
|
||||||
|
return failures == 0 ? 0 : 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int amduatd_test_pull_until_multi(void) {
|
||||||
|
char *root = amduatd_test_make_temp_dir();
|
||||||
|
amduat_asl_store_fs_config_t cfg;
|
||||||
|
amduatd_store_ctx_t store_ctx;
|
||||||
|
amduat_asl_store_t store;
|
||||||
|
amduat_asl_pointer_store_t pointer_store;
|
||||||
|
amduatd_space_t space;
|
||||||
|
amduatd_fed_cfg_t fed_cfg;
|
||||||
|
amduatd_test_pull_round_t rounds[2];
|
||||||
|
amduatd_test_pull_transport_t stub;
|
||||||
|
amduatd_fed_pull_transport_t transport;
|
||||||
|
amduatd_fed_until_report_t report;
|
||||||
|
amduatd_fed_pull_apply_status_t status;
|
||||||
|
const char *round0_payloads[] = {"a", "b"};
|
||||||
|
const char *round1_payloads[] = {"c"};
|
||||||
|
|
||||||
|
if (root == NULL) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&cfg, 0, sizeof(cfg));
|
||||||
|
if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) {
|
||||||
|
fprintf(stderr, "failed to init store root\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
memset(&store_ctx, 0, sizeof(store_ctx));
|
||||||
|
memset(&store, 0, sizeof(store));
|
||||||
|
if (!amduatd_store_init(&store,
|
||||||
|
&cfg,
|
||||||
|
&store_ctx,
|
||||||
|
root,
|
||||||
|
AMDUATD_STORE_BACKEND_INDEX)) {
|
||||||
|
fprintf(stderr, "failed to init store\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_pointer_store_init(&pointer_store, root)) {
|
||||||
|
fprintf(stderr, "failed to init pointer store\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!amduatd_space_init(&space, "demo", false)) {
|
||||||
|
fprintf(stderr, "failed to init space\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
amduatd_fed_cfg_init(&fed_cfg);
|
||||||
|
fed_cfg.enabled = true;
|
||||||
|
|
||||||
|
if (!amduatd_test_pull_round_init(&store,
|
||||||
|
&rounds[0],
|
||||||
|
round0_payloads,
|
||||||
|
2u,
|
||||||
|
0u) ||
|
||||||
|
!amduatd_test_pull_round_init(&store,
|
||||||
|
&rounds[1],
|
||||||
|
round1_payloads,
|
||||||
|
1u,
|
||||||
|
2u)) {
|
||||||
|
fprintf(stderr, "failed to init rounds\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&stub, 0, sizeof(stub));
|
||||||
|
stub.rounds = rounds;
|
||||||
|
stub.round_count = 2u;
|
||||||
|
memset(&transport, 0, sizeof(transport));
|
||||||
|
transport.ctx = &stub;
|
||||||
|
transport.get_records = amduatd_test_pull_get_records;
|
||||||
|
transport.free_records = amduatd_test_pull_free_records;
|
||||||
|
transport.get_artifact = amduatd_test_pull_get_artifact;
|
||||||
|
|
||||||
|
status = amduatd_fed_pull_until(&store,
|
||||||
|
&pointer_store,
|
||||||
|
&space,
|
||||||
|
"1",
|
||||||
|
2u,
|
||||||
|
5u,
|
||||||
|
&fed_cfg,
|
||||||
|
&transport,
|
||||||
|
&report);
|
||||||
|
expect(status == AMDUATD_FED_PULL_APPLY_OK, "pull until multi ok");
|
||||||
|
expect(report.caught_up, "pull until multi caught up");
|
||||||
|
expect(report.rounds_executed == 3u, "pull until multi rounds");
|
||||||
|
expect(report.total_records == 3u, "pull until multi records");
|
||||||
|
expect(report.total_artifacts == 3u, "pull until multi artifacts");
|
||||||
|
|
||||||
|
amduatd_fed_until_report_free(&report);
|
||||||
|
amduatd_test_pull_round_free(&rounds[0]);
|
||||||
|
amduatd_test_pull_round_free(&rounds[1]);
|
||||||
|
free(root);
|
||||||
|
return failures == 0 ? 0 : 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int amduatd_test_pull_until_error(void) {
|
||||||
|
char *root = amduatd_test_make_temp_dir();
|
||||||
|
amduat_asl_store_fs_config_t cfg;
|
||||||
|
amduatd_store_ctx_t store_ctx;
|
||||||
|
amduat_asl_store_t store;
|
||||||
|
amduat_asl_pointer_store_t pointer_store;
|
||||||
|
amduatd_space_t space;
|
||||||
|
amduatd_fed_cfg_t fed_cfg;
|
||||||
|
amduatd_test_pull_round_t rounds[1];
|
||||||
|
amduatd_test_pull_transport_t stub;
|
||||||
|
amduatd_fed_pull_transport_t transport;
|
||||||
|
amduatd_fed_until_report_t report;
|
||||||
|
amduatd_fed_pull_apply_status_t status;
|
||||||
|
const char *round0_payloads[] = {"a"};
|
||||||
|
|
||||||
|
if (root == NULL) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&cfg, 0, sizeof(cfg));
|
||||||
|
if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) {
|
||||||
|
fprintf(stderr, "failed to init store root\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
memset(&store_ctx, 0, sizeof(store_ctx));
|
||||||
|
memset(&store, 0, sizeof(store));
|
||||||
|
if (!amduatd_store_init(&store,
|
||||||
|
&cfg,
|
||||||
|
&store_ctx,
|
||||||
|
root,
|
||||||
|
AMDUATD_STORE_BACKEND_INDEX)) {
|
||||||
|
fprintf(stderr, "failed to init store\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_pointer_store_init(&pointer_store, root)) {
|
||||||
|
fprintf(stderr, "failed to init pointer store\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!amduatd_space_init(&space, "demo", false)) {
|
||||||
|
fprintf(stderr, "failed to init space\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
amduatd_fed_cfg_init(&fed_cfg);
|
||||||
|
fed_cfg.enabled = true;
|
||||||
|
|
||||||
|
if (!amduatd_test_pull_round_init(&store,
|
||||||
|
&rounds[0],
|
||||||
|
round0_payloads,
|
||||||
|
1u,
|
||||||
|
0u)) {
|
||||||
|
fprintf(stderr, "failed to init rounds\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&stub, 0, sizeof(stub));
|
||||||
|
stub.rounds = rounds;
|
||||||
|
stub.round_count = 1u;
|
||||||
|
stub.fail_round = 2u;
|
||||||
|
stub.fail_status = 500;
|
||||||
|
memset(&transport, 0, sizeof(transport));
|
||||||
|
transport.ctx = &stub;
|
||||||
|
transport.get_records = amduatd_test_pull_get_records;
|
||||||
|
transport.free_records = amduatd_test_pull_free_records;
|
||||||
|
transport.get_artifact = amduatd_test_pull_get_artifact;
|
||||||
|
|
||||||
|
status = amduatd_fed_pull_until(&store,
|
||||||
|
&pointer_store,
|
||||||
|
&space,
|
||||||
|
"1",
|
||||||
|
1u,
|
||||||
|
4u,
|
||||||
|
&fed_cfg,
|
||||||
|
&transport,
|
||||||
|
&report);
|
||||||
|
expect(status == AMDUATD_FED_PULL_APPLY_ERR_REMOTE, "pull until error");
|
||||||
|
expect(report.rounds_executed == 2u, "pull until error rounds");
|
||||||
|
expect(report.total_records == 1u, "pull until error records");
|
||||||
|
|
||||||
|
amduatd_fed_until_report_free(&report);
|
||||||
|
amduatd_test_pull_round_free(&rounds[0]);
|
||||||
|
free(root);
|
||||||
|
return failures == 0 ? 0 : 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(void) {
|
||||||
|
if (amduatd_test_pull_until_zero() != 0) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (amduatd_test_pull_until_multi() != 0) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (amduatd_test_pull_until_error() != 0) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return failures == 0 ? 0 : 1;
|
||||||
|
}
|
||||||
441
tests/test_amduatd_fed_push_until.c
Normal file
441
tests/test_amduatd_fed_push_until.c
Normal file
|
|
@ -0,0 +1,441 @@
|
||||||
|
#ifndef _POSIX_C_SOURCE
|
||||||
|
#define _POSIX_C_SOURCE 200809L
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "amduatd_fed_until.h"
|
||||||
|
#include "amduatd_fed_cursor.h"
|
||||||
|
#include "amduatd_space.h"
|
||||||
|
#include "amduatd_store.h"
|
||||||
|
|
||||||
|
#include "amduat/asl/artifact_io.h"
|
||||||
|
#include "amduat/asl/asl_store_fs_meta.h"
|
||||||
|
#include "amduat/asl/log_store.h"
|
||||||
|
|
||||||
|
#include <stdbool.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
size_t call_count;
|
||||||
|
size_t fail_at;
|
||||||
|
int fail_status;
|
||||||
|
} amduatd_test_push_transport_t;
|
||||||
|
|
||||||
|
static int failures = 0;
|
||||||
|
|
||||||
|
static void expect(bool cond, const char *msg) {
|
||||||
|
if (!cond) {
|
||||||
|
fprintf(stderr, "FAIL: %s\n", msg);
|
||||||
|
failures++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static char *amduatd_test_make_temp_dir(void) {
|
||||||
|
char tmpl[] = "/tmp/amduatd-fed-push-until-XXXXXX";
|
||||||
|
char *dir = mkdtemp(tmpl);
|
||||||
|
size_t len;
|
||||||
|
char *copy;
|
||||||
|
if (dir == NULL) {
|
||||||
|
perror("mkdtemp");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
len = strlen(dir);
|
||||||
|
copy = (char *)malloc(len + 1u);
|
||||||
|
if (copy == NULL) {
|
||||||
|
fprintf(stderr, "failed to allocate temp dir copy\n");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
memcpy(copy, dir, len + 1u);
|
||||||
|
return copy;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool amduatd_test_store_artifact(amduat_asl_store_t *store,
|
||||||
|
const char *payload,
|
||||||
|
amduat_reference_t *out_ref) {
|
||||||
|
amduat_artifact_t artifact;
|
||||||
|
amduat_octets_t payload_bytes = amduat_octets(NULL, 0u);
|
||||||
|
amduat_asl_index_state_t state;
|
||||||
|
amduat_asl_store_error_t err;
|
||||||
|
|
||||||
|
if (store == NULL || payload == NULL || out_ref == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!amduat_octets_clone(amduat_octets(payload, strlen(payload)),
|
||||||
|
&payload_bytes)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_artifact_from_bytes(payload_bytes,
|
||||||
|
AMDUAT_ASL_IO_RAW,
|
||||||
|
false,
|
||||||
|
amduat_type_tag(0u),
|
||||||
|
&artifact)) {
|
||||||
|
amduat_octets_free(&payload_bytes);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
err = amduat_asl_store_put_indexed(store, artifact, out_ref, &state);
|
||||||
|
amduat_asl_artifact_free(&artifact);
|
||||||
|
return err == AMDUAT_ASL_STORE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool amduatd_test_append_fed_log(amduat_asl_store_t *store,
|
||||||
|
amduat_asl_pointer_store_t *pointer_store,
|
||||||
|
const amduatd_space_t *space,
|
||||||
|
const char *root_path,
|
||||||
|
amduat_reference_t ref) {
|
||||||
|
amduat_asl_log_store_t log_store;
|
||||||
|
amduat_octets_t log_name = amduat_octets(NULL, 0u);
|
||||||
|
amduat_asl_log_entry_t entry;
|
||||||
|
uint64_t offset = 0u;
|
||||||
|
amduat_asl_store_error_t err;
|
||||||
|
|
||||||
|
if (!amduat_asl_log_store_init(&log_store,
|
||||||
|
root_path,
|
||||||
|
store,
|
||||||
|
pointer_store)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!amduatd_space_scope_name(space, "fed/records", &log_name)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
memset(&entry, 0, sizeof(entry));
|
||||||
|
entry.kind = AMDUATD_FED_LOG_KIND_ARTIFACT;
|
||||||
|
entry.has_timestamp = false;
|
||||||
|
entry.timestamp = 0u;
|
||||||
|
entry.payload_ref = ref;
|
||||||
|
entry.has_actor = false;
|
||||||
|
entry.actor = amduat_octets(NULL, 0u);
|
||||||
|
err = amduat_asl_log_append(&log_store,
|
||||||
|
(const char *)log_name.data,
|
||||||
|
&entry,
|
||||||
|
1u,
|
||||||
|
&offset);
|
||||||
|
amduat_octets_free(&log_name);
|
||||||
|
return err == AMDUAT_ASL_STORE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool amduatd_test_push_post_ingest(void *ctx,
|
||||||
|
amduat_fed_record_type_t record_type,
|
||||||
|
amduat_reference_t ref,
|
||||||
|
amduat_octets_t bytes,
|
||||||
|
int *out_status,
|
||||||
|
char **out_body) {
|
||||||
|
amduatd_test_push_transport_t *t = (amduatd_test_push_transport_t *)ctx;
|
||||||
|
const char *status = "ok";
|
||||||
|
const char *applied = "true";
|
||||||
|
char buf[256];
|
||||||
|
int n;
|
||||||
|
|
||||||
|
(void)record_type;
|
||||||
|
(void)ref;
|
||||||
|
(void)bytes;
|
||||||
|
|
||||||
|
if (out_status == NULL || out_body == NULL || t == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
t->call_count++;
|
||||||
|
if (t->fail_at != 0u && t->call_count == t->fail_at) {
|
||||||
|
*out_status = t->fail_status;
|
||||||
|
*out_body = strdup("{\"status\":\"error\"}");
|
||||||
|
return *out_body != NULL;
|
||||||
|
}
|
||||||
|
n = snprintf(buf,
|
||||||
|
sizeof(buf),
|
||||||
|
"{\"status\":\"%s\",\"applied\":%s,"
|
||||||
|
"\"ref\":null,\"effective_space\":{"
|
||||||
|
"\"mode\":\"unscoped\",\"space_id\":null}}",
|
||||||
|
status,
|
||||||
|
applied);
|
||||||
|
if (n <= 0 || (size_t)n >= sizeof(buf)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
*out_status = 200;
|
||||||
|
*out_body = strdup(buf);
|
||||||
|
return *out_body != NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int amduatd_test_push_until_zero(void) {
|
||||||
|
char *root = amduatd_test_make_temp_dir();
|
||||||
|
amduat_asl_store_fs_config_t cfg;
|
||||||
|
amduatd_store_ctx_t store_ctx;
|
||||||
|
amduat_asl_store_t store;
|
||||||
|
amduat_asl_pointer_store_t pointer_store;
|
||||||
|
amduatd_space_t space;
|
||||||
|
amduatd_fed_cfg_t fed_cfg;
|
||||||
|
amduatd_test_push_transport_t stub;
|
||||||
|
amduatd_fed_push_transport_t transport;
|
||||||
|
amduatd_fed_until_report_t report;
|
||||||
|
amduatd_fed_push_apply_status_t status;
|
||||||
|
|
||||||
|
if (root == NULL) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&cfg, 0, sizeof(cfg));
|
||||||
|
if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) {
|
||||||
|
fprintf(stderr, "failed to init store root\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
memset(&store_ctx, 0, sizeof(store_ctx));
|
||||||
|
memset(&store, 0, sizeof(store));
|
||||||
|
if (!amduatd_store_init(&store,
|
||||||
|
&cfg,
|
||||||
|
&store_ctx,
|
||||||
|
root,
|
||||||
|
AMDUATD_STORE_BACKEND_INDEX)) {
|
||||||
|
fprintf(stderr, "failed to init store\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_pointer_store_init(&pointer_store, root)) {
|
||||||
|
fprintf(stderr, "failed to init pointer store\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!amduatd_space_init(&space, "demo", false)) {
|
||||||
|
fprintf(stderr, "failed to init space\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
amduatd_fed_cfg_init(&fed_cfg);
|
||||||
|
fed_cfg.enabled = true;
|
||||||
|
|
||||||
|
memset(&stub, 0, sizeof(stub));
|
||||||
|
memset(&transport, 0, sizeof(transport));
|
||||||
|
transport.ctx = &stub;
|
||||||
|
transport.post_ingest = amduatd_test_push_post_ingest;
|
||||||
|
|
||||||
|
status = amduatd_fed_push_until(&store,
|
||||||
|
&pointer_store,
|
||||||
|
&space,
|
||||||
|
"2",
|
||||||
|
8u,
|
||||||
|
3u,
|
||||||
|
root,
|
||||||
|
&fed_cfg,
|
||||||
|
&transport,
|
||||||
|
&report);
|
||||||
|
expect(status == AMDUATD_FED_PUSH_APPLY_OK, "push until zero ok");
|
||||||
|
expect(report.caught_up, "push until caught up");
|
||||||
|
expect(report.rounds_executed == 1u, "push until rounds executed");
|
||||||
|
expect(report.total_records == 0u, "push until records");
|
||||||
|
expect(report.total_artifacts == 0u, "push until artifacts");
|
||||||
|
|
||||||
|
amduatd_fed_until_report_free(&report);
|
||||||
|
free(root);
|
||||||
|
return failures == 0 ? 0 : 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int amduatd_test_push_until_multi(void) {
|
||||||
|
char *root = amduatd_test_make_temp_dir();
|
||||||
|
amduat_asl_store_fs_config_t cfg;
|
||||||
|
amduatd_store_ctx_t store_ctx;
|
||||||
|
amduat_asl_store_t store;
|
||||||
|
amduat_asl_pointer_store_t pointer_store;
|
||||||
|
amduatd_space_t space;
|
||||||
|
amduatd_fed_cfg_t fed_cfg;
|
||||||
|
amduat_reference_t ref0;
|
||||||
|
amduat_reference_t ref1;
|
||||||
|
amduatd_test_push_transport_t stub;
|
||||||
|
amduatd_fed_push_transport_t transport;
|
||||||
|
amduatd_fed_until_report_t report;
|
||||||
|
amduatd_fed_push_apply_status_t status;
|
||||||
|
|
||||||
|
if (root == NULL) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&cfg, 0, sizeof(cfg));
|
||||||
|
if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) {
|
||||||
|
fprintf(stderr, "failed to init store root\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
memset(&store_ctx, 0, sizeof(store_ctx));
|
||||||
|
memset(&store, 0, sizeof(store));
|
||||||
|
if (!amduatd_store_init(&store,
|
||||||
|
&cfg,
|
||||||
|
&store_ctx,
|
||||||
|
root,
|
||||||
|
AMDUATD_STORE_BACKEND_INDEX)) {
|
||||||
|
fprintf(stderr, "failed to init store\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_pointer_store_init(&pointer_store, root)) {
|
||||||
|
fprintf(stderr, "failed to init pointer store\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!amduatd_space_init(&space, "demo", false)) {
|
||||||
|
fprintf(stderr, "failed to init space\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
amduatd_fed_cfg_init(&fed_cfg);
|
||||||
|
fed_cfg.enabled = true;
|
||||||
|
|
||||||
|
if (!amduatd_test_store_artifact(&store, "alpha", &ref0) ||
|
||||||
|
!amduatd_test_store_artifact(&store, "beta", &ref1)) {
|
||||||
|
fprintf(stderr, "failed to store artifacts\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!amduatd_test_append_fed_log(&store,
|
||||||
|
&pointer_store,
|
||||||
|
&space,
|
||||||
|
root,
|
||||||
|
ref0) ||
|
||||||
|
!amduatd_test_append_fed_log(&store,
|
||||||
|
&pointer_store,
|
||||||
|
&space,
|
||||||
|
root,
|
||||||
|
ref1)) {
|
||||||
|
fprintf(stderr, "failed to append fed log\n");
|
||||||
|
amduat_reference_free(&ref0);
|
||||||
|
amduat_reference_free(&ref1);
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&stub, 0, sizeof(stub));
|
||||||
|
memset(&transport, 0, sizeof(transport));
|
||||||
|
transport.ctx = &stub;
|
||||||
|
transport.post_ingest = amduatd_test_push_post_ingest;
|
||||||
|
|
||||||
|
status = amduatd_fed_push_until(&store,
|
||||||
|
&pointer_store,
|
||||||
|
&space,
|
||||||
|
"2",
|
||||||
|
1u,
|
||||||
|
5u,
|
||||||
|
root,
|
||||||
|
&fed_cfg,
|
||||||
|
&transport,
|
||||||
|
&report);
|
||||||
|
expect(status == AMDUATD_FED_PUSH_APPLY_OK, "push until multi ok");
|
||||||
|
expect(report.caught_up, "push until multi caught up");
|
||||||
|
expect(report.rounds_executed == 3u, "push until multi rounds");
|
||||||
|
expect(report.total_records == 2u, "push until multi records");
|
||||||
|
expect(report.total_artifacts == 2u, "push until multi artifacts");
|
||||||
|
|
||||||
|
amduat_reference_free(&ref0);
|
||||||
|
amduat_reference_free(&ref1);
|
||||||
|
amduatd_fed_until_report_free(&report);
|
||||||
|
free(root);
|
||||||
|
return failures == 0 ? 0 : 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int amduatd_test_push_until_error(void) {
|
||||||
|
char *root = amduatd_test_make_temp_dir();
|
||||||
|
amduat_asl_store_fs_config_t cfg;
|
||||||
|
amduatd_store_ctx_t store_ctx;
|
||||||
|
amduat_asl_store_t store;
|
||||||
|
amduat_asl_pointer_store_t pointer_store;
|
||||||
|
amduatd_space_t space;
|
||||||
|
amduatd_fed_cfg_t fed_cfg;
|
||||||
|
amduat_reference_t ref0;
|
||||||
|
amduat_reference_t ref1;
|
||||||
|
amduatd_test_push_transport_t stub;
|
||||||
|
amduatd_fed_push_transport_t transport;
|
||||||
|
amduatd_fed_until_report_t report;
|
||||||
|
amduatd_fed_push_apply_status_t status;
|
||||||
|
|
||||||
|
if (root == NULL) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&cfg, 0, sizeof(cfg));
|
||||||
|
if (!amduat_asl_store_fs_init_root(root, NULL, &cfg)) {
|
||||||
|
fprintf(stderr, "failed to init store root\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
memset(&store_ctx, 0, sizeof(store_ctx));
|
||||||
|
memset(&store, 0, sizeof(store));
|
||||||
|
if (!amduatd_store_init(&store,
|
||||||
|
&cfg,
|
||||||
|
&store_ctx,
|
||||||
|
root,
|
||||||
|
AMDUATD_STORE_BACKEND_INDEX)) {
|
||||||
|
fprintf(stderr, "failed to init store\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_pointer_store_init(&pointer_store, root)) {
|
||||||
|
fprintf(stderr, "failed to init pointer store\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!amduatd_space_init(&space, "demo", false)) {
|
||||||
|
fprintf(stderr, "failed to init space\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
amduatd_fed_cfg_init(&fed_cfg);
|
||||||
|
fed_cfg.enabled = true;
|
||||||
|
|
||||||
|
if (!amduatd_test_store_artifact(&store, "alpha", &ref0) ||
|
||||||
|
!amduatd_test_store_artifact(&store, "beta", &ref1)) {
|
||||||
|
fprintf(stderr, "failed to store artifacts\n");
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!amduatd_test_append_fed_log(&store,
|
||||||
|
&pointer_store,
|
||||||
|
&space,
|
||||||
|
root,
|
||||||
|
ref0) ||
|
||||||
|
!amduatd_test_append_fed_log(&store,
|
||||||
|
&pointer_store,
|
||||||
|
&space,
|
||||||
|
root,
|
||||||
|
ref1)) {
|
||||||
|
fprintf(stderr, "failed to append fed log\n");
|
||||||
|
amduat_reference_free(&ref0);
|
||||||
|
amduat_reference_free(&ref1);
|
||||||
|
free(root);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&stub, 0, sizeof(stub));
|
||||||
|
stub.fail_at = 2u;
|
||||||
|
stub.fail_status = 500;
|
||||||
|
memset(&transport, 0, sizeof(transport));
|
||||||
|
transport.ctx = &stub;
|
||||||
|
transport.post_ingest = amduatd_test_push_post_ingest;
|
||||||
|
|
||||||
|
status = amduatd_fed_push_until(&store,
|
||||||
|
&pointer_store,
|
||||||
|
&space,
|
||||||
|
"2",
|
||||||
|
1u,
|
||||||
|
4u,
|
||||||
|
root,
|
||||||
|
&fed_cfg,
|
||||||
|
&transport,
|
||||||
|
&report);
|
||||||
|
expect(status == AMDUATD_FED_PUSH_APPLY_ERR_REMOTE, "push until error");
|
||||||
|
expect(report.rounds_executed == 2u, "push until error rounds");
|
||||||
|
expect(report.total_records == 1u, "push until error records");
|
||||||
|
|
||||||
|
amduat_reference_free(&ref0);
|
||||||
|
amduat_reference_free(&ref1);
|
||||||
|
amduatd_fed_until_report_free(&report);
|
||||||
|
free(root);
|
||||||
|
return failures == 0 ? 0 : 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(void) {
|
||||||
|
if (amduatd_test_push_until_zero() != 0) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (amduatd_test_push_until_multi() != 0) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (amduatd_test_push_until_error() != 0) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return failures == 0 ? 0 : 1;
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue