/*
 * amduat-api/federation/transport_unix.c
 *
 * Federation transport that issues HTTP/1.1 requests over a Unix-domain
 * stream socket.
 */
#define _POSIX_C_SOURCE 200809L
#include "federation/transport_unix.h"
#include "amduat/asl/ref_text.h"
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
/*
 * Skip JSON insignificant whitespace (space, LF, CR, TAB).
 *
 * Returns the first position at or after `p` that is either not below `end`
 * or holds a non-whitespace byte. Never reads at or beyond `end`.
 */
static const char *amduat_fed_json_skip_ws(const char *p, const char *end) {
  for (; p < end; ++p) {
    switch (*p) {
      case ' ':
      case '\n':
      case '\r':
      case '\t':
        continue; /* advances the enclosing for loop */
      default:
        return p;
    }
  }
  return p;
}
/*
 * Consume the single character `c`, allowing leading whitespace.
 *
 * On success *p is moved just past the matched character and true is
 * returned. On failure (NULL cursor, end of input, or a different
 * character) *p is left untouched and false is returned.
 */
static bool amduat_fed_json_expect(const char **p, const char *end, char c) {
  const char *at;

  if (p == NULL || *p == NULL) {
    return false;
  }
  at = amduat_fed_json_skip_ws(*p, end);
  if (at < end && *at == c) {
    *p = at + 1;
    return true;
  }
  return false;
}
/*
 * Parse a double-quoted JSON string that contains no escape sequences.
 *
 * Leading whitespace is skipped. On success *out_str points at the first
 * byte after the opening quote (inside the input buffer, not NUL-terminated),
 * *out_len is the number of bytes before the closing quote, and *p is moved
 * past the closing quote. Any backslash inside the string, a missing quote,
 * or a NULL argument yields false with *p unchanged.
 */
static bool amduat_fed_json_parse_string_noesc(const char **p,
                                               const char *end,
                                               const char **out_str,
                                               size_t *out_len) {
  const char *open;
  const char *scan;

  if (p == NULL || *p == NULL || out_str == NULL || out_len == NULL) {
    return false;
  }
  open = amduat_fed_json_skip_ws(*p, end);
  if (open >= end || *open != '"') {
    return false;
  }
  for (scan = open + 1; scan < end; ++scan) {
    if (*scan == '\\') {
      return false; /* escapes are not supported by this parser */
    }
    if (*scan == '"') {
      *out_str = open + 1;
      *out_len = (size_t)(scan - (open + 1));
      *p = scan + 1;
      return true;
    }
  }
  return false; /* ran out of input before the closing quote */
}
/*
 * Parse an unsigned decimal integer into *out.
 *
 * Leading JSON whitespace is skipped, then one or more ASCII digits are
 * consumed. The scan is bounded by `end`, so the input does not need to be
 * NUL-terminated. Signs ('+' / '-') are rejected, as are values that do not
 * fit in 64 bits.
 *
 * Returns true and moves *p past the last digit on success; returns false
 * and leaves *p and *out untouched otherwise.
 */
static bool amduat_fed_json_parse_u64(const char **p,
                                      const char *end,
                                      uint64_t *out) {
  const char *cur;
  const char *first;
  uint64_t value = 0;

  if (p == NULL || *p == NULL || out == NULL) {
    return false;
  }
  cur = amduat_fed_json_skip_ws(*p, end);
  first = cur;
  while (cur < end && *cur >= '0' && *cur <= '9') {
    uint64_t digit = (uint64_t)(*cur - '0');
    /* value * 10 + digit must not exceed UINT64_MAX. */
    if (value > (UINT64_MAX - digit) / 10u) {
      return false;
    }
    value = value * 10u + digit;
    cur++;
  }
  if (cur == first) {
    return false; /* no digits at all */
  }
  *out = value;
  *p = cur;
  return true;
}
/*
 * Parse an unsigned decimal integer that must fit in 32 bits.
 *
 * Delegates to amduat_fed_json_parse_u64 and then range-checks the result.
 * *out is written only when the value was parsed and is <= UINT32_MAX.
 * Note that *p may already have been advanced by the 64-bit parser when the
 * range check fails.
 */
static bool amduat_fed_json_parse_u32(const char **p,
                                      const char *end,
                                      uint32_t *out) {
  uint64_t wide = 0;
  bool ok = amduat_fed_json_parse_u64(p, end, &wide) && wide <= UINT32_MAX;

  if (ok) {
    *out = (uint32_t)wide;
  }
  return ok;
}
/*
 * Parse the JSON literal `true` or `false` after optional whitespace.
 *
 * On success *out receives the value and *p is moved past the literal.
 * Returns false (with *p and *out untouched) for NULL arguments or when
 * neither literal is present within [*p, end).
 */
