asl_store_index_fs: stream log snapshot/truncate
This commit is contained in:
parent
0667cee17d
commit
017bc87e04
|
|
@ -21,6 +21,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
amduat_asl_store_config_t config;
|
amduat_asl_store_config_t config;
|
||||||
amduat_asl_store_index_fs_snapshot_policy_t snapshot_policy;
|
amduat_asl_store_index_fs_snapshot_policy_t snapshot_policy;
|
||||||
|
uint16_t shard_count;
|
||||||
char root_path[AMDUAT_ASL_STORE_INDEX_FS_ROOT_MAX];
|
char root_path[AMDUAT_ASL_STORE_INDEX_FS_ROOT_MAX];
|
||||||
uint64_t pending_snapshot_bytes;
|
uint64_t pending_snapshot_bytes;
|
||||||
uint64_t last_ingest_time_ns;
|
uint64_t last_ingest_time_ns;
|
||||||
|
|
@ -36,6 +37,10 @@ void amduat_asl_store_index_fs_set_snapshot_policy(
|
||||||
amduat_asl_store_index_fs_t *fs,
|
amduat_asl_store_index_fs_t *fs,
|
||||||
amduat_asl_store_index_fs_snapshot_policy_t policy);
|
amduat_asl_store_index_fs_snapshot_policy_t policy);
|
||||||
|
|
||||||
|
void amduat_asl_store_index_fs_set_shard_count(
|
||||||
|
amduat_asl_store_index_fs_t *fs,
|
||||||
|
uint16_t shard_count);
|
||||||
|
|
||||||
amduat_asl_store_error_t amduat_asl_store_index_fs_snapshot_create(
|
amduat_asl_store_error_t amduat_asl_store_index_fs_snapshot_create(
|
||||||
amduat_asl_store_index_fs_t *fs,
|
amduat_asl_store_index_fs_t *fs,
|
||||||
amduat_asl_snapshot_id_t snapshot_id,
|
amduat_asl_snapshot_id_t snapshot_id,
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load diff
|
|
@ -121,6 +121,155 @@ bool amduat_asl_store_index_fs_layout_build_log_path(const char *root_path,
|
||||||
return ok;
|
return ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool amduat_asl_store_index_fs_layout_build_shards_path(
|
||||||
|
const char *root_path,
|
||||||
|
char **out_path) {
|
||||||
|
char *index_path;
|
||||||
|
bool ok;
|
||||||
|
|
||||||
|
if (!amduat_asl_store_index_fs_layout_build_index_path(root_path,
|
||||||
|
&index_path)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ok = amduat_asl_store_index_fs_layout_join(index_path, "shards", out_path);
|
||||||
|
free(index_path);
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool amduat_asl_store_index_fs_layout_build_shard_path(
|
||||||
|
const char *root_path,
|
||||||
|
uint16_t shard_id,
|
||||||
|
char **out_path) {
|
||||||
|
char *shards_path;
|
||||||
|
char *name;
|
||||||
|
bool ok;
|
||||||
|
|
||||||
|
if (!amduat_asl_store_index_fs_layout_build_shards_path(root_path,
|
||||||
|
&shards_path)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_store_index_fs_layout_format_id("shard-",
|
||||||
|
(uint64_t)shard_id,
|
||||||
|
"",
|
||||||
|
&name)) {
|
||||||
|
free(shards_path);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ok = amduat_asl_store_index_fs_layout_join(shards_path, name, out_path);
|
||||||
|
free(name);
|
||||||
|
free(shards_path);
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool amduat_asl_store_index_fs_layout_build_shard_segments_path(
|
||||||
|
const char *root_path,
|
||||||
|
uint16_t shard_id,
|
||||||
|
char **out_path) {
|
||||||
|
char *shard_path;
|
||||||
|
bool ok;
|
||||||
|
|
||||||
|
if (!amduat_asl_store_index_fs_layout_build_shard_path(root_path,
|
||||||
|
shard_id,
|
||||||
|
&shard_path)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ok = amduat_asl_store_index_fs_layout_join(shard_path, "segments", out_path);
|
||||||
|
free(shard_path);
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool amduat_asl_store_index_fs_layout_build_shard_blocks_path(
|
||||||
|
const char *root_path,
|
||||||
|
uint16_t shard_id,
|
||||||
|
char **out_path) {
|
||||||
|
char *shard_path;
|
||||||
|
bool ok;
|
||||||
|
|
||||||
|
if (!amduat_asl_store_index_fs_layout_build_shard_path(root_path,
|
||||||
|
shard_id,
|
||||||
|
&shard_path)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ok = amduat_asl_store_index_fs_layout_join(shard_path, "blocks", out_path);
|
||||||
|
free(shard_path);
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool amduat_asl_store_index_fs_layout_build_shard_segment_meta_path(
|
||||||
|
const char *root_path,
|
||||||
|
uint16_t shard_id,
|
||||||
|
char **out_path) {
|
||||||
|
char *segments_path;
|
||||||
|
bool ok;
|
||||||
|
|
||||||
|
if (!amduat_asl_store_index_fs_layout_build_shard_segments_path(root_path,
|
||||||
|
shard_id,
|
||||||
|
&segments_path)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ok = amduat_asl_store_index_fs_layout_join(segments_path,
|
||||||
|
"next_id",
|
||||||
|
out_path);
|
||||||
|
free(segments_path);
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool amduat_asl_store_index_fs_layout_build_shard_segment_path(
|
||||||
|
const char *root_path,
|
||||||
|
uint16_t shard_id,
|
||||||
|
uint64_t segment_id,
|
||||||
|
char **out_path) {
|
||||||
|
char *segments_path;
|
||||||
|
char *name;
|
||||||
|
bool ok;
|
||||||
|
|
||||||
|
if (!amduat_asl_store_index_fs_layout_build_shard_segments_path(root_path,
|
||||||
|
shard_id,
|
||||||
|
&segments_path)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_store_index_fs_layout_format_id("segment-",
|
||||||
|
segment_id,
|
||||||
|
".asl",
|
||||||
|
&name)) {
|
||||||
|
free(segments_path);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
ok = amduat_asl_store_index_fs_layout_join(segments_path, name, out_path);
|
||||||
|
free(name);
|
||||||
|
free(segments_path);
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool amduat_asl_store_index_fs_layout_build_shard_block_path(
|
||||||
|
const char *root_path,
|
||||||
|
uint16_t shard_id,
|
||||||
|
uint64_t block_id,
|
||||||
|
char **out_path) {
|
||||||
|
char *blocks_path;
|
||||||
|
char *name;
|
||||||
|
bool ok;
|
||||||
|
|
||||||
|
if (!amduat_asl_store_index_fs_layout_build_shard_blocks_path(root_path,
|
||||||
|
shard_id,
|
||||||
|
&blocks_path)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_store_index_fs_layout_format_id("block-",
|
||||||
|
block_id,
|
||||||
|
".asl",
|
||||||
|
&name)) {
|
||||||
|
free(blocks_path);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
ok = amduat_asl_store_index_fs_layout_join(blocks_path, name, out_path);
|
||||||
|
free(name);
|
||||||
|
free(blocks_path);
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
bool amduat_asl_store_index_fs_layout_build_snapshots_path(
|
bool amduat_asl_store_index_fs_layout_build_snapshots_path(
|
||||||
const char *root_path,
|
const char *root_path,
|
||||||
char **out_path) {
|
char **out_path) {
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,42 @@ bool amduat_asl_store_index_fs_layout_build_blocks_path(const char *root_path,
|
||||||
bool amduat_asl_store_index_fs_layout_build_log_path(const char *root_path,
|
bool amduat_asl_store_index_fs_layout_build_log_path(const char *root_path,
|
||||||
char **out_path);
|
char **out_path);
|
||||||
|
|
||||||
|
bool amduat_asl_store_index_fs_layout_build_shards_path(
|
||||||
|
const char *root_path,
|
||||||
|
char **out_path);
|
||||||
|
|
||||||
|
bool amduat_asl_store_index_fs_layout_build_shard_path(
|
||||||
|
const char *root_path,
|
||||||
|
uint16_t shard_id,
|
||||||
|
char **out_path);
|
||||||
|
|
||||||
|
bool amduat_asl_store_index_fs_layout_build_shard_segments_path(
|
||||||
|
const char *root_path,
|
||||||
|
uint16_t shard_id,
|
||||||
|
char **out_path);
|
||||||
|
|
||||||
|
bool amduat_asl_store_index_fs_layout_build_shard_blocks_path(
|
||||||
|
const char *root_path,
|
||||||
|
uint16_t shard_id,
|
||||||
|
char **out_path);
|
||||||
|
|
||||||
|
bool amduat_asl_store_index_fs_layout_build_shard_segment_meta_path(
|
||||||
|
const char *root_path,
|
||||||
|
uint16_t shard_id,
|
||||||
|
char **out_path);
|
||||||
|
|
||||||
|
bool amduat_asl_store_index_fs_layout_build_shard_segment_path(
|
||||||
|
const char *root_path,
|
||||||
|
uint16_t shard_id,
|
||||||
|
uint64_t segment_id,
|
||||||
|
char **out_path);
|
||||||
|
|
||||||
|
bool amduat_asl_store_index_fs_layout_build_shard_block_path(
|
||||||
|
const char *root_path,
|
||||||
|
uint16_t shard_id,
|
||||||
|
uint64_t block_id,
|
||||||
|
char **out_path);
|
||||||
|
|
||||||
bool amduat_asl_store_index_fs_layout_build_snapshots_path(
|
bool amduat_asl_store_index_fs_layout_build_snapshots_path(
|
||||||
const char *root_path,
|
const char *root_path,
|
||||||
char **out_path);
|
char **out_path);
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,8 @@ typedef struct {
|
||||||
const char *store_id;
|
const char *store_id;
|
||||||
const char *profile;
|
const char *profile;
|
||||||
const char *hash;
|
const char *hash;
|
||||||
|
uint16_t shard_count;
|
||||||
|
bool has_shard_count;
|
||||||
bool force;
|
bool force;
|
||||||
bool quiet;
|
bool quiet;
|
||||||
} amduat_asl_cli_init_opts_t;
|
} amduat_asl_cli_init_opts_t;
|
||||||
|
|
@ -97,7 +99,8 @@ static void amduat_asl_cli_print_usage(FILE *stream) {
|
||||||
" amduat-asl log inspect [--root PATH]\n"
|
" amduat-asl log inspect [--root PATH]\n"
|
||||||
" amduat-asl index init [--root PATH] [--store-id ID]\n"
|
" amduat-asl index init [--root PATH] [--store-id ID]\n"
|
||||||
" [--profile PROFILE_ID|name]\n"
|
" [--profile PROFILE_ID|name]\n"
|
||||||
" [--hash HASH_ID|name] [--force] [--quiet]\n"
|
" [--hash HASH_ID|name] [--shards N]\n"
|
||||||
|
" [--force] [--quiet]\n"
|
||||||
" amduat-asl index state [--root PATH]\n"
|
" amduat-asl index state [--root PATH]\n"
|
||||||
" amduat-asl segment verify [--root PATH] [--segment ID]\n"
|
" amduat-asl segment verify [--root PATH] [--segment ID]\n"
|
||||||
"\n"
|
"\n"
|
||||||
|
|
@ -274,6 +277,19 @@ static bool amduat_asl_cli_build_blocks_path(const char *root_path,
|
||||||
return ok;
|
return ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool amduat_asl_cli_build_index_config_path(const char *root_path,
|
||||||
|
char **out_path) {
|
||||||
|
char *index_path;
|
||||||
|
bool ok;
|
||||||
|
|
||||||
|
if (!amduat_asl_cli_build_index_path(root_path, &index_path)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ok = amduat_asl_cli_join_path(index_path, "config", out_path);
|
||||||
|
free(index_path);
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
static bool amduat_asl_cli_build_segment_path(const char *root_path,
|
static bool amduat_asl_cli_build_segment_path(const char *root_path,
|
||||||
uint64_t segment_id,
|
uint64_t segment_id,
|
||||||
char **out_path) {
|
char **out_path) {
|
||||||
|
|
@ -326,9 +342,134 @@ static bool amduat_asl_cli_is_index_store_root(const char *root_path) {
|
||||||
return ok;
|
return ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool amduat_asl_cli_parse_u16(const char *value, uint16_t *out) {
|
||||||
|
char *endptr;
|
||||||
|
unsigned long parsed;
|
||||||
|
|
||||||
|
if (value == NULL || out == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
parsed = strtoul(value, &endptr, 10);
|
||||||
|
if (endptr == value || *endptr != '\0') {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (parsed == 0 || parsed > UINT16_MAX) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
*out = (uint16_t)parsed;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool amduat_asl_cli_read_index_shards(const char *root_path,
|
||||||
|
uint16_t *out_shards) {
|
||||||
|
char *config_path;
|
||||||
|
uint8_t *bytes;
|
||||||
|
size_t len;
|
||||||
|
char *text;
|
||||||
|
char *token;
|
||||||
|
bool ok;
|
||||||
|
uint16_t shards;
|
||||||
|
|
||||||
|
if (out_shards == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
*out_shards = 1u;
|
||||||
|
|
||||||
|
if (!amduat_asl_cli_build_index_config_path(root_path, &config_path)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_cli_path_is_file(config_path)) {
|
||||||
|
free(config_path);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
bytes = NULL;
|
||||||
|
len = 0u;
|
||||||
|
if (!amduat_asl_read_path(config_path, &bytes, &len)) {
|
||||||
|
free(config_path);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
free(config_path);
|
||||||
|
|
||||||
|
text = (char *)malloc(len + 1u);
|
||||||
|
if (text == NULL) {
|
||||||
|
free(bytes);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
memcpy(text, bytes, len);
|
||||||
|
text[len] = '\0';
|
||||||
|
free(bytes);
|
||||||
|
|
||||||
|
token = strtok(text, " \t\r\n");
|
||||||
|
if (token == NULL || strcmp(token, "amduat-asl-index-v1") != 0) {
|
||||||
|
free(text);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
ok = false;
|
||||||
|
shards = 1u;
|
||||||
|
while ((token = strtok(NULL, " \t\r\n")) != NULL) {
|
||||||
|
if (strncmp(token, "shards=", 7u) == 0) {
|
||||||
|
if (!amduat_asl_cli_parse_u16(token + 7u, &shards)) {
|
||||||
|
free(text);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ok = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
free(text);
|
||||||
|
if (!ok) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
*out_shards = shards;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool amduat_asl_cli_write_index_config(const char *root_path,
|
||||||
|
uint16_t shard_count,
|
||||||
|
bool force) {
|
||||||
|
char *config_path;
|
||||||
|
char buffer[128];
|
||||||
|
int len;
|
||||||
|
uint16_t existing_shards;
|
||||||
|
|
||||||
|
if (root_path == NULL || shard_count == 0u) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_cli_build_index_config_path(root_path, &config_path)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!force && amduat_asl_cli_path_is_file(config_path)) {
|
||||||
|
if (!amduat_asl_cli_read_index_shards(root_path, &existing_shards)) {
|
||||||
|
free(config_path);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
free(config_path);
|
||||||
|
return existing_shards == shard_count;
|
||||||
|
}
|
||||||
|
|
||||||
|
len = snprintf(buffer,
|
||||||
|
sizeof(buffer),
|
||||||
|
"amduat-asl-index-v1\nshards=%" PRIu16 "\n",
|
||||||
|
shard_count);
|
||||||
|
if (len <= 0 || (size_t)len >= sizeof(buffer)) {
|
||||||
|
free(config_path);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_write_path(config_path, (const uint8_t *)buffer,
|
||||||
|
(size_t)len)) {
|
||||||
|
free(config_path);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
free(config_path);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
static bool amduat_asl_cli_open_store(const char *root_path,
|
static bool amduat_asl_cli_open_store(const char *root_path,
|
||||||
amduat_asl_cli_store_ctx_t *out_ctx) {
|
amduat_asl_cli_store_ctx_t *out_ctx) {
|
||||||
bool use_index;
|
bool use_index;
|
||||||
|
uint16_t shard_count;
|
||||||
|
|
||||||
if (root_path == NULL || out_ctx == NULL) {
|
if (root_path == NULL || out_ctx == NULL) {
|
||||||
return false;
|
return false;
|
||||||
|
|
@ -342,11 +483,15 @@ static bool amduat_asl_cli_open_store(const char *root_path,
|
||||||
use_index = amduat_asl_cli_is_index_store_root(root_path);
|
use_index = amduat_asl_cli_is_index_store_root(root_path);
|
||||||
out_ctx->is_index = use_index;
|
out_ctx->is_index = use_index;
|
||||||
if (use_index) {
|
if (use_index) {
|
||||||
|
if (!amduat_asl_cli_read_index_shards(root_path, &shard_count)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if (!amduat_asl_store_index_fs_init(&out_ctx->index_fs,
|
if (!amduat_asl_store_index_fs_init(&out_ctx->index_fs,
|
||||||
out_ctx->cfg.config,
|
out_ctx->cfg.config,
|
||||||
root_path)) {
|
root_path)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
amduat_asl_store_index_fs_set_shard_count(&out_ctx->index_fs, shard_count);
|
||||||
amduat_asl_store_init(&out_ctx->store,
|
amduat_asl_store_init(&out_ctx->store,
|
||||||
out_ctx->cfg.config,
|
out_ctx->cfg.config,
|
||||||
amduat_asl_store_index_fs_ops(),
|
amduat_asl_store_index_fs_ops(),
|
||||||
|
|
@ -1169,6 +1314,7 @@ static int amduat_asl_cli_cmd_index(int argc, char **argv) {
|
||||||
if (strcmp(argv[0], "init") == 0) {
|
if (strcmp(argv[0], "init") == 0) {
|
||||||
memset(&opts, 0, sizeof(opts));
|
memset(&opts, 0, sizeof(opts));
|
||||||
opts.root = AMDUAT_ASL_CLI_DEFAULT_ROOT;
|
opts.root = AMDUAT_ASL_CLI_DEFAULT_ROOT;
|
||||||
|
opts.shard_count = 1u;
|
||||||
|
|
||||||
for (i = 1; i < (size_t)argc; ++i) {
|
for (i = 1; i < (size_t)argc; ++i) {
|
||||||
if (strcmp(argv[i], "--root") == 0) {
|
if (strcmp(argv[i], "--root") == 0) {
|
||||||
|
|
@ -1195,6 +1341,16 @@ static int amduat_asl_cli_cmd_index(int argc, char **argv) {
|
||||||
return AMDUAT_ASL_CLI_EXIT_USAGE;
|
return AMDUAT_ASL_CLI_EXIT_USAGE;
|
||||||
}
|
}
|
||||||
opts.hash = argv[++i];
|
opts.hash = argv[++i];
|
||||||
|
} else if (strcmp(argv[i], "--shards") == 0) {
|
||||||
|
if (i + 1 >= (size_t)argc) {
|
||||||
|
fprintf(stderr, "error: --shards requires a value\n");
|
||||||
|
return AMDUAT_ASL_CLI_EXIT_USAGE;
|
||||||
|
}
|
||||||
|
if (!amduat_asl_cli_parse_u16(argv[++i], &opts.shard_count)) {
|
||||||
|
fprintf(stderr, "error: invalid shard count\n");
|
||||||
|
return AMDUAT_ASL_CLI_EXIT_USAGE;
|
||||||
|
}
|
||||||
|
opts.has_shard_count = true;
|
||||||
} else if (strcmp(argv[i], "--force") == 0) {
|
} else if (strcmp(argv[i], "--force") == 0) {
|
||||||
opts.force = true;
|
opts.force = true;
|
||||||
} else if (strcmp(argv[i], "--quiet") == 0) {
|
} else if (strcmp(argv[i], "--quiet") == 0) {
|
||||||
|
|
@ -1277,6 +1433,13 @@ static int amduat_asl_cli_cmd_index(int argc, char **argv) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!amduat_asl_cli_write_index_config(root,
|
||||||
|
opts.shard_count,
|
||||||
|
opts.force)) {
|
||||||
|
fprintf(stderr, "error: failed to write index config\n");
|
||||||
|
return AMDUAT_ASL_CLI_EXIT_IO;
|
||||||
|
}
|
||||||
|
|
||||||
if (!amduat_asl_cli_write_empty_log(root)) {
|
if (!amduat_asl_cli_write_empty_log(root)) {
|
||||||
fprintf(stderr, "error: failed to initialize log file\n");
|
fprintf(stderr, "error: failed to initialize log file\n");
|
||||||
return AMDUAT_ASL_CLI_EXIT_IO;
|
return AMDUAT_ASL_CLI_EXIT_IO;
|
||||||
|
|
@ -1319,6 +1482,14 @@ static int amduat_asl_cli_cmd_index(int argc, char **argv) {
|
||||||
fprintf(stderr, "error: failed to initialize index store\n");
|
fprintf(stderr, "error: failed to initialize index store\n");
|
||||||
return AMDUAT_ASL_CLI_EXIT_CONFIG;
|
return AMDUAT_ASL_CLI_EXIT_CONFIG;
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
uint16_t shard_count = 1u;
|
||||||
|
if (!amduat_asl_cli_read_index_shards(root, &shard_count)) {
|
||||||
|
fprintf(stderr, "error: failed to load index config\n");
|
||||||
|
return AMDUAT_ASL_CLI_EXIT_CONFIG;
|
||||||
|
}
|
||||||
|
amduat_asl_store_index_fs_set_shard_count(&index_fs, shard_count);
|
||||||
|
}
|
||||||
amduat_asl_store_init(&store, cfg.config,
|
amduat_asl_store_init(&store, cfg.config,
|
||||||
amduat_asl_store_index_fs_ops(), &index_fs);
|
amduat_asl_store_index_fs_ops(), &index_fs);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
#include "amduat/asl/asl_store_index_fs.h"
|
#include "amduat/asl/asl_store_index_fs.h"
|
||||||
#include "amduat/asl/index_bloom.h"
|
#include "amduat/asl/index_bloom.h"
|
||||||
|
#include "amduat/asl/ref_derive.h"
|
||||||
#include "amduat/asl/store.h"
|
#include "amduat/asl/store.h"
|
||||||
#include "amduat/enc/asl1_core.h"
|
#include "amduat/enc/asl1_core.h"
|
||||||
#include "amduat/enc/asl_core_index.h"
|
#include "amduat/enc/asl_core_index.h"
|
||||||
|
|
@ -148,6 +149,22 @@ static bool read_file(const char *path, uint8_t **out_bytes, size_t *out_len) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool build_log_path(const char *root, char **out_path) {
|
||||||
|
char *index_path = NULL;
|
||||||
|
bool ok;
|
||||||
|
|
||||||
|
if (root == NULL || out_path == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!join_path(root, "index", &index_path)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ok = join_path(index_path, "log.asl", out_path);
|
||||||
|
free(index_path);
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
|
|
||||||
static bool build_segment_path(const char *root,
|
static bool build_segment_path(const char *root,
|
||||||
uint64_t segment_id,
|
uint64_t segment_id,
|
||||||
char **out_path) {
|
char **out_path) {
|
||||||
|
|
@ -180,6 +197,71 @@ static bool build_segment_path(const char *root,
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool build_shard_segment_path(const char *root,
|
||||||
|
uint16_t shard_id,
|
||||||
|
uint64_t segment_id,
|
||||||
|
char **out_path) {
|
||||||
|
int needed;
|
||||||
|
char *buffer;
|
||||||
|
|
||||||
|
if (root == NULL || out_path == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
needed = snprintf(NULL,
|
||||||
|
0,
|
||||||
|
"%s/index/shards/shard-%016" PRIx64
|
||||||
|
"/segments/segment-%016" PRIx64 ".asl",
|
||||||
|
root,
|
||||||
|
(uint64_t)shard_id,
|
||||||
|
segment_id);
|
||||||
|
if (needed <= 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer = (char *)malloc((size_t)needed + 1u);
|
||||||
|
if (buffer == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
snprintf(buffer,
|
||||||
|
(size_t)needed + 1u,
|
||||||
|
"%s/index/shards/shard-%016" PRIx64
|
||||||
|
"/segments/segment-%016" PRIx64 ".asl",
|
||||||
|
root,
|
||||||
|
(uint64_t)shard_id,
|
||||||
|
segment_id);
|
||||||
|
*out_path = buffer;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static uint64_t fnv1a64(const uint8_t *data, size_t len, uint64_t seed) {
|
||||||
|
size_t i;
|
||||||
|
uint64_t hash = seed;
|
||||||
|
|
||||||
|
for (i = 0; i < len; ++i) {
|
||||||
|
hash ^= data[i];
|
||||||
|
hash *= 1099511628211ull;
|
||||||
|
}
|
||||||
|
return hash;
|
||||||
|
}
|
||||||
|
|
||||||
|
static uint16_t ref_shard(amduat_reference_t ref, uint16_t shard_count) {
|
||||||
|
uint64_t hash;
|
||||||
|
uint8_t hash_id_bytes[2];
|
||||||
|
|
||||||
|
if (shard_count == 0u) {
|
||||||
|
return 0u;
|
||||||
|
}
|
||||||
|
hash_id_bytes[0] = (uint8_t)(ref.hash_id & 0xffu);
|
||||||
|
hash_id_bytes[1] = (uint8_t)((ref.hash_id >> 8) & 0xffu);
|
||||||
|
hash = fnv1a64(hash_id_bytes, sizeof(hash_id_bytes),
|
||||||
|
14695981039346656037ull);
|
||||||
|
if (ref.digest.len != 0u && ref.digest.data != NULL) {
|
||||||
|
hash = fnv1a64(ref.digest.data, ref.digest.len, hash);
|
||||||
|
}
|
||||||
|
return (uint16_t)(hash % shard_count);
|
||||||
|
}
|
||||||
|
|
||||||
static char *make_temp_root(void) {
|
static char *make_temp_root(void) {
|
||||||
char *templ;
|
char *templ;
|
||||||
const char template_prefix[] = "/tmp/amduat_test_asl_store_index_fs_XXXXXX";
|
const char template_prefix[] = "/tmp/amduat_test_asl_store_index_fs_XXXXXX";
|
||||||
|
|
@ -334,6 +416,264 @@ cleanup:
|
||||||
return exit_code;
|
return exit_code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int main(void) {
|
static int test_shard_routing(void) {
|
||||||
return test_round_trip();
|
amduat_asl_store_config_t config;
|
||||||
|
amduat_asl_store_index_fs_t fs;
|
||||||
|
amduat_asl_store_t store;
|
||||||
|
char *root;
|
||||||
|
uint8_t payload_a[2] = {0x10, 0x11};
|
||||||
|
uint8_t payload_b[2] = {0x20, 0x21};
|
||||||
|
amduat_reference_t ref_a;
|
||||||
|
amduat_reference_t ref_b;
|
||||||
|
amduat_octets_t bytes_a;
|
||||||
|
amduat_octets_t bytes_b;
|
||||||
|
amduat_hash_id_t hash_id;
|
||||||
|
uint16_t shard_a;
|
||||||
|
uint16_t shard_b;
|
||||||
|
uint64_t local_counts[2] = {0u, 0u};
|
||||||
|
int exit_code = 1;
|
||||||
|
size_t i;
|
||||||
|
bool found = false;
|
||||||
|
|
||||||
|
root = make_temp_root();
|
||||||
|
if (root == NULL) {
|
||||||
|
fprintf(stderr, "temp root failed\n");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&config, 0, sizeof(config));
|
||||||
|
config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1;
|
||||||
|
config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256;
|
||||||
|
if (!amduat_asl_store_index_fs_init(&fs, config, root)) {
|
||||||
|
fprintf(stderr, "index fs init failed\n");
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
amduat_asl_store_index_fs_set_shard_count(&fs, 2);
|
||||||
|
amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs);
|
||||||
|
|
||||||
|
hash_id = config.hash_id;
|
||||||
|
ref_a = amduat_reference(0u, amduat_octets(NULL, 0u));
|
||||||
|
ref_b = amduat_reference(0u, amduat_octets(NULL, 0u));
|
||||||
|
bytes_a = amduat_octets(NULL, 0u);
|
||||||
|
bytes_b = amduat_octets(NULL, 0u);
|
||||||
|
|
||||||
|
for (i = 0; i < 256; ++i) {
|
||||||
|
payload_a[0] = (uint8_t)i;
|
||||||
|
payload_b[0] = (uint8_t)(255u - i);
|
||||||
|
amduat_reference_free(&ref_a);
|
||||||
|
amduat_reference_free(&ref_b);
|
||||||
|
amduat_octets_free(&bytes_a);
|
||||||
|
amduat_octets_free(&bytes_b);
|
||||||
|
if (!amduat_asl_ref_derive(amduat_artifact(amduat_octets(payload_a,
|
||||||
|
sizeof(payload_a))),
|
||||||
|
AMDUAT_ENC_ASL1_CORE_V1,
|
||||||
|
hash_id,
|
||||||
|
&ref_a,
|
||||||
|
&bytes_a) ||
|
||||||
|
!amduat_asl_ref_derive(amduat_artifact(amduat_octets(payload_b,
|
||||||
|
sizeof(payload_b))),
|
||||||
|
AMDUAT_ENC_ASL1_CORE_V1,
|
||||||
|
hash_id,
|
||||||
|
&ref_b,
|
||||||
|
&bytes_b)) {
|
||||||
|
fprintf(stderr, "ref derive failed\n");
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
shard_a = ref_shard(ref_a, 2);
|
||||||
|
shard_b = ref_shard(ref_b, 2);
|
||||||
|
if (shard_a != shard_b) {
|
||||||
|
found = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!found) {
|
||||||
|
fprintf(stderr, "shard selection failed\n");
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
amduat_reference_t stored_ref;
|
||||||
|
amduat_asl_index_state_t state;
|
||||||
|
uint64_t segment_id;
|
||||||
|
char *segment_path = NULL;
|
||||||
|
amduat_asl_store_error_t err;
|
||||||
|
|
||||||
|
stored_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
|
||||||
|
err = amduat_asl_store_put_indexed(&store,
|
||||||
|
amduat_artifact(amduat_octets(payload_a,
|
||||||
|
sizeof(payload_a))),
|
||||||
|
&stored_ref,
|
||||||
|
&state);
|
||||||
|
if (err != AMDUAT_ASL_STORE_OK) {
|
||||||
|
fprintf(stderr, "put shard a failed\n");
|
||||||
|
amduat_reference_free(&stored_ref);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
amduat_reference_free(&stored_ref);
|
||||||
|
local_counts[shard_a] += 1u;
|
||||||
|
segment_id = ((uint64_t)shard_a << 48) | local_counts[shard_a];
|
||||||
|
if (!build_shard_segment_path(root, shard_a, segment_id, &segment_path)) {
|
||||||
|
fprintf(stderr, "segment path failed\n");
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
if (access(segment_path, F_OK) != 0) {
|
||||||
|
fprintf(stderr, "segment missing in shard a\n");
|
||||||
|
free(segment_path);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
free(segment_path);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
amduat_reference_t stored_ref;
|
||||||
|
amduat_asl_index_state_t state;
|
||||||
|
uint64_t segment_id;
|
||||||
|
char *segment_path = NULL;
|
||||||
|
amduat_asl_store_error_t err;
|
||||||
|
|
||||||
|
stored_ref = amduat_reference(0u, amduat_octets(NULL, 0u));
|
||||||
|
err = amduat_asl_store_put_indexed(&store,
|
||||||
|
amduat_artifact(amduat_octets(payload_b,
|
||||||
|
sizeof(payload_b))),
|
||||||
|
&stored_ref,
|
||||||
|
&state);
|
||||||
|
if (err != AMDUAT_ASL_STORE_OK) {
|
||||||
|
fprintf(stderr, "put shard b failed\n");
|
||||||
|
amduat_reference_free(&stored_ref);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
amduat_reference_free(&stored_ref);
|
||||||
|
local_counts[shard_b] += 1u;
|
||||||
|
segment_id = ((uint64_t)shard_b << 48) | local_counts[shard_b];
|
||||||
|
if (!build_shard_segment_path(root, shard_b, segment_id, &segment_path)) {
|
||||||
|
fprintf(stderr, "segment path failed\n");
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
if (access(segment_path, F_OK) != 0) {
|
||||||
|
fprintf(stderr, "segment missing in shard b\n");
|
||||||
|
free(segment_path);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
free(segment_path);
|
||||||
|
}
|
||||||
|
|
||||||
|
exit_code = 0;
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
amduat_reference_free(&ref_a);
|
||||||
|
amduat_reference_free(&ref_b);
|
||||||
|
amduat_octets_free(&bytes_a);
|
||||||
|
amduat_octets_free(&bytes_b);
|
||||||
|
if (!remove_tree(root)) {
|
||||||
|
fprintf(stderr, "cleanup failed\n");
|
||||||
|
exit_code = 1;
|
||||||
|
}
|
||||||
|
free(root);
|
||||||
|
return exit_code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int test_snapshot_truncation(void) {
|
||||||
|
amduat_asl_store_config_t config;
|
||||||
|
amduat_asl_store_index_fs_t fs;
|
||||||
|
amduat_asl_store_t store;
|
||||||
|
amduat_asl_store_error_t err;
|
||||||
|
amduat_asl_index_state_t state;
|
||||||
|
amduat_artifact_t artifact;
|
||||||
|
amduat_artifact_t loaded;
|
||||||
|
amduat_reference_t ref;
|
||||||
|
uint8_t payload[8];
|
||||||
|
char *root;
|
||||||
|
char *log_path = NULL;
|
||||||
|
uint8_t *log_before = NULL;
|
||||||
|
uint8_t *log_after = NULL;
|
||||||
|
size_t log_before_len = 0u;
|
||||||
|
size_t log_after_len = 0u;
|
||||||
|
int exit_code = 1;
|
||||||
|
|
||||||
|
root = make_temp_root();
|
||||||
|
if (root == NULL) {
|
||||||
|
fprintf(stderr, "temp root failed\n");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&config, 0, sizeof(config));
|
||||||
|
config.encoding_profile_id = AMDUAT_ENC_ASL1_CORE_V1;
|
||||||
|
config.hash_id = AMDUAT_HASH_ASL1_ID_SHA256;
|
||||||
|
if (!amduat_asl_store_index_fs_init(&fs, config, root)) {
|
||||||
|
fprintf(stderr, "index fs init failed\n");
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
amduat_asl_store_init(&store, config, amduat_asl_store_index_fs_ops(), &fs);
|
||||||
|
|
||||||
|
memset(payload, 0x33, sizeof(payload));
|
||||||
|
artifact = amduat_artifact(amduat_octets(payload, sizeof(payload)));
|
||||||
|
ref = amduat_reference(0u, amduat_octets(NULL, 0u));
|
||||||
|
err = amduat_asl_store_put_indexed(&store, artifact, &ref, &state);
|
||||||
|
if (err != AMDUAT_ASL_STORE_OK) {
|
||||||
|
fprintf(stderr, "put_indexed failed: %d\n", err);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!build_log_path(root, &log_path) ||
|
||||||
|
!read_file(log_path, &log_before, &log_before_len)) {
|
||||||
|
fprintf(stderr, "log read before snapshot failed\n");
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
err = amduat_asl_store_index_fs_snapshot_create(&fs, 1u, NULL, NULL);
|
||||||
|
if (err != AMDUAT_ASL_STORE_OK) {
|
||||||
|
fprintf(stderr, "snapshot create failed: %d\n", err);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!read_file(log_path, &log_after, &log_after_len)) {
|
||||||
|
fprintf(stderr, "log read after snapshot failed\n");
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
if (log_after_len >= log_before_len) {
|
||||||
|
fprintf(stderr, "log did not shrink after snapshot\n");
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!amduat_asl_index_current_state(&store, &state)) {
|
||||||
|
fprintf(stderr, "current_state failed\n");
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
loaded = amduat_artifact(amduat_octets(NULL, 0u));
|
||||||
|
err = amduat_asl_store_get_indexed(&store, ref, state, &loaded);
|
||||||
|
if (err != AMDUAT_ASL_STORE_OK) {
|
||||||
|
fprintf(stderr, "get_indexed after snapshot failed: %d\n", err);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
if (!amduat_artifact_eq(artifact, loaded)) {
|
||||||
|
fprintf(stderr, "artifact mismatch after snapshot\n");
|
||||||
|
amduat_artifact_free(&loaded);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
amduat_artifact_free(&loaded);
|
||||||
|
|
||||||
|
exit_code = 0;
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
free(log_path);
|
||||||
|
free(log_before);
|
||||||
|
free(log_after);
|
||||||
|
amduat_reference_free(&ref);
|
||||||
|
if (!remove_tree(root)) {
|
||||||
|
fprintf(stderr, "cleanup failed\n");
|
||||||
|
exit_code = 1;
|
||||||
|
}
|
||||||
|
free(root);
|
||||||
|
return exit_code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(void) {
|
||||||
|
if (test_round_trip() != 0) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (test_snapshot_truncation() != 0) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return test_shard_routing();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue