amduat-api/notes/Operator Pseudocode (ASL + TGK Execution Plan).md
2026-01-17 00:19:49 +01:00

7.8 KiB
Raw Blame History

Absolutely — heres a formal draft of operator pseudocode for the unified ASL + TGK execution plan. This pseudocode captures shard/SIMD acceleration, deterministic execution, snapshot safety, and DAG traversal, mapping directly to ENC-ASL-TGK-INDEX.


Operator Pseudocode (ASL + TGK Execution Plan)


1. SegmentScan Operator

void SegmentScan(segment_t *segment, snapshot_range_t snapshot, record_buffer_t *out_buffer) {
    // Iterate over all records in the segment
    for (uint64_t i = 0; i < segment->record_count; i++) {
        record_t rec = segment->records[i];

        // Snapshot visibility
        if (rec.logseq > snapshot.logseq_max) continue;

        // Append to output buffer
        out_buffer->records[out_buffer->count++] = rec;
    }
}
  • Can be shard-parallelized
  • Works for both ASL artifacts and TGK edges
  • record_buffer_t is preallocated, SIMD-aligned

2. IndexFilter Operator

void IndexFilter(record_buffer_t *in_buffer, filter_t *filter, record_buffer_t *out_buffer) {
    for (uint64_t i = 0; i < in_buffer->count; i += SIMD_WIDTH) {
        simd_mask_t mask = SIMD_TRUE;

        // SIMD filter artifact type
        if (filter->has_type_tag) {
            mask &= SIMD_EQ(in_buffer->type_tags[i:i+SIMD_WIDTH], filter->artifact_type_tag);
        }

        // SIMD filter edge type
        if (filter->has_edge_type) {
            mask &= SIMD_EQ(in_buffer->edge_type_keys[i:i+SIMD_WIDTH], filter->edge_type_key);
        }

        // SIMD role filter (for TGK edges)
        if (filter->role) {
            mask &= SIMD_EQ(in_buffer->roles[i:i+SIMD_WIDTH], filter->role);
        }

        // Store passing records
        SIMD_STORE_MASKED(in_buffer->records[i:i+SIMD_WIDTH], mask, out_buffer->records);
    }
    out_buffer->count = count_masked_records(out_buffer);
}
  • SIMD ensures parallel, vectorized evaluation
  • Deterministic since order preserved

3. Merge Operator

void Merge(record_buffer_t **inputs, int num_inputs, record_buffer_t *out_buffer) {
    min_heap_t heap = build_heap(inputs, num_inputs);

    while (!heap_empty(heap)) {
        record_t rec = heap_pop(heap);

        out_buffer->records[out_buffer->count++] = rec;

        // Advance from the source buffer
        heap_advance_source(heap, rec.source_buffer_id);
    }
}
  • Uses logseq ascending + canonical ID tie-breaker
  • Deterministic across shards

4. TGKTraversal Operator

void TGKTraversal(record_buffer_t *in_buffer, uint32_t depth, snapshot_range_t snapshot, record_buffer_t *out_buffer) {
    record_buffer_t current_buffer = *in_buffer;

    for (uint32_t d = 0; d < depth; d++) {
        record_buffer_t next_buffer = allocate_buffer();

        for (uint64_t i = 0; i < current_buffer.count; i++) {
            record_t rec = current_buffer.records[i];

            // Skip if not visible
            if (rec.logseq > snapshot.logseq_max) continue;

            // Expand edges deterministically
            for (uint64_t j = 0; j < rec.to_count; j++) {
                record_t edge = lookup_edge(rec.to_nodes[j]);
                next_buffer.records[next_buffer.count++] = edge;
            }
        }

        // Merge deterministically
        MergeBuffers(&next_buffer, 1, out_buffer);

        // Prepare for next depth
        current_buffer = next_buffer;
    }
}
  • Expansion per depth
  • Deterministic ordering guaranteed
  • Snapshot-safe traversal

5. Projection Operator