static bool amduat_fed_json_parse_bool(const char **p,
                                       const char *end,
                                       bool *out) {
  static const struct {
    const char *text;
    size_t len;
    bool value;
  } literals[] = {
    {"true", 4u, true},
    {"false", 5u, false},
  };
  const char *at;
  size_t i;

  if (p == NULL || *p == NULL || out == NULL) {
    return false;
  }
  at = amduat_fed_json_skip_ws(*p, end);
  for (i = 0; i < sizeof(literals) / sizeof(literals[0]); ++i) {
    /* Only compare when the whole literal fits before `end`. */
    if (at + literals[i].len <= end &&
        strncmp(at, literals[i].text, literals[i].len) == 0) {
      *out = literals[i].value;
      *p = at + literals[i].len;
      return true;
    }
  }
  return false;
}
/*
 * Skip over a double-quoted JSON string without escapes.
 *
 * Uses the same scan as amduat_fed_json_parse_string_noesc and discards the
 * extracted span. On success *p is moved past the closing quote; on failure
 * (NULL cursor, no opening quote, a backslash, or an unterminated string)
 * *p is unchanged.
 */
static bool amduat_fed_json_skip_string(const char **p, const char *end) {
  const char *ignored_str = NULL;
  size_t ignored_len = 0;

  return amduat_fed_json_parse_string_noesc(p, end, &ignored_str,
                                            &ignored_len);
}
/* Defined below; arrays and objects recurse through it. */
static bool amduat_fed_json_skip_value(const char **p, const char *end, int depth);

/*
 * Skip a JSON array, including its brackets.
 *
 * Each element is skipped via amduat_fed_json_skip_value at `depth + 1`.
 * An empty array is accepted; a comma must be followed by another value.
 * Returns true with *p just past the closing ']' on success.
 */
static bool amduat_fed_json_skip_array(const char **p,
                                       const char *end,
                                       int depth) {
  const char *at;
  char sep;

  if (!amduat_fed_json_expect(p, end, '[')) {
    return false;
  }
  at = amduat_fed_json_skip_ws(*p, end);
  if (at < end && *at == ']') {
    *p = at + 1;
    return true;
  }
  do {
    if (!amduat_fed_json_skip_value(p, end, depth + 1)) {
      return false;
    }
    at = amduat_fed_json_skip_ws(*p, end);
    if (at >= end) {
      return false;
    }
    sep = *at;
    if (sep != ',' && sep != ']') {
      return false;
    }
    *p = at + 1;
  } while (sep == ',');
  return true;
}
/*
 * Skip a JSON object, including its braces.
 *
 * Each member must be an escape-free string key, a ':' and a value; values
 * are skipped via amduat_fed_json_skip_value at `depth + 1`. An empty object
 * is accepted; a comma must be followed by another member.
 * Returns true with *p just past the closing '}' on success.
 */
static bool amduat_fed_json_skip_object(const char **p,
                                        const char *end,
                                        int depth) {
  const char *at;
  char sep;

  if (!amduat_fed_json_expect(p, end, '{')) {
    return false;
  }
  at = amduat_fed_json_skip_ws(*p, end);
  if (at < end && *at == '}') {
    *p = at + 1;
    return true;
  }
  do {
    if (!amduat_fed_json_skip_string(p, end) ||
        !amduat_fed_json_expect(p, end, ':') ||
        !amduat_fed_json_skip_value(p, end, depth + 1)) {
      return false;
    }
    at = amduat_fed_json_skip_ws(*p, end);
    if (at >= end) {
      return false;
    }
    sep = *at;
    if (sep != ',' && sep != '}') {
      return false;
    }
    *p = at + 1;
  } while (sep == ',');
  return true;
}
/* Advance past a run of ASCII digits, never reading at or beyond `end`. */
static const char *amduat_fed_json_skip_digits(const char *q,
                                               const char *end) {
  while (q < end && *q >= '0' && *q <= '9') {
    q++;
  }
  return q;
}

/*
 * Skip one JSON value of any kind (string, object, array, literal, number).
 *
 * `depth` is the current nesting level; values nested deeper than 64 levels
 * are rejected to bound recursion. All reads are bounded by `end`, so the
 * input does not need to be NUL-terminated. Numbers follow the JSON grammar:
 * optional '-', digits, optional fraction, optional exponent.
 *
 * Returns true with *p just past the value, or false on malformed input.
 */
