amduat-api/notes/Operator Pseudocode (ASL + TGK Execution Plan).md

272 lines
7.9 KiB
Markdown
Raw Normal View History

NOTE: Superseded by tier1 TGK/1 and vendor/amduat/tier1/tgk-1-core.md; retained for historical context.
Absolutely — heres a **formal draft of operator pseudocode** for the unified ASL + TGK execution plan. This pseudocode captures **shard/SIMD acceleration, deterministic execution, snapshot safety, and DAG traversal**, mapping directly to ENC-ASL-TGK-INDEX.
---
# Operator Pseudocode (ASL + TGK Execution Plan)
---
## 1. SegmentScan Operator
```c
void SegmentScan(segment_t *segment, snapshot_range_t snapshot, record_buffer_t *out_buffer) {
// Iterate over all records in the segment
for (uint64_t i = 0; i < segment->record_count; i++) {
record_t rec = segment->records[i];
// Snapshot visibility
if (rec.logseq > snapshot.logseq_max) continue;
// Append to output buffer
out_buffer->records[out_buffer->count++] = rec;
}
}
```
* Can be **shard-parallelized**
* Works for both **ASL artifacts** and **TGK edges**
* `record_buffer_t` is preallocated, SIMD-aligned
---
## 2. IndexFilter Operator
```c
void IndexFilter(record_buffer_t *in_buffer, filter_t *filter, record_buffer_t *out_buffer) {
for (uint64_t i = 0; i < in_buffer->count; i += SIMD_WIDTH) {
simd_mask_t mask = SIMD_TRUE;
// SIMD filter artifact type
if (filter->has_type_tag) {
mask &= SIMD_EQ(in_buffer->type_tags[i:i+SIMD_WIDTH], filter->artifact_type_tag);
}
// SIMD filter edge type
if (filter->has_edge_type) {
mask &= SIMD_EQ(in_buffer->edge_type_keys[i:i+SIMD_WIDTH], filter->edge_type_key);
}
// SIMD role filter (for TGK edges)
if (filter->role) {
mask &= SIMD_EQ(in_buffer->roles[i:i+SIMD_WIDTH], filter->role);
}
// Store passing records
SIMD_STORE_MASKED(in_buffer->records[i:i+SIMD_WIDTH], mask, out_buffer->records);
}
out_buffer->count = count_masked_records(out_buffer);
}
```
* SIMD ensures **parallel, vectorized evaluation**
* Deterministic since order preserved
---
## 3. Merge Operator
```c
void Merge(record_buffer_t **inputs, int num_inputs, record_buffer_t *out_buffer) {
min_heap_t heap = build_heap(inputs, num_inputs);
while (!heap_empty(heap)) {
record_t rec = heap_pop(heap);
out_buffer->records[out_buffer->count++] = rec;
// Advance from the source buffer
heap_advance_source(heap, rec.source_buffer_id);
}
}
```
* Uses **logseq ascending + canonical ID tie-breaker**
* Deterministic across shards
---
## 4. TGKTraversal Operator
```c
void TGKTraversal(record_buffer_t *in_buffer, uint32_t depth, snapshot_range_t snapshot, record_buffer_t *out_buffer) {
record_buffer_t current_buffer = *in_buffer;
for (uint32_t d = 0; d < depth; d++) {
record_buffer_t next_buffer = allocate_buffer();
for (uint64_t i = 0; i < current_buffer.count; i++) {
record_t rec = current_buffer.records[i];
// Skip if not visible
if (rec.logseq > snapshot.logseq_max) continue;
// Expand edges deterministically
for (uint64_t j = 0; j < rec.to_count; j++) {
record_t edge = lookup_edge(rec.to_nodes[j]);
next_buffer.records[next_buffer.count++] = edge;
}
}
// Merge deterministically
MergeBuffers(&next_buffer, 1, out_buffer);
// Prepare for next depth
current_buffer = next_buffer;
}
}
```
* Expansion **per depth**
* Deterministic ordering guaranteed
* Snapshot-safe traversal
---
## 5. Projection Operator
```c
void Projection(record_buffer_t *in_buffer, projection_mask_t mask, record_buffer_t *out_buffer) {
for (uint64_t i = 0; i < in_buffer->count; i++) {
record_t rec = in_buffer->records[i];
projected_record_t prow;
if (mask.project_artifact_id) prow.artifact_id = rec.artifact_id;
if (mask.project_tgk_edge_id) prow.tgk_edge_id = rec.tgk_edge_id;
if (mask.project_node_id) prow.node_id = rec.node_id;
if (mask.project_type_tag) prow.type_tag = rec.type_tag;
out_buffer->records[out_buffer->count++] = prow;
}
}
```
---
## 6. Aggregation Operator
```c
void Aggregation(record_buffer_t *in_buffer, aggregation_accumulator_t *acc) {
for (uint64_t i = 0; i < in_buffer->count; i++) {
record_t rec = in_buffer->records[i];
if (acc->count_enabled) acc->count++;
if (acc->sum_type_tag_enabled) acc->sum_type_tag += rec.type_tag;
if (acc->union_enabled) acc->union_set.insert(rec.artifact_id);
}
}
```
* Deterministic aggregation across shards due to **pre-merged, ordered input**
---
## 7. TombstoneShadow Operator
```c
void TombstoneShadow(record_buffer_t *in_buffer, snapshot_range_t snapshot, record_buffer_t *out_buffer) {
hashmap_t latest_per_id;
for (uint64_t i = 0; i < in_buffer->count; i++) {
record_t rec = in_buffer->records[i];
// Skip records outside snapshot
if (rec.logseq > snapshot.logseq_max) continue;
record_t *existing = hashmap_get(&latest_per_id, rec.canonical_id);
if (!existing || rec.logseq > existing->logseq) {
hashmap_put(&latest_per_id, rec.canonical_id, &rec);
}
}
// Write deterministic output
for each rec in sorted(latest_per_id by logseq + canonical_id) {
out_buffer->records[out_buffer->count++] = rec;
}
}
```
* Preserves **snapshot safety and deterministic order**
* Handles tombstones/shadowed records
---
## 8. Shard Dispatch & SIMD Integration
* Each shard executes **SegmentScan + IndexFilter** independently
* Results merged via **Merge**
* SIMD acceleration applies **filters inside IndexFilter** or TGKTraversal expansion
* Memory layout ensures **alignment and cache-friendly access**
---
## 9. Execution Plan Runner
```c
void ExecutePlan(exec_plan_t *plan, snapshot_range_t snapshot, record_buffer_t *final_output) {
record_buffer_t buffers[MAX_OPERATORS];
for (uint32_t i = 0; i < plan->operator_count; i++) {
operator_def_t *op = &plan->operators[i];
switch(op->op_type) {
case OP_SEGMENT_SCAN:
SegmentScan(op->params.segment_scan, snapshot, &buffers[i]);
break;
case OP_INDEX_FILTER:
IndexFilter(&buffers[op->inputs[0]], &op->params.index_filter, &buffers[i]);
break;
case OP_MERGE:
Merge(get_input_buffers(op, buffers), op->input_count, &buffers[i]);
break;
case OP_PROJECTION:
Projection(&buffers[op->inputs[0]], op->params.projection, &buffers[i]);
break;
case OP_TGK_TRAVERSAL:
TGKTraversal(&buffers[op->inputs[0]], op->params.tgk_traversal.traversal_depth,
snapshot, &buffers[i]);
break;
case OP_AGGREGATION:
Aggregation(&buffers[op->inputs[0]], &op->params.aggregation);
break;
case OP_TOMBSTONE_SHADOW:
TombstoneShadow(&buffers[op->inputs[0]], snapshot, &buffers[i]);
break;
default:
// Handle other operators (SIMDFilter, ShardDispatch, LimitOffset)
break;
}
}
// Final output from last operator
*final_output = buffers[plan->operator_count - 1];
}
```
* **DAG execution** respects `inputs[]`
* **Buffers indexed by operator ID**
* Deterministic and **snapshot-safe**
---
This pseudocode fully captures:
* Segment scanning and sharding
* SIMD-accelerated filtering
* TGK DAG traversal
* Projection, aggregation, tombstone shadowing
* Deterministic merge of shards
* Snapshot-safe execution
---
Next steps could include:
* **Integration with PEL deterministic DAG programs**, where **PEL program outputs feed into these operators**
* **Optimizations for block caching and prefetching** in ENC-ASL-TGK-INDEX
Do you want me to draft that next?