7.9 KiB
7.9 KiB
NOTE: Superseded by tier1 TGK/1 and vendor/amduat/tier1/tgk-1-core.md; retained for historical context.
Absolutely — here’s a formal draft of operator pseudocode for the unified ASL + TGK execution plan. This pseudocode captures shard/SIMD acceleration, deterministic execution, snapshot safety, and DAG traversal, mapping directly to ENC-ASL-TGK-INDEX.
Operator Pseudocode (ASL + TGK Execution Plan)
1. SegmentScan Operator
void SegmentScan(segment_t *segment, snapshot_range_t snapshot, record_buffer_t *out_buffer) {
// Iterate over all records in the segment
for (uint64_t i = 0; i < segment->record_count; i++) {
record_t rec = segment->records[i];
// Snapshot visibility
if (rec.logseq > snapshot.logseq_max) continue;
// Append to output buffer
out_buffer->records[out_buffer->count++] = rec;
}
}
- Can be shard-parallelized
- Works for both ASL artifacts and TGK edges
record_buffer_tis preallocated, SIMD-aligned
2. IndexFilter Operator
void IndexFilter(record_buffer_t *in_buffer, filter_t *filter, record_buffer_t *out_buffer) {
for (uint64_t i = 0; i < in_buffer->count; i += SIMD_WIDTH) {
simd_mask_t mask = SIMD_TRUE;
// SIMD filter artifact type
if (filter->has_type_tag) {
mask &= SIMD_EQ(in_buffer->type_tags[i:i+SIMD_WIDTH], filter->artifact_type_tag);
}
// SIMD filter edge type
if (filter->has_edge_type) {
mask &= SIMD_EQ(in_buffer->edge_type_keys[i:i+SIMD_WIDTH], filter->edge_type_key);
}
// SIMD role filter (for TGK edges)
if (filter->role) {
mask &= SIMD_EQ(in_buffer->roles[i:i+SIMD_WIDTH], filter->role);
}
// Store passing records
SIMD_STORE_MASKED(in_buffer->records[i:i+SIMD_WIDTH], mask, out_buffer->records);
}
out_buffer->count = count_masked_records(out_buffer);
}
- SIMD ensures parallel, vectorized evaluation
- Deterministic since order preserved
3. Merge Operator
void Merge(record_buffer_t **inputs, int num_inputs, record_buffer_t *out_buffer) {
min_heap_t heap = build_heap(inputs, num_inputs);
while (!heap_empty(heap)) {
record_t rec = heap_pop(heap);
out_buffer->records[out_buffer->count++] = rec;
// Advance from the source buffer
heap_advance_source(heap, rec.source_buffer_id);
}
}
- Uses logseq ascending + canonical ID tie-breaker
- Deterministic across shards
4. TGKTraversal Operator
void TGKTraversal(record_buffer_t *in_buffer, uint32_t depth, snapshot_range_t snapshot, record_buffer_t *out_buffer) {
record_buffer_t current_buffer = *in_buffer;
for (uint32_t d = 0; d < depth; d++) {
record_buffer_t next_buffer = allocate_buffer();
for (uint64_t i = 0; i < current_buffer.count; i++) {
record_t rec = current_buffer.records[i];
// Skip if not visible
if (rec.logseq > snapshot.logseq_max) continue;
// Expand edges deterministically
for (uint64_t j = 0; j < rec.to_count; j++) {
record_t edge = lookup_edge(rec.to_nodes[j]);
next_buffer.records[next_buffer.count++] = edge;
}
}
// Merge deterministically
MergeBuffers(&next_buffer, 1, out_buffer);
// Prepare for next depth
current_buffer = next_buffer;
}
}
- Expansion per depth
- Deterministic ordering guaranteed
- Snapshot-safe traversal
5. Projection Operator
void Projection(record_buffer_t *in_buffer, projection_mask_t mask, record_buffer_t *out_buffer) {
for (uint64_t i = 0; i < in_buffer->count; i++) {
record_t rec = in_buffer->records[i];
projected_record_t prow;
if (mask.project_artifact_id) prow.artifact_id = rec.artifact_id;
if (mask.project_tgk_edge_id) prow.tgk_edge_id = rec.tgk_edge_id;
if (mask.project_node_id) prow.node_id = rec.node_id;
if (mask.project_type_tag) prow.type_tag = rec.type_tag;
out_buffer->records[out_buffer->count++] = prow;
}
}
6. Aggregation Operator
void Aggregation(record_buffer_t *in_buffer, aggregation_accumulator_t *acc) {
for (uint64_t i = 0; i < in_buffer->count; i++) {
record_t rec = in_buffer->records[i];
if (acc->count_enabled) acc->count++;
if (acc->sum_type_tag_enabled) acc->sum_type_tag += rec.type_tag;
if (acc->union_enabled) acc->union_set.insert(rec.artifact_id);
}
}
- Deterministic aggregation across shards due to pre-merged, ordered input
7. TombstoneShadow Operator
void TombstoneShadow(record_buffer_t *in_buffer, snapshot_range_t snapshot, record_buffer_t *out_buffer) {
hashmap_t latest_per_id;
for (uint64_t i = 0; i < in_buffer->count; i++) {
record_t rec = in_buffer->records[i];
// Skip records outside snapshot
if (rec.logseq > snapshot.logseq_max) continue;
record_t *existing = hashmap_get(&latest_per_id, rec.canonical_id);
if (!existing || rec.logseq > existing->logseq) {
hashmap_put(&latest_per_id, rec.canonical_id, &rec);
}
}
// Write deterministic output
for each rec in sorted(latest_per_id by logseq + canonical_id) {
out_buffer->records[out_buffer->count++] = rec;
}
}
- Preserves snapshot safety and deterministic order
- Handles tombstones/shadowed records
8. Shard Dispatch & SIMD Integration
- Each shard executes SegmentScan + IndexFilter independently
- Results merged via Merge
- SIMD acceleration applies filters inside IndexFilter or TGKTraversal expansion
- Memory layout ensures alignment and cache-friendly access
9. Execution Plan Runner
void ExecutePlan(exec_plan_t *plan, snapshot_range_t snapshot, record_buffer_t *final_output) {
record_buffer_t buffers[MAX_OPERATORS];
for (uint32_t i = 0; i < plan->operator_count; i++) {
operator_def_t *op = &plan->operators[i];
switch(op->op_type) {
case OP_SEGMENT_SCAN:
SegmentScan(op->params.segment_scan, snapshot, &buffers[i]);
break;
case OP_INDEX_FILTER:
IndexFilter(&buffers[op->inputs[0]], &op->params.index_filter, &buffers[i]);
break;
case OP_MERGE:
Merge(get_input_buffers(op, buffers), op->input_count, &buffers[i]);
break;
case OP_PROJECTION:
Projection(&buffers[op->inputs[0]], op->params.projection, &buffers[i]);
break;
case OP_TGK_TRAVERSAL:
TGKTraversal(&buffers[op->inputs[0]], op->params.tgk_traversal.traversal_depth,
snapshot, &buffers[i]);
break;
case OP_AGGREGATION:
Aggregation(&buffers[op->inputs[0]], &op->params.aggregation);
break;
case OP_TOMBSTONE_SHADOW:
TombstoneShadow(&buffers[op->inputs[0]], snapshot, &buffers[i]);
break;
default:
// Handle other operators (SIMDFilter, ShardDispatch, LimitOffset)
break;
}
}
// Final output from last operator
*final_output = buffers[plan->operator_count - 1];
}
- DAG execution respects
inputs[] - Buffers indexed by operator ID
- Deterministic and snapshot-safe
This pseudocode fully captures:
- Segment scanning and sharding
- SIMD-accelerated filtering
- TGK DAG traversal
- Projection, aggregation, tombstone shadowing
- Deterministic merge of shards
- Snapshot-safe execution
Next steps could include:
- Integration with PEL deterministic DAG programs, where PEL program outputs feed into these operators
- Optimizations for block caching and prefetching in ENC-ASL-TGK-INDEX
Do you want me to draft that next?