static bool amduat_fed_json_skip_value(const char **p,
                                       const char *end,
                                       int depth) {
  const char *cur;
  size_t avail;

  if (p == NULL || *p == NULL) {
    return false;
  }
  if (depth > 64) {
    return false;
  }
  cur = amduat_fed_json_skip_ws(*p, end);
  if (cur >= end) {
    return false;
  }
  avail = (size_t)(end - cur);

  switch (*cur) {
    case '"':
      return amduat_fed_json_skip_string(p, end);
    case '{':
      return amduat_fed_json_skip_object(p, end, depth);
    case '[':
      return amduat_fed_json_skip_array(p, end, depth);
    default:
      break;
  }

  /* Literals: compare only when the whole word fits inside the buffer. */
  if (avail >= 4u && memcmp(cur, "true", 4u) == 0) {
    *p = cur + 4;
    return true;
  }
  if (avail >= 5u && memcmp(cur, "false", 5u) == 0) {
    *p = cur + 5;
    return true;
  }
  if (avail >= 4u && memcmp(cur, "null", 4u) == 0) {
    *p = cur + 4;
    return true;
  }

  if (*cur == '-' || (*cur >= '0' && *cur <= '9')) {
    const char *q = cur;
    const char *mark;

    if (*q == '-') {
      q++;
    }
    mark = q;
    q = amduat_fed_json_skip_digits(q, end);
    if (q == mark) {
      return false; /* '-' with no digits */
    }
    if (q < end && *q == '.') {
      mark = ++q;
      q = amduat_fed_json_skip_digits(q, end);
      if (q == mark) {
        return false; /* '.' must be followed by a digit */
      }
    }
    if (q < end && (*q == 'e' || *q == 'E')) {
      q++;
      if (q < end && (*q == '+' || *q == '-')) {
        q++;
      }
      mark = q;
      q = amduat_fed_json_skip_digits(q, end);
      if (q == mark) {
        return false; /* exponent needs at least one digit */
      }
    }
    *p = q;
    return true;
  }
  return false;
}
/* Return true when the unterminated key [key, key + key_len) equals `name`. */
static bool amduat_fed_transport_record_key_is(const char *key,
                                               size_t key_len,
                                               const char *name) {
  return key_len == strlen(name) && memcmp(key, name, key_len) == 0;
}

/*
 * Parse one record object from the JSON stream into *out.
 *
 * Recognised keys: "domain_id", "type", "ref" (hex text decoded with
 * amduat_asl_ref_decode_hex), "logseq", "snapshot_id", "log_prefix",
 * "visibility", "has_source", "source_domain". Unknown keys are skipped.
 * "type", "ref", "logseq", "snapshot_id" and "log_prefix" are required;
 * "domain_id" falls back to `default_domain_id` when absent.
 *
 * On success *p is moved past the closing '}', *out is filled in and the
 * decoded reference is handed over in out->id.ref (release it with
 * amduat_reference_free). On failure false is returned, *out is not
 * written, and any reference decoded along the way is released here.
 */
