Absolutely — here's a **formal draft of operator pseudocode** for the unified ASL + TGK execution plan. This pseudocode captures **shard/SIMD acceleration, deterministic execution, snapshot safety, and DAG traversal**, mapping directly to ENC-ASL-TGK-INDEX.

---

# Operator Pseudocode (ASL + TGK Execution Plan)

---

## 1. SegmentScan Operator

```c
void SegmentScan(segment_t *segment,
                 snapshot_range_t snapshot,
                 record_buffer_t *out_buffer)
{
    // Iterate over all records in the segment
    for (uint64_t i = 0; i < segment->record_count; i++) {
        record_t rec = segment->records[i];

        // Snapshot visibility: skip records newer than the snapshot
        if (rec.logseq > snapshot.logseq_max)
            continue;

        // Append to output buffer
        out_buffer->records[out_buffer->count++] = rec;
    }
}
```

* Can be **shard-parallelized**
* Works for both **ASL artifacts** and **TGK edges**
* `record_buffer_t` is preallocated and SIMD-aligned

---

## 2. IndexFilter Operator

```c
void IndexFilter(record_buffer_t *in_buffer,
                 filter_t *filter,
                 record_buffer_t *out_buffer)
{
    for (uint64_t i = 0; i < in_buffer->count; i += SIMD_WIDTH) {
        simd_mask_t mask = SIMD_TRUE;

        // SIMD filter on artifact type
        if (filter->has_type_tag) {
            mask &= SIMD_EQ(&in_buffer->type_tags[i], filter->artifact_type_tag);
        }

        // SIMD filter on edge type
        if (filter->has_edge_type) {
            mask &= SIMD_EQ(&in_buffer->edge_type_keys[i], filter->edge_type_key);
        }

        // SIMD role filter (for TGK edges)
        if (filter->role) {
            mask &= SIMD_EQ(&in_buffer->roles[i], filter->role);
        }

        // Compact the passing records into the output buffer;
        // SIMD_STORE_MASKED returns how many lanes passed
        out_buffer->count += SIMD_STORE_MASKED(&in_buffer->records[i], mask,
                                               &out_buffer->records[out_buffer->count]);
    }
}
```

* SIMD ensures **parallel, vectorized evaluation**
* Deterministic, since input order is preserved

---
## 3. Merge Operator

```c
void Merge(record_buffer_t **inputs,
           int num_inputs,
           record_buffer_t *out_buffer)
{
    // Heap ordered by (logseq ascending, canonical ID) for determinism
    min_heap_t heap = build_heap(inputs, num_inputs);

    while (!heap_empty(heap)) {
        record_t rec = heap_pop(heap);
        out_buffer->records[out_buffer->count++] = rec;

        // Advance the source buffer the record came from
        heap_advance_source(heap, rec.source_buffer_id);
    }
}
```

* Uses **logseq ascending + canonical ID tie-breaker**
* Deterministic across shards

---

## 4. TGKTraversal Operator

```c
void TGKTraversal(record_buffer_t *in_buffer,
                  uint32_t depth,
                  snapshot_range_t snapshot,
                  record_buffer_t *out_buffer)
{
    record_buffer_t current_buffer = *in_buffer;

    for (uint32_t d = 0; d < depth; d++) {
        record_buffer_t next_buffer = allocate_buffer();

        for (uint64_t i = 0; i < current_buffer.count; i++) {
            record_t rec = current_buffer.records[i];

            // Skip records not visible in this snapshot
            if (rec.logseq > snapshot.logseq_max)
                continue;

            // Expand outgoing edges deterministically
            for (uint64_t j = 0; j < rec.to_count; j++) {
                record_t edge = lookup_edge(rec.to_nodes[j]);
                next_buffer.records[next_buffer.count++] = edge;
            }
        }

        // Merge this depth's frontier into the output deterministically
        record_buffer_t *frontier[1] = { &next_buffer };
        Merge(frontier, 1, out_buffer);

        // The new frontier becomes the input for the next depth
        current_buffer = next_buffer;
    }
}
```

* Expansion proceeds **per depth** (breadth-first)
* Deterministic ordering guaranteed
* Snapshot-safe traversal

---

## 5. Projection Operator

```c
void Projection(record_buffer_t *in_buffer,
                projection_mask_t mask,
                record_buffer_t *out_buffer)
{
    for (uint64_t i = 0; i < in_buffer->count; i++) {
        record_t rec = in_buffer->records[i];
        projected_record_t prow = {0};  // zero-init so unprojected fields are defined

        if (mask.project_artifact_id) prow.artifact_id = rec.artifact_id;
        if (mask.project_tgk_edge_id) prow.tgk_edge_id = rec.tgk_edge_id;
        if (mask.project_node_id)     prow.node_id     = rec.node_id;
        if (mask.project_type_tag)    prow.type_tag    = rec.type_tag;

        out_buffer->records[out_buffer->count++] = prow;
    }
}
```