void Projection(record_buffer_t *in_buffer, projection_mask_t mask, record_buffer_t *out_buffer) {
    for (uint64_t i = 0; i < in_buffer->count; i++) {
        record_t rec = in_buffer->records[i];
        projected_record_t prow;

        if (mask.project_artifact_id) prow.artifact_id = rec.artifact_id;
        if (mask.project_tgk_edge_id) prow.tgk_edge_id = rec.tgk_edge_id;
        if (mask.project_node_id) prow.node_id = rec.node_id;
        if (mask.project_type_tag) prow.type_tag = rec.type_tag;

        out_buffer->records[out_buffer->count++] = prow;
    }
}

6. Aggregation Operator

void Aggregation(record_buffer_t *in_buffer, aggregation_accumulator_t *acc) {
    for (uint64_t i = 0; i < in_buffer->count; i++) {
        record_t rec = in_buffer->records[i];

        if (acc->count_enabled) acc->count++;
        if (acc->sum_type_tag_enabled) acc->sum_type_tag += rec.type_tag;
        if (acc->union_enabled) acc->union_set.insert(rec.artifact_id);
    }
}
  • Deterministic aggregation across shards due to pre-merged, ordered input

7. TombstoneShadow Operator

void TombstoneShadow(record_buffer_t *in_buffer, snapshot_range_t snapshot, record_buffer_t *out_buffer) {
    hashmap_t latest_per_id;

    for (uint64_t i = 0; i < in_buffer->count; i++) {
        record_t rec = in_buffer->records[i];

        // Skip records outside snapshot
        if (rec.logseq > snapshot.logseq_max) continue;

        record_t *existing = hashmap_get(&latest_per_id, rec.canonical_id);

        if (!existing || rec.logseq > existing->logseq) {
            hashmap_put(&latest_per_id, rec.canonical_id, &rec);
        }
    }

    // Write deterministic output
    for each rec in sorted(latest_per_id by logseq + canonical_id) {
        out_buffer->records[out_buffer->count++] = rec;
    }
}
  • Preserves snapshot safety and deterministic order
  • Handles tombstones/shadowed records

8. Shard Dispatch & SIMD Integration

  • Each shard executes SegmentScan + IndexFilter independently
  • Results merged via Merge
  • SIMD acceleration applies filters inside IndexFilter or TGKTraversal expansion
  • Memory layout ensures alignment and cache-friendly access

9. Execution Plan Runner

void ExecutePlan(exec_plan_t *plan, snapshot_range_t snapshot, record_buffer_t *final_output) {
    record_buffer_t buffers[MAX_OPERATORS];

    for (uint32_t i = 0; i < plan->operator_count; i++) {
        operator_def_t *op = &plan->operators[i];

        switch(op->op_type) {
            case OP_SEGMENT_SCAN:
                SegmentScan(op->params.segment_scan, snapshot, &buffers[i]);
                break;
            case OP_INDEX_FILTER:
                IndexFilter(&buffers[op->inputs[0]], &op->params.index_filter, &buffers[i]);
                break;
            case OP_MERGE:
                Merge(get_input_buffers(op, buffers), op->input_count, &buffers[i]);
                break;
            case OP_PROJECTION:
                Projection(&buffers[op->inputs[0]], op->params.projection, &buffers[i]);
                break;
            case OP_TGK_TRAVERSAL:
                TGKTraversal(&buffers[op->inputs[0]], op->params.tgk_traversal.traversal_depth,
                             snapshot, &buffers[i]);
                break;
            case OP_AGGREGATION:
                Aggregation(&buffers[op->inputs[0]], &op->params.aggregation);
                break;
            case OP_TOMBSTONE_SHADOW:
                TombstoneShadow(&buffers[op->inputs[0]], snapshot, &buffers[i]);
                break;
            default:
                // Handle other operators (SIMDFilter, ShardDispatch, LimitOffset)
                break;
        }
    }

    // Final output from last operator
    *final_output = buffers[plan->operator_count - 1];
}
  • DAG execution respects inputs[]
  • Buffers indexed by operator ID
  • Deterministic and snapshot-safe

This pseudocode fully captures:

  • Segment scanning and sharding
  • SIMD-accelerated filtering
  • TGK DAG traversal
  • Projection, aggregation, tombstone shadowing
  • Deterministic merge of shards
  • Snapshot-safe execution

Next steps could include:

  • Integration with PEL deterministic DAG programs, where PEL program outputs feed into these operators
  • Optimizations for block caching and prefetching in ENC-ASL-TGK-INDEX

Do you want me to draft that next?