amduat-api/notes/Operator Pseudocode (ASL + TGK Execution Plan).md
2026-01-17 00:19:49 +01:00

Absolutely: here's a **formal draft of operator pseudocode** for the unified ASL + TGK execution plan. This pseudocode captures **shard/SIMD acceleration, deterministic execution, snapshot safety, and DAG traversal**, mapping directly onto ENC-ASL-TGK-INDEX.
---
# Operator Pseudocode (ASL + TGK Execution Plan)
---
## 1. SegmentScan Operator
```c
void SegmentScan(segment_t *segment, snapshot_range_t snapshot, record_buffer_t *out_buffer) {
    // Iterate over all records in the segment
    for (uint64_t i = 0; i < segment->record_count; i++) {
        record_t rec = segment->records[i];
        // Snapshot visibility: skip records past the visibility horizon
        if (rec.logseq > snapshot.logseq_max) continue;
        // Append to output buffer
        out_buffer->records[out_buffer->count++] = rec;
    }
}
```
* Can be **shard-parallelized**
* Works for both **ASL artifacts** and **TGK edges**
* `record_buffer_t` is preallocated, SIMD-aligned
---
## 2. IndexFilter Operator
```c
void IndexFilter(record_buffer_t *in_buffer, filter_t *filter, record_buffer_t *out_buffer) {
    out_buffer->count = 0;
    for (uint64_t i = 0; i < in_buffer->count; i += SIMD_WIDTH) {
        simd_mask_t mask = SIMD_TRUE;
        // SIMD filter on artifact type
        if (filter->has_type_tag) {
            mask &= SIMD_EQ(in_buffer->type_tags[i : i + SIMD_WIDTH], filter->artifact_type_tag);
        }
        // SIMD filter on edge type
        if (filter->has_edge_type) {
            mask &= SIMD_EQ(in_buffer->edge_type_keys[i : i + SIMD_WIDTH], filter->edge_type_key);
        }
        // SIMD role filter (for TGK edges)
        if (filter->role) {
            mask &= SIMD_EQ(in_buffer->roles[i : i + SIMD_WIDTH], filter->role);
        }
        // Compress-store passing records at the current output offset;
        // SIMD_STORE_MASKED returns the number of lanes stored
        out_buffer->count += SIMD_STORE_MASKED(in_buffer->records[i : i + SIMD_WIDTH], mask,
                                               &out_buffer->records[out_buffer->count]);
    }
}
```
* SIMD ensures **parallel, vectorized evaluation**
* Deterministic since order preserved
---
## 3. Merge Operator
```c
void Merge(record_buffer_t **inputs, int num_inputs, record_buffer_t *out_buffer) {
    // Heap ordered by logseq ascending, canonical ID as tie-breaker
    min_heap_t heap = build_heap(inputs, num_inputs);
    while (!heap_empty(heap)) {
        record_t rec = heap_pop(heap);
        out_buffer->records[out_buffer->count++] = rec;
        // Refill the heap from the buffer the popped record came from
        heap_advance_source(heap, rec.source_buffer_id);
    }
}
```
* Uses **logseq ascending + canonical ID tie-breaker**
* Deterministic across shards
---
## 4. TGKTraversal Operator
```c
void TGKTraversal(record_buffer_t *in_buffer, uint32_t depth, snapshot_range_t snapshot, record_buffer_t *out_buffer) {
    record_buffer_t current_buffer = *in_buffer;
    for (uint32_t d = 0; d < depth; d++) {
        record_buffer_t next_buffer = allocate_buffer();
        for (uint64_t i = 0; i < current_buffer.count; i++) {
            record_t rec = current_buffer.records[i];
            // Skip records not visible at this snapshot
            if (rec.logseq > snapshot.logseq_max) continue;
            // Expand edges deterministically, in stored order
            for (uint64_t j = 0; j < rec.to_count; j++) {
                record_t edge = lookup_edge(rec.to_nodes[j]);
                next_buffer.records[next_buffer.count++] = edge;
            }
        }
        // Merge this depth's frontier into the output deterministically
        record_buffer_t *frontier[1] = { &next_buffer };
        Merge(frontier, 1, out_buffer);
        // The new frontier becomes the input for the next depth
        current_buffer = next_buffer;
    }
}
```
* Expansion **per depth**
* Deterministic ordering guaranteed
* Snapshot-safe traversal
---
## 5. Projection Operator
```c
void Projection(record_buffer_t *in_buffer, projection_mask_t mask, record_buffer_t *out_buffer) {
    for (uint64_t i = 0; i < in_buffer->count; i++) {
        record_t rec = in_buffer->records[i];
        projected_record_t prow = {0};   // Unprojected fields stay zeroed
        if (mask.project_artifact_id) prow.artifact_id = rec.artifact_id;
        if (mask.project_tgk_edge_id) prow.tgk_edge_id = rec.tgk_edge_id;
        if (mask.project_node_id)     prow.node_id = rec.node_id;
        if (mask.project_type_tag)    prow.type_tag = rec.type_tag;
        out_buffer->records[out_buffer->count++] = prow;
    }
}
```
---
## 6. Aggregation Operator
```c
void Aggregation(record_buffer_t *in_buffer, aggregation_accumulator_t *acc) {
    for (uint64_t i = 0; i < in_buffer->count; i++) {
        record_t rec = in_buffer->records[i];
        if (acc->count_enabled)        acc->count++;
        if (acc->sum_type_tag_enabled) acc->sum_type_tag += rec.type_tag;
        if (acc->union_enabled)        union_set_insert(&acc->union_set, rec.artifact_id);
    }
}
```
* Deterministic aggregation across shards due to **pre-merged, ordered input**
---
## 7. TombstoneShadow Operator
```c
void TombstoneShadow(record_buffer_t *in_buffer, snapshot_range_t snapshot, record_buffer_t *out_buffer) {
    hashmap_t latest_per_id = hashmap_new();
    for (uint64_t i = 0; i < in_buffer->count; i++) {
        record_t rec = in_buffer->records[i];
        // Skip records outside the snapshot
        if (rec.logseq > snapshot.logseq_max) continue;
        // Keep only the newest visible version per canonical ID
        record_t *existing = hashmap_get(&latest_per_id, rec.canonical_id);
        if (!existing || rec.logseq > existing->logseq) {
            hashmap_put(&latest_per_id, rec.canonical_id, rec);
        }
    }
    // Emit survivors in deterministic (logseq, canonical_id) order
    for each rec in sorted(latest_per_id, by logseq then canonical_id) {
        out_buffer->records[out_buffer->count++] = rec;
    }
}
```
* Preserves **snapshot safety and deterministic order**
* Handles tombstones/shadowed records
---
## 8. Shard Dispatch & SIMD Integration
* Each shard executes **SegmentScan + IndexFilter** independently
* Results merged via **Merge**
* SIMD acceleration applies **filters inside IndexFilter** or TGKTraversal expansion
* Memory layout ensures **alignment and cache-friendly access**
---
## 9. Execution Plan Runner
```c
void ExecutePlan(exec_plan_t *plan, snapshot_range_t snapshot, record_buffer_t *final_output) {
    record_buffer_t buffers[MAX_OPERATORS];
    for (uint32_t i = 0; i < plan->operator_count; i++) {
        operator_def_t *op = &plan->operators[i];
        switch (op->op_type) {
        case OP_SEGMENT_SCAN:
            SegmentScan(op->params.segment_scan, snapshot, &buffers[i]);
            break;
        case OP_INDEX_FILTER:
            IndexFilter(&buffers[op->inputs[0]], &op->params.index_filter, &buffers[i]);
            break;
        case OP_MERGE:
            Merge(get_input_buffers(op, buffers), op->input_count, &buffers[i]);
            break;
        case OP_PROJECTION:
            Projection(&buffers[op->inputs[0]], op->params.projection, &buffers[i]);
            break;
        case OP_TGK_TRAVERSAL:
            TGKTraversal(&buffers[op->inputs[0]], op->params.tgk_traversal.traversal_depth,
                         snapshot, &buffers[i]);
            break;
        case OP_AGGREGATION:
            Aggregation(&buffers[op->inputs[0]], &op->params.aggregation);
            break;
        case OP_TOMBSTONE_SHADOW:
            TombstoneShadow(&buffers[op->inputs[0]], snapshot, &buffers[i]);
            break;
        default:
            // Handle other operators (SIMDFilter, ShardDispatch, LimitOffset)
            break;
        }
    }
    // Final output comes from the last operator's buffer
    *final_output = buffers[plan->operator_count - 1];
}
```
* **DAG execution** respects `inputs[]`
* **Buffers indexed by operator ID**
* Deterministic and **snapshot-safe**
---
This pseudocode fully captures:
* Segment scanning and sharding
* SIMD-accelerated filtering
* TGK DAG traversal
* Projection, aggregation, tombstone shadowing
* Deterministic merge of shards
* Snapshot-safe execution
---
Next steps could include:
* **Integration with PEL deterministic DAG programs**, where **PEL program outputs feed into these operators**
* **Optimizations for block caching and prefetching** in ENC-ASL-TGK-INDEX
Do you want me to draft that next?