static bool amduat_fed_transport_parse_record(const char **p,
                                              const char *end,
                                              uint32_t default_domain_id,
                                              amduat_fed_record_t *out) {
  const char *key = NULL;
  size_t key_len = 0;
  uint32_t domain_id = default_domain_id;
  uint32_t type = 0;
  uint32_t visibility = 0;
  uint32_t source_domain = 0;
  uint64_t logseq = 0;
  uint64_t snapshot_id = 0;
  uint64_t log_prefix = 0;
  bool has_type = false;
  bool has_ref = false;
  bool has_logseq = false;
  bool has_snapshot = false;
  bool has_log_prefix = false;
  bool has_source = false;
  amduat_reference_t ref;

  if (out == NULL) {
    return false;
  }
  memset(&ref, 0, sizeof(ref));
  if (!amduat_fed_json_expect(p, end, '{')) {
    return false;
  }

  for (;;) {
    const char *cur = amduat_fed_json_skip_ws(*p, end);
    if (cur < end && *cur == '}') {
      *p = cur + 1;
      break;
    }
    if (!amduat_fed_json_parse_string_noesc(p, end, &key, &key_len) ||
        !amduat_fed_json_expect(p, end, ':')) {
      goto fail;
    }

    if (amduat_fed_transport_record_key_is(key, key_len, "domain_id")) {
      if (!amduat_fed_json_parse_u32(p, end, &domain_id)) {
        goto fail;
      }
    } else if (amduat_fed_transport_record_key_is(key, key_len, "type")) {
      if (!amduat_fed_json_parse_u32(p, end, &type)) {
        goto fail;
      }
      has_type = true;
    } else if (amduat_fed_transport_record_key_is(key, key_len, "ref")) {
      const char *hex = NULL;
      size_t hex_len = 0;
      char *hex_z;
      bool decoded;

      if (!amduat_fed_json_parse_string_noesc(p, end, &hex, &hex_len)) {
        goto fail;
      }
      if (has_ref) {
        /* Duplicate key: release the earlier reference; the last one wins. */
        amduat_reference_free(&ref);
        memset(&ref, 0, sizeof(ref));
        has_ref = false;
      }
      /* The decoder takes a NUL-terminated string, so copy the span out. */
      hex_z = (char *)malloc(hex_len + 1u);
      if (hex_z == NULL) {
        goto fail;
      }
      memcpy(hex_z, hex, hex_len);
      hex_z[hex_len] = '\0';
      decoded = amduat_asl_ref_decode_hex(hex_z, &ref);
      free(hex_z);
      if (!decoded) {
        goto fail;
      }
      has_ref = true;
    } else if (amduat_fed_transport_record_key_is(key, key_len, "logseq")) {
      if (!amduat_fed_json_parse_u64(p, end, &logseq)) {
        goto fail;
      }
      has_logseq = true;
    } else if (amduat_fed_transport_record_key_is(key, key_len,
                                                  "snapshot_id")) {
      if (!amduat_fed_json_parse_u64(p, end, &snapshot_id)) {
        goto fail;
      }
      has_snapshot = true;
    } else if (amduat_fed_transport_record_key_is(key, key_len,
                                                  "log_prefix")) {
      if (!amduat_fed_json_parse_u64(p, end, &log_prefix)) {
        goto fail;
      }
      has_log_prefix = true;
    } else if (amduat_fed_transport_record_key_is(key, key_len,
                                                  "visibility")) {
      if (!amduat_fed_json_parse_u32(p, end, &visibility)) {
        goto fail;
      }
      /* Stored through a uint8_t cast below; refuse values that would be
       * silently truncated. */
      if (visibility > UINT8_MAX) {
        goto fail;
      }
    } else if (amduat_fed_transport_record_key_is(key, key_len,
                                                  "has_source")) {
      if (!amduat_fed_json_parse_bool(p, end, &has_source)) {
        goto fail;
      }
    } else if (amduat_fed_transport_record_key_is(key, key_len,
                                                  "source_domain")) {
      if (!amduat_fed_json_parse_u32(p, end, &source_domain)) {
        goto fail;
      }
    } else {
      if (!amduat_fed_json_skip_value(p, end, 0)) {
        goto fail;
      }
    }

    cur = amduat_fed_json_skip_ws(*p, end);
    if (cur >= end) {
      goto fail;
    }
    if (*cur == ',') {
      *p = cur + 1;
      continue;
    }
    if (*cur == '}') {
      *p = cur + 1;
      break;
    }
    goto fail;
  }

  if (!has_ref || !has_type || !has_logseq || !has_snapshot ||
      !has_log_prefix) {
    goto fail;
  }

  out->logseq = logseq;
  out->snapshot_id = snapshot_id;
  out->log_prefix = log_prefix;
  out->meta.domain_id = domain_id;
  out->meta.visibility = (uint8_t)visibility;
  out->meta.has_source = has_source ? 1u : 0u;
  out->meta.source_domain = source_domain;
  out->id.type = (amduat_fed_record_type_t)type;
  out->id.ref = ref; /* ownership moves to the caller */
  return true;

fail:
  if (has_ref) {
    amduat_reference_free(&ref);
  }
  return false;
}
/*
 * Parse the JSON envelope of a records response.
 *
 * The top-level object must contain "domain_id", "snapshot_id",
 * "log_prefix" and "next_logseq"; "records" is an array of record objects
 * and unknown keys are skipped. The three 64-bit envelope values are only
 * validated, not returned.
 *
 * On success *out_records receives a heap array (NULL when no record was
 * parsed) and *out_len its length; each element owns its reference. On
 * failure both outputs stay NULL/0 and everything parsed so far is released.
 *
 * NOTE(review): each record is parsed with the envelope "domain_id" seen so
 * far as its default, so a "domain_id" key that appears after "records"
 * leaves the default at 0 for those records.
 */
static bool amduat_fed_transport_parse_records(const char *body,
                                               size_t body_len,
                                               amduat_fed_record_t **out_records,
                                               size_t *out_len) {
  const char *pos = body;
  const char *const end = body + body_len;
  const char *at;
  const char *name = NULL;
  size_t name_len = 0;
  uint32_t envelope_domain = 0;
  uint64_t ignored = 0; /* envelope values that are validated but unused */
  bool seen_domain = false;
  bool seen_snapshot = false;
  bool seen_log_prefix = false;
  bool seen_next = false;
  amduat_fed_record_t *list = NULL;
  size_t count = 0;
  size_t capacity = 0;
  size_t i;
  char sep;

  if (out_records == NULL || out_len == NULL) {
    return false;
  }
  *out_records = NULL;
  *out_len = 0;
  if (!amduat_fed_json_expect(&pos, end, '{')) {
    return false;
  }

  while (true) {
    /* A closing brace here covers both "{}" and a trailing comma. */
    at = amduat_fed_json_skip_ws(pos, end);
    if (at < end && *at == '}') {
      break;
    }
    if (!amduat_fed_json_parse_string_noesc(&pos, end, &name, &name_len) ||
        !amduat_fed_json_expect(&pos, end, ':')) {
      goto discard;
    }

    if (name_len == sizeof("domain_id") - 1u &&
        memcmp(name, "domain_id", name_len) == 0) {
      if (!amduat_fed_json_parse_u32(&pos, end, &envelope_domain)) {
        goto discard;
      }
      seen_domain = true;
    } else if (name_len == sizeof("snapshot_id") - 1u &&
               memcmp(name, "snapshot_id", name_len) == 0) {
      if (!amduat_fed_json_parse_u64(&pos, end, &ignored)) {
        goto discard;
      }
      seen_snapshot = true;
    } else if (name_len == sizeof("log_prefix") - 1u &&
               memcmp(name, "log_prefix", name_len) == 0) {
      if (!amduat_fed_json_parse_u64(&pos, end, &ignored)) {
        goto discard;
      }
      seen_log_prefix = true;
    } else if (name_len == sizeof("next_logseq") - 1u &&
               memcmp(name, "next_logseq", name_len) == 0) {
      if (!amduat_fed_json_parse_u64(&pos, end, &ignored)) {
        goto discard;
      }
      seen_next = true;
    } else if (name_len == sizeof("records") - 1u &&
               memcmp(name, "records", name_len) == 0) {
      if (!amduat_fed_json_expect(&pos, end, '[')) {
        goto discard;
      }
      at = amduat_fed_json_skip_ws(pos, end);
      if (at < end && *at == ']') {
        pos = at + 1;
      } else {
        do {
          amduat_fed_record_t item;

          memset(&item, 0, sizeof(item));
          if (!amduat_fed_transport_parse_record(&pos, end, envelope_domain,
                                                 &item)) {
            goto discard;
          }
          if (count == capacity) {
            size_t grown_cap = capacity == 0 ? 64u : capacity * 2u;
            amduat_fed_record_t *grown = (amduat_fed_record_t *)realloc(
                list, grown_cap * sizeof(*list));
            if (grown == NULL) {
              /* `item` is not in the list yet, so release it here. */
              amduat_reference_free(&item.id.ref);
              goto discard;
            }
            list = grown;
            capacity = grown_cap;
          }
          list[count++] = item;

          at = amduat_fed_json_skip_ws(pos, end);
          if (at >= end || (*at != ',' && *at != ']')) {
            goto discard;
          }
          sep = *at;
          pos = at + 1;
        } while (sep == ',');
      }
    } else {
      if (!amduat_fed_json_skip_value(&pos, end, 0)) {
        goto discard;
      }
    }

    at = amduat_fed_json_skip_ws(pos, end);
    if (at >= end) {
      goto discard;
    }
    if (*at == '}') {
      break;
    }
    if (*at != ',') {
      goto discard;
    }
    pos = at + 1;
  }

  if (!(seen_domain && seen_snapshot && seen_log_prefix && seen_next)) {
    goto discard;
  }
  *out_records = list;
  *out_len = count;
  return true;

discard:
  for (i = 0; i < count; ++i) {
    amduat_reference_free(&list[i].id.ref);
  }
  free(list);
  return false;
}
/*
 * Open a stream connection to the Unix-domain socket at `path`.
 *
 * Returns the connected descriptor (owned by the caller, who must close it)
 * or -1 on failure. A NULL or empty path is rejected, and so is a path that
 * does not fit in sockaddr_un.sun_path with its terminating NUL: truncating
 * it would connect to a different filesystem path than the one requested.
 */
static int amduat_fed_transport_unix_connect(const char *path) {
  struct sockaddr_un addr;
  size_t path_len;
  int fd;

  if (path == NULL || path[0] == '\0') {
    return -1;
  }
  path_len = strlen(path);
  if (path_len >= sizeof(addr.sun_path)) {
    return -1;
  }
  fd = socket(AF_UNIX, SOCK_STREAM, 0);
  if (fd < 0) {
    return -1;
  }
  memset(&addr, 0, sizeof(addr)); /* also NUL-terminates sun_path */
  addr.sun_family = AF_UNIX;
  memcpy(addr.sun_path, path, path_len);
  if (connect(fd, (const struct sockaddr *)&addr, sizeof(addr)) != 0) {
    int saved_errno = errno; /* close() may overwrite the connect error */
    close(fd);
    errno = saved_errno;
    return -1;
  }
  return fd;
}
/*
 * Send exactly `len` bytes from `buf` on the socket `fd`.
 *
 * Short writes are continued and EINTR is retried. Where the platform
 * provides MSG_NOSIGNAL it is used, so a peer that has already closed the
 * connection produces an error return instead of a SIGPIPE delivered to the
 * process. A call that makes no progress is treated as a failure rather than
 * retried forever.
 *
 * Returns true when everything was sent, false otherwise.
 */
static bool amduat_fed_transport_unix_send_all(int fd,
                                               const char *buf,
                                               size_t len) {
#ifdef MSG_NOSIGNAL
  const int flags = MSG_NOSIGNAL;
#else
  const int flags = 0;
#endif
  size_t off = 0;

  if (buf == NULL && len != 0) {
    return false;
  }
  while (off < len) {
    ssize_t n = send(fd, buf + off, len - off, flags);
    if (n < 0) {
      if (errno == EINTR) {
        continue;
      }
      return false;
    }
    if (n == 0) {
      return false; /* no progress; avoid spinning */
    }
    off += (size_t)n;
  }
  return true;
}
/*
 * Read from `fd` until end-of-file and return everything in one heap buffer.
 *
 * Data is pulled in 4096-byte chunks; the result buffer starts at 8192 bytes
 * and doubles as needed. EINTR is retried. On success *out_buf holds the
 * data (NULL if nothing was read) and *out_len its size; the caller frees
 * it with free(). On failure the partial buffer is released, false is
 * returned and the outputs are left untouched. The buffer is not
 * NUL-terminated.
 */
static bool amduat_fed_transport_unix_read_all(int fd,
                                               uint8_t **out_buf,
                                               size_t *out_len) {
  uint8_t chunk[4096];
  uint8_t *data = NULL;
  size_t used = 0;
  size_t capacity = 0;

  if (out_buf == NULL || out_len == NULL) {
    return false;
  }

  while (true) {
    size_t got;
    size_t needed;
    ssize_t n = read(fd, chunk, sizeof(chunk));

    if (n == 0) {
      break; /* end of stream */
    }
    if (n < 0) {
      if (errno == EINTR) {
        continue;
      }
      free(data);
      return false;
    }

    got = (size_t)n;
    needed = used + got;
    if (needed > capacity) {
      size_t grown_cap = capacity == 0 ? 8192u : capacity * 2u;
      uint8_t *grown;

      while (grown_cap < needed) {
        grown_cap *= 2u;
      }
      grown = (uint8_t *)realloc(data, grown_cap);
      if (grown == NULL) {
        free(data);
        return false;
      }
      data = grown;
      capacity = grown_cap;
    }
    memcpy(data + used, chunk, got);
    used = needed;
  }

  *out_buf = data;
  *out_len = used;
  return true;
}
/*
 * Split a raw HTTP/1.x response into status code and body.
 *
 * `buf` holds `len` bytes and does not need to be NUL-terminated; every read
 * is bounded by `len`. The response must start with "HTTP/1.", contain the
 * blank line ("\r\n\r\n") that ends the header block, and carry a status
 * code of one to three decimal digits after the version (optionally preceded
 * by spaces or tabs).
 *
 * On success *out_status is the status code, *out_body points into `buf`
 * just after the blank line and *out_body_len is the number of bytes that
 * follow. On failure false is returned and the outputs are NULL/0/0.
 */
static bool amduat_fed_transport_unix_split_response(const uint8_t *buf,
                                                     size_t len,
                                                     const uint8_t **out_body,
                                                     size_t *out_body_len,
                                                     int *out_status) {
  static const char k_prefix[] = "HTTP/1.";
  const size_t prefix_len = sizeof(k_prefix) - 1u;
  size_t header_end = 0;
  bool found = false;
  size_t digits = 0;
  int status = 0;
  size_t i;

  if (out_body == NULL || out_body_len == NULL || out_status == NULL) {
    return false;
  }
  *out_body = NULL;
  *out_body_len = 0;
  *out_status = 0;
  if (buf == NULL || len < 12) {
    return false;
  }
  if (memcmp(buf, k_prefix, prefix_len) != 0) {
    return false;
  }

  /* Locate the blank line that terminates the header block. */
  for (i = 0; i + 3 < len; ++i) {
    if (memcmp(buf + i, "\r\n\r\n", 4u) == 0) {
      header_end = i;
      found = true;
      break;
    }
  }
  if (!found) {
    return false;
  }

  /* Skip the minor-version byte, then optional blanks, then the code. */
  i = prefix_len + 1u;
  while (i < header_end && (buf[i] == ' ' || buf[i] == '\t')) {
    i++;
  }
  while (i < header_end && buf[i] >= '0' && buf[i] <= '9') {
    if (digits == 3u) {
      return false; /* longer than any HTTP status code */
    }
    status = status * 10 + (int)(buf[i] - '0');
    digits++;
    i++;
  }
  if (digits == 0u) {
    return false;
  }

  *out_status = status;
  *out_body = buf + header_end + 4u;
  *out_body_len = len - (header_end + 4u);
  return true;
}
/*
 * Fetch federation records from the peer behind the transport's socket.
 *
 * Sends "GET /v1/fed/records?domain_id=..&from_logseq=.." with
 * "Connection: close" (plus an "X-Amduat-Space" header when a space is
 * configured), reads the whole response, requires status 200 and parses the
 * JSON body. `snapshot_id` and `log_prefix` are accepted but not used.
 *
 * Returns 0 and fills *out_records / *out_len on success; the array is
 * released with amduat_fed_transport_unix_free_records. Returns -1 on any
 * failure, including a request that would not fit the local buffers, with
 * the outputs left NULL/0.
 */
static int amduat_fed_transport_unix_get_records(void *ctx,
                                                 uint32_t domain_id,
                                                 uint64_t snapshot_id,
                                                 uint64_t log_prefix,
                                                 uint64_t from_logseq,
                                                 amduat_fed_record_t **out_records,
                                                 size_t *out_len) {
  amduat_fed_transport_unix_t *transport = (amduat_fed_transport_unix_t *)ctx;
  char req[2048];
  char space_header[AMDUAT_ASL_POINTER_NAME_MAX + 32u];
  const char *space_line = "";
  uint8_t *buf = NULL;
  size_t buf_len = 0;
  const uint8_t *body = NULL;
  size_t body_len = 0;
  int status = 0;
  int written;
  int fd;
  int rc = -1;
  bool exchanged;

  (void)snapshot_id;
  (void)log_prefix;
  if (transport == NULL || out_records == NULL || out_len == NULL) {
    return -1;
  }
  *out_records = NULL;
  *out_len = 0;

  if (transport->has_space) {
    written = snprintf(space_header,
                       sizeof(space_header),
                       "X-Amduat-Space: %s\r\n",
                       transport->space_id);
    if (written < 0 || (size_t)written >= sizeof(space_header)) {
      return -1; /* a truncated header would corrupt the request */
    }
    space_line = space_header;
  }
  written = snprintf(req, sizeof(req),
                     "GET /v1/fed/records?domain_id=%u&from_logseq=%llu HTTP/1.1\r\n"
                     "Host: localhost\r\n"
                     "%s"
                     "Connection: close\r\n"
                     "\r\n",
                     (unsigned int)domain_id,
                     (unsigned long long)from_logseq,
                     space_line);
  if (written < 0 || (size_t)written >= sizeof(req)) {
    return -1; /* never send a request missing its terminating blank line */
  }

  fd = amduat_fed_transport_unix_connect(transport->socket_path);
  if (fd < 0) {
    return -1;
  }
  exchanged = amduat_fed_transport_unix_send_all(fd, req, (size_t)written) &&
              amduat_fed_transport_unix_read_all(fd, &buf, &buf_len);
  close(fd);

  /* `body` points into `buf`, so `buf` is released only after parsing. */
  if (exchanged &&
      amduat_fed_transport_unix_split_response(buf, buf_len, &body, &body_len,
                                               &status) &&
      status == 200 &&
      amduat_fed_transport_parse_records((const char *)body, body_len,
                                         out_records, out_len)) {
    rc = 0;
  }
  free(buf);
  return rc;
}
/*
 * Release a record array: frees each element's reference with
 * amduat_reference_free, then the array itself. `ctx` is unused and a NULL
 * `records` is a no-op.
 */
static void amduat_fed_transport_unix_free_records(void *ctx,
                                                   amduat_fed_record_t *records,
                                                   size_t len) {
  amduat_fed_record_t *it;
  amduat_fed_record_t *stop;

  (void)ctx;
  if (records == NULL) {
    return;
  }
  stop = records + len;
  for (it = records; it != stop; ++it) {
    amduat_reference_free(&it->id.ref);
  }
  free(records);
}
/*
 * Fetch the bytes of the artifact identified by `ref` from the peer.
 *
 * Sends "GET /v1/fed/artifacts/<hex ref>" with "Connection: close" (plus an
 * "X-Amduat-Space" header when a space is configured), reads the whole
 * response, requires status 200 and clones the body into *out_bytes.
 *
 * Returns 0 on success and -1 on any failure, including a request that
 * would not fit the local buffers. *out_bytes is reset to an empty value
 * before anything else happens.
 */