---
## 6. Aggregation Operator

```c
void Aggregation(record_buffer_t *in_buffer,
                 aggregation_accumulator_t *acc)
{
    for (uint64_t i = 0; i < in_buffer->count; i++) {
        record_t rec = in_buffer->records[i];

        if (acc->count_enabled)        acc->count++;
        if (acc->sum_type_tag_enabled) acc->sum_type_tag += rec.type_tag;
        if (acc->union_enabled)        set_insert(&acc->union_set, rec.artifact_id);
    }
}
```

* Deterministic aggregation across shards, because the input is **pre-merged and ordered**

---

## 7. TombstoneShadow Operator

```c
void TombstoneShadow(record_buffer_t *in_buffer,
                     snapshot_range_t snapshot,
                     record_buffer_t *out_buffer)
{
    hashmap_t latest_per_id;

    for (uint64_t i = 0; i < in_buffer->count; i++) {
        record_t rec = in_buffer->records[i];

        // Skip records outside the snapshot
        if (rec.logseq > snapshot.logseq_max)
            continue;

        // Keep only the latest version of each canonical ID
        record_t *existing = hashmap_get(&latest_per_id, rec.canonical_id);
        if (!existing || rec.logseq > existing->logseq) {
            hashmap_put(&latest_per_id, rec.canonical_id, &rec);
        }
    }

    // Emit survivors in deterministic order (logseq, then canonical ID)
    // -- pseudocode: iterate the map in sorted order
    foreach_sorted(&latest_per_id, rec) {
        out_buffer->records[out_buffer->count++] = rec;
    }
}
```

* Preserves **snapshot safety and deterministic order**
* Handles tombstones and shadowed records

---

## 8. Shard Dispatch & SIMD Integration

* Each shard executes **SegmentScan + IndexFilter** independently
* Results are merged via **Merge**
* SIMD acceleration applies to **filters inside IndexFilter** and to TGKTraversal expansion
* Memory layout ensures **alignment and cache-friendly access**

---
## 9. Execution Plan Runner

```c
void ExecutePlan(exec_plan_t *plan,
                 snapshot_range_t snapshot,
                 record_buffer_t *final_output)
{
    record_buffer_t buffers[MAX_OPERATORS];

    for (uint32_t i = 0; i < plan->operator_count; i++) {
        operator_def_t *op = &plan->operators[i];

        switch (op->op_type) {
        case OP_SEGMENT_SCAN:
            SegmentScan(op->params.segment_scan, snapshot, &buffers[i]);
            break;
        case OP_INDEX_FILTER:
            IndexFilter(&buffers[op->inputs[0]], &op->params.index_filter, &buffers[i]);
            break;
        case OP_MERGE:
            Merge(get_input_buffers(op, buffers), op->input_count, &buffers[i]);
            break;
        case OP_PROJECTION:
            Projection(&buffers[op->inputs[0]], op->params.projection, &buffers[i]);
            break;
        case OP_TGK_TRAVERSAL:
            TGKTraversal(&buffers[op->inputs[0]],
                         op->params.tgk_traversal.traversal_depth,
                         snapshot, &buffers[i]);
            break;
        case OP_AGGREGATION:
            Aggregation(&buffers[op->inputs[0]], &op->params.aggregation);
            break;
        case OP_TOMBSTONE_SHADOW:
            TombstoneShadow(&buffers[op->inputs[0]], snapshot, &buffers[i]);
            break;
        default:
            // Handle other operators (SIMDFilter, ShardDispatch, LimitOffset)
            break;
        }
    }

    // Final output comes from the last operator in the plan
    *final_output = buffers[plan->operator_count - 1];
}
```

* **DAG execution** respects `inputs[]`
* **Buffers are indexed by operator ID**
* Deterministic and **snapshot-safe**

---

This pseudocode fully captures:

* Segment scanning and sharding
* SIMD-accelerated filtering
* TGK DAG traversal
* Projection, aggregation, and tombstone shadowing
* Deterministic merging of shards
* Snapshot-safe execution

---

Next steps could include:

* **Integration with PEL deterministic DAG programs**, where **PEL program outputs feed into these operators**
* **Optimizations for block caching and prefetching** in ENC-ASL-TGK-INDEX

Do you want me to draft that next?