static int amduat_fed_transport_unix_get_artifact(void *ctx,
                                                  amduat_reference_t ref,
                                                  amduat_octets_t *out_bytes) {
  amduat_fed_transport_unix_t *transport = (amduat_fed_transport_unix_t *)ctx;
  char *ref_hex = NULL;
  char req[2048];
  char space_header[AMDUAT_ASL_POINTER_NAME_MAX + 32u];
  const char *space_line = "";
  uint8_t *buf = NULL;
  size_t buf_len = 0;
  const uint8_t *body = NULL;
  size_t body_len = 0;
  int status = 0;
  int written;
  int fd;
  int rc = -1;
  bool exchanged;

  if (transport == NULL || out_bytes == NULL) {
    return -1;
  }
  *out_bytes = amduat_octets(NULL, 0u);

  if (transport->has_space) {
    written = snprintf(space_header,
                       sizeof(space_header),
                       "X-Amduat-Space: %s\r\n",
                       transport->space_id);
    if (written < 0 || (size_t)written >= sizeof(space_header)) {
      return -1; /* a truncated header would corrupt the request */
    }
    space_line = space_header;
  }

  if (!amduat_asl_ref_encode_hex(ref, &ref_hex)) {
    return -1;
  }
  written = snprintf(req, sizeof(req),
                     "GET /v1/fed/artifacts/%s HTTP/1.1\r\n"
                     "Host: localhost\r\n"
                     "%s"
                     "Connection: close\r\n"
                     "\r\n",
                     ref_hex,
                     space_line);
  free(ref_hex);
  if (written < 0 || (size_t)written >= sizeof(req)) {
    return -1; /* never send a request missing its terminating blank line */
  }

  fd = amduat_fed_transport_unix_connect(transport->socket_path);
  if (fd < 0) {
    return -1;
  }
  exchanged = amduat_fed_transport_unix_send_all(fd, req, (size_t)written) &&
              amduat_fed_transport_unix_read_all(fd, &buf, &buf_len);
  close(fd);

  /* `body` points into `buf`, so `buf` is released only after the clone. */
  if (exchanged &&
      amduat_fed_transport_unix_split_response(buf, buf_len, &body, &body_len,
                                               &status) &&
      status == 200 &&
      amduat_octets_clone(amduat_octets(body, body_len), out_bytes)) {
    rc = 0;
  }
  free(buf);
  return rc;
}
/*
 * Initialise a Unix-socket transport with the given socket path.
 *
 * The path is copied into the transport and any configured space is
 * cleared. Returns false when either argument is NULL or when the path does
 * not fit in AMDUAT_FED_TRANSPORT_UNIX_PATH_MAX including its terminating
 * NUL; nothing is written in that case.
 */
bool amduat_fed_transport_unix_init(amduat_fed_transport_unix_t *transport,
                                    const char *socket_path) {
  size_t path_len;

  if (transport == NULL || socket_path == NULL) {
    return false;
  }
  path_len = strlen(socket_path);
  if (path_len >= AMDUAT_FED_TRANSPORT_UNIX_PATH_MAX) {
    return false;
  }

  /* Zero first so the copied path is always NUL-terminated. */
  memset(transport->socket_path, 0, sizeof(transport->socket_path));
  memcpy(transport->socket_path, socket_path, path_len);

  memset(transport->space_id, 0, sizeof(transport->space_id));
  transport->has_space = false;
  return true;
}
/*
 * Set or clear the space identifier sent in the "X-Amduat-Space" header.
 *
 * Any previous value is cleared first. A NULL or empty `space_id` leaves the
 * transport without a space and returns true. Otherwise the identifier must
 * fit in transport->space_id with its terminating NUL and must not contain
 * CR or LF (it is placed verbatim into a header line); on violation false is
 * returned and the transport stays without a space.
 */
bool amduat_fed_transport_unix_set_space(amduat_fed_transport_unix_t *transport,
                                         const char *space_id) {
  size_t id_len;

  if (transport == NULL) {
    return false;
  }
  memset(transport->space_id, 0, sizeof(transport->space_id));
  transport->has_space = false;

  if (space_id == NULL || space_id[0] == '\0') {
    return true;
  }
  id_len = strlen(space_id);
  if (id_len >= sizeof(transport->space_id)) {
    return false;
  }
  if (strpbrk(space_id, "\r\n") != NULL) {
    return false;
  }

  memcpy(transport->space_id, space_id, id_len);
  transport->has_space = true;
  return true;
}
/*
 * Build the transport operations table bound to `transport`.
 *
 * `ctx` is set to `transport` and the record/artifact callbacks to this
 * file's Unix-socket implementations; every other member is zero.
 */
amduat_fed_transport_t amduat_fed_transport_unix_ops(
    amduat_fed_transport_unix_t *transport) {
  amduat_fed_transport_t vtable = {
    .ctx = transport,
    .get_records = amduat_fed_transport_unix_get_records,
    .free_records = amduat_fed_transport_unix_free_records,
    .get_artifact = amduat_fed_transport_unix_get_artifact,
  };

  return vtable;
}