
Transforms reshape rows inside a transform node in the Pipeline IR. The catalog is one Zod discriminated union declared at lib/flow/ir/nodes/transform.ts:17 (15 foundation ops + 4 W16 external-model ops = 19 total) and dispatched at lib/flow/runtime/executors/transform.ts:17. The same vocabulary is enforced at three points: the IR Zod schema at save time (parsePipelineIR), the visual canvas type-checker, and the runtime executor before execution. A transform op that does not appear in the discriminated union cannot be saved, rendered, or run.
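In miniature, the single-catalog pattern looks like this — a hand-rolled TypeScript sketch of a discriminated union plus a runtime guard, not the real Zod schema; only three of the 19 members are shown and their shapes are abbreviated:

```typescript
// Sketch of the op-catalog pattern: one discriminated union on "type",
// one membership test. The real catalog is the Zod union at
// lib/flow/ir/nodes/transform.ts; these shapes are illustrative.
type CastOp = { type: "cast"; field: string; to: string };
type DropOp = { type: "drop"; fields: string[] };
type FilterOp = { type: "filter"; predicate: string };
type TransformOp = CastOp | DropOp | FilterOp; // real union has 19 members

const KNOWN_TYPES = new Set(["cast", "drop", "filter"]);

function isTransformOp(op: unknown): op is TransformOp {
  return (
    typeof op === "object" &&
    op !== null &&
    KNOWN_TYPES.has((op as { type?: string }).type ?? "")
  );
}
```

All three enforcement points reduce to the same membership test: an op whose type sits outside the union fails at save, render, and run alike.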

What it does

A transform node sits between a source node (which materializes rows into a temp table) and a downstream validate, target, or join node. It carries a single input: "<upstream-node-id>" reference and an ordered ops[] array. Each op compiles to a SQL snippet via compileOp at lib/flow/runtime/executors/transform.ts:17-62; the snippets are surfaced in the run result’s meta.sqlSnippets, and the row count flows through unchanged from the upstream node (transform.ts:64-75).
source → transform[ops] → (validate | join | target)
The 19 ops divide into three groups:
| Group | Ops | Lives in |
| --- | --- | --- |
| Column ops | cast, expression, rename, drop, add_field, hash, mask_pii, map_values, split, merge | transform.ts:18-28 |
| Set / group ops | filter, deduplicate, aggregate, lookup_join, summarize | transform.ts:27-45 |
| External-model ops (W16) | vector_embed, geo_resolve, sentiment_score, language_detect | transform.ts:52-88 |

Quick start

Three-op pipeline that drops a PII column, casts a numeric column, and adds a constant tenant tag — posted as a complete IR via POST /api/v1/pipelines:
curl -X POST https://playground.kaireonai.com/api/v1/pipelines \
  -H "Content-Type: application/json" \
  -H "X-API-Key: krn_your_api_key" \
  -d '{
    "name": "orders-cleanup",
    "connectorId": "conn_local",
    "schemaId": "schema_orders",
    "irVersion": "1.0",
    "ir": {
      "kind": "pipeline",
      "version": "1.0",
      "id": "orders-cleanup",
      "metadata": { "name": "Orders cleanup" },
      "nodes": [
        { "id": "src", "kind": "source", "connector": "local_fs",
          "config": { "path": "/data/orders/", "format": "csv" } },
        { "id": "clean", "kind": "transform", "input": "src",
          "ops": [
            { "type": "drop", "fields": ["ssn"] },
            { "type": "cast", "field": "amount", "to": "numeric" },
            { "type": "add_field", "field": "tenant_tag", "defaultValue": "starbucks" }
          ] },
        { "id": "tgt", "kind": "target", "input": "clean",
          "config": { "table": "orders_clean", "mode": "append" } }
      ]
    }
  }'
The save round-trips through parsePipelineIR, lands as a pipeline_ir_versions row, and is executable via POST /api/v1/pipelines/:id/run.

How it works

IR node shape

Every transform node validates against transformNodeSchema at lib/flow/ir/nodes/transform.ts:91-96:
{
  "id": "<node id matching ^[a-zA-Z][a-zA-Z0-9_-]{0,62}$>",
  "kind": "transform",
  "input": "<upstream-node-id>",
  "ops": [ /* one or more TransformOp entries */ ]
}
ops is min(1) — an empty ops[] fails Zod with "transform must have at least one op" (transform.ts:95). Each op is itself validated against the discriminated union — an unknown type field is rejected at save time.
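The two save-time rules called out above (the id pattern and the non-empty ops[]) can be illustrated with a hand-rolled checker — a sketch, not the real transformNodeSchema:

```typescript
// Illustrative checker for the two rules above; the real validation is the
// Zod transformNodeSchema at lib/flow/ir/nodes/transform.ts:91-96.
const NODE_ID = /^[a-zA-Z][a-zA-Z0-9_-]{0,62}$/;

interface TransformNodeShape {
  id: string;
  kind: "transform";
  input: string;
  ops: { type: string }[];
}

function checkTransformNode(node: TransformNodeShape): string[] {
  const errors: string[] = [];
  if (!NODE_ID.test(node.id)) errors.push("invalid node id");
  if (node.ops.length < 1) errors.push("transform must have at least one op");
  return errors;
}
```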

Runtime dispatch

transformExecutor at lib/flow/runtime/executors/transform.ts:64-75:
  1. Reads the upstream inputs[node.input] reference, including its rowsOut and table name.
  2. Maps every op through compileOp (the switch at transform.ts:17-62) into a SQL snippet.
  3. Returns { id, kind: "transform", rowsIn, rowsOut, table, meta: { sqlSnippets } }.
rowsIn and rowsOut are both equal to the upstream rowsOut because the executor does not yet run the compiled SQL against a materialized table — that materialization is the cross-cutting work documented in the Phase 4 spec and called out in Flow Overview. Today the snippets are observable in the run result for inspection and lineage; they do not mutate row counts.
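The pass-through shape of the executor can be sketched as follows — compileOp here is a stub; the real switch lives in executors/transform.ts:

```typescript
// Sketch of the dispatch described above: map ops to snippets, pass row
// counts through unchanged. Illustrative, not the real transformExecutor.
interface UpstreamResult { rowsOut: number; table: string }
interface Op { type: string }

function compileOp(op: Op): string {
  return `-- ${op.type}`; // stub; real cases emit CAST(...), WHERE ..., etc.
}

function transformExecutor(
  node: { id: string; input: string; ops: Op[] },
  inputs: Record<string, UpstreamResult>,
) {
  const upstream = inputs[node.input];
  const sqlSnippets = node.ops.map(compileOp);
  // Compiled SQL is not yet executed, so rowsIn === rowsOut.
  return {
    id: node.id,
    kind: "transform" as const,
    rowsIn: upstream.rowsOut,
    rowsOut: upstream.rowsOut,
    table: upstream.table,
    meta: { sqlSnippets },
  };
}
```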

Per-op semantics

All foundation ops (everything except the four W16 external-model ops) are per-row at the SQL layer — they compile to SELECT … FROM upstream projections or WHERE predicates. aggregate, deduplicate, and lookup_join are the set-level exceptions — they reshape row count or pull from a second table. Expression-bearing ops (expression, filter) flow through sanitizeExpression from lib/pipeline/sql-safety.ts, which rejects DDL/DML keywords (DROP, DELETE, UPDATE, INSERT, MERGE, COPY, GRANT, REVOKE, ALTER, TRUNCATE, EXECUTE, SELECT, UNION, WITH, RETURNING, INTO, CREATE) and keeps only allowlisted SQL function names. The integration test at lib/flow/__tests__/runtime/executors/transform.test.ts:20-27 proves that a filter op carrying "DROP TABLE x" throws. The four W16 external-model ops compile to placeholder SQL today and depend on the external-call adapter at lib/flow/runtime/transforms/external-model-call.ts. See W16 external-model ops below.
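The keyword gate can be sketched as a single regex check — this is an illustration of the blocklist half only; the real sanitizeExpression additionally keeps only allowlisted SQL function names:

```typescript
// Hedged sketch of the DDL/DML keyword gate described above, using the
// keyword list from the docs. Not the real lib/pipeline/sql-safety.ts.
const FORBIDDEN =
  /\b(DROP|DELETE|UPDATE|INSERT|MERGE|COPY|GRANT|REVOKE|ALTER|TRUNCATE|EXECUTE|SELECT|UNION|WITH|RETURNING|INTO|CREATE)\b/i;

function rejectUnsafeExpression(expr: string): string {
  if (FORBIDDEN.test(expr)) {
    throw new Error(`unsafe keyword in expression: ${expr}`);
  }
  return expr;
}
```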

Reference

Each H3 below is one transform. Config fields cite the line in lib/flow/ir/nodes/transform.ts and the runtime cite is the compileOp case in lib/flow/runtime/executors/transform.ts.

cast

Convert a column to a different type.
| Field | Type | Source |
| --- | --- | --- |
| field | string | transform.ts:18 |
| to | enum: string \| integer \| bigint \| float \| numeric \| boolean \| date \| timestamp \| json \| uuid | transform.ts:9-11, 18 |
{ "type": "cast", "field": "amount", "to": "numeric" }
Compiles to CAST("amount" AS NUMERIC) at executors/transform.ts:19-20. Postgres-side cast errors propagate to the run result; the IR layer does not validate that the source column is castable to the target type.

expression

Add a new column whose value is a sanitized SQL expression over other columns.
| Field | Type | Source |
| --- | --- | --- |
| outputField | string | transform.ts:19 |
| formula | string (≤ 4096 chars) | transform.ts:19, primitives.ts:16-19 |
{ "type": "expression", "outputField": "doubled", "formula": "amount * 2" }
Compiles to <sanitized-formula> AS "doubled" at executors/transform.ts:21-22. The formula passes through sanitizeExpression (lib/pipeline/sql-safety.ts); DDL/DML keywords throw and only allowlisted SQL functions survive.

rename

Rename a column.
| Field | Type | Source |
| --- | --- | --- |
| from | string | transform.ts:20 |
| to | string | transform.ts:20 |
{ "type": "rename", "from": "cust_id", "to": "customer_id" }
Compiles to "cust_id" AS "customer_id" at executors/transform.ts:23-24.

drop

Remove one or more columns.
| Field | Type | Source |
| --- | --- | --- |
| fields | string[] (min(1)) | transform.ts:21 |
{ "type": "drop", "fields": ["ssn", "raw_payload"] }
Compiles to a SQL comment -- drop "ssn", "raw_payload" at executors/transform.ts:25-26. The actual column elision is performed by the downstream target projection, not by the transform itself.

filter

Drop rows where a sanitized predicate is false.
| Field | Type | Source |
| --- | --- | --- |
| predicate | string (≤ 4096 chars) | transform.ts:22, primitives.ts:16-19 |
{ "type": "filter", "predicate": "amount > 0 AND status = 'paid'" }
Compiles to WHERE <sanitized-predicate> at executors/transform.ts:27-28. predicate flows through the same sanitizeExpression allowlist; the test at transform.test.ts:20-27 asserts that DDL keywords in the predicate throw.

add_field

Add a new column populated with a constant scalar.
| Field | Type | Source |
| --- | --- | --- |
| field | string | transform.ts:23 |
| defaultValue | string \| number \| boolean \| null | transform.ts:23 |
{ "type": "add_field", "field": "tenant_tag", "defaultValue": "starbucks" }
Compiles to <json-encoded-value> AS "<field>" at executors/transform.ts:29-30. For an expression-derived constant, use expression instead — add_field’s defaultValue is a literal scalar, not a formula.

hash

Replace a column’s value with its cryptographic hash.
| Field | Type | Source |
| --- | --- | --- |
| field | string | transform.ts:24 |
| algorithm | enum: sha256 \| sha1 \| sha512 \| md5 | transform.ts:12, 24 |
{ "type": "hash", "field": "email", "algorithm": "sha256" }
Compiles to SHA256("email") at executors/transform.ts:31-32. The function names rely on the target Postgres install carrying pgcrypto (SHA256, SHA1, etc.); without pgcrypto, the run fails at execution. md5 is built into Postgres.

mask_pii

Replace a column’s value with a masked representation.
| Field | Type | Source |
| --- | --- | --- |
| field | string | transform.ts:25 |
| maskType | enum: ssn \| email \| phone \| card \| redact | transform.ts:13, 25 |
{ "type": "mask_pii", "field": "ssn", "maskType": "ssn" }
Compiles to mask_ssn("ssn") at executors/transform.ts:33-34. The mask_<type> SQL function is expected to exist on the target database; the runtime does not bundle a Postgres extension that provides it.

map_values

Substitute column values via a lookup table.
| Field | Type | Source |
| --- | --- | --- |
| field | string | transform.ts:26 |
| mapping | Record<string, string \| number \| boolean> | transform.ts:26 |
{
  "type": "map_values",
  "field": "country",
  "mapping": { "US": "United States", "GB": "United Kingdom", "JP": "Japan" }
}
Compiles to a CASE "country" WHEN "US" THEN "United States" … END expression at executors/transform.ts:35-38. Unmatched values become NULL because no ELSE branch is emitted — pair with a downstream expression op using COALESCE if you need a default.
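The CASE compilation shape can be sketched as a small helper — illustrative only; the real emitter is the compileOp case at executors/transform.ts:35-38, and the quoting here simply mirrors the snippet shown above:

```typescript
// Sketch of the map_values CASE shape. Note the absence of an ELSE branch:
// unmatched input values fall through to NULL, as documented above.
function compileMapValues(
  field: string,
  mapping: Record<string, string | number | boolean>,
): string {
  const arms = Object.entries(mapping)
    .map(([from, to]) => `WHEN "${from}" THEN ${JSON.stringify(to)}`)
    .join(" ");
  return `CASE "${field}" ${arms} END`;
}
```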

split

Split a column on a delimiter into multiple output columns.
| Field | Type | Source |
| --- | --- | --- |
| field | string | transform.ts:27 |
| delimiter | string (min(1)) | transform.ts:27 |
| outputs | string[] (min(2)) | transform.ts:27 |
{ "type": "split", "field": "full_name", "delimiter": " ", "outputs": ["first_name", "last_name"] }
Compiles to split_<delim-len>("<field>", <output-count>) at executors/transform.ts:39-40. The compiled snippet records the delimiter length and output count for inspection; the actual per-row split executes once row materialization is wired (see Honest limits).

merge

Concatenate several columns into a single output column.
| Field | Type | Source |
| --- | --- | --- |
| fields | string[] (min(2)) | transform.ts:28 |
| output | string | transform.ts:28 |
| separator | string | transform.ts:28 |
{ "type": "merge", "fields": ["first_name", "last_name"], "output": "full_name", "separator": " " }
Compiles to CONCAT_WS(" ", "first_name", "last_name") AS "full_name" at executors/transform.ts:41-42. CONCAT_WS skips NULL arguments — the separator is not duplicated when a field is null.
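The NULL-skipping behavior of CONCAT_WS can be shown in miniature — a TypeScript model of the semantics, not the Postgres implementation:

```typescript
// CONCAT_WS semantics in miniature: the separator joins only the non-null
// arguments, so a null field never duplicates the separator.
function concatWs(sep: string, ...parts: (string | null)[]): string {
  return parts.filter((p): p is string => p !== null).join(sep);
}
```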

deduplicate

Drop duplicate rows by a key set.
| Field | Type | Source |
| --- | --- | --- |
| keys | string[] (min(1)) | transform.ts:29 |
| order | string (optional) | transform.ts:29 |
{ "type": "deduplicate", "keys": ["customer_id", "order_date"] }
Compiles to DISTINCT ON ("customer_id", "order_date") at executors/transform.ts:43-44. order is captured in the IR but not emitted in the snippet today — Postgres DISTINCT ON retains the first row per key by the surrounding ORDER BY, which the upstream source’s natural order supplies.

aggregate

Group rows and reduce each group to one row of aggregates.
| Field | Type | Source |
| --- | --- | --- |
| groupBy | string[] (min(1)) | transform.ts:32 |
| aggs[] | { fn, field, as }[] (min(1)) | transform.ts:33 |
| aggs[].fn | enum: sum \| avg \| min \| max \| count \| count_distinct | transform.ts:14, 33 |
| aggs[].field | string | transform.ts:33 |
| aggs[].as | string | transform.ts:33 |
{
  "type": "aggregate",
  "groupBy": ["customer_id"],
  "aggs": [
    { "fn": "sum", "field": "amount", "as": "total_spent" },
    { "fn": "count", "field": "order_id", "as": "order_count" }
  ]
}
Compiles to GROUP BY "customer_id"; SUM("amount") AS "total_spent", COUNT("order_id") AS "order_count" at executors/transform.ts:45-48. groupBy cannot be empty (Zod min(1)); use a constant add_field op upstream if you need a single-row grand total. count_distinct is the IR enum value — the executor uppercases it to COUNT_DISTINCT(…), which is not standard SQL, so the snippet is a marker rather than executable Postgres until materialization wires a COUNT(DISTINCT …) rewrite.
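As a worked example of the grand-total workaround above, the two-op sequence could look like this — the constant field name total_bucket and its value are illustrative:

```json
[
  { "type": "add_field", "field": "total_bucket", "defaultValue": 1 },
  { "type": "aggregate",
    "groupBy": ["total_bucket"],
    "aggs": [ { "fn": "sum", "field": "amount", "as": "grand_total" } ] }
]
```

Every row lands in the single total_bucket group, so the aggregate collapses to one grand-total row while still satisfying the min(1) groupBy constraint.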

lookup_join

Join the row stream against a lookup table on a key.
| Field | Type | Source |
| --- | --- | --- |
| joinType | enum: left \| inner \| right \| full | transform.ts:15, 37 |
| lookupTable | string | transform.ts:38 |
| on.left | string | transform.ts:39 |
| on.right | string | transform.ts:39 |
{
  "type": "lookup_join",
  "joinType": "left",
  "lookupTable": "customers",
  "on": { "left": "customer_id", "right": "id" }
}
Compiles to LEFT JOIN "customers" ON "customer_id" = "id" at executors/transform.ts:49-50. lookupTable is a raw identifier — the executor does not scope it to the tenant or check that the table exists at compile time. The lookup is single-key only; for composite keys, fall back to the dedicated join node kind documented in Pipeline IR.

summarize

Roll up per-customer activity over a time window into one row per customer.
| Field | Type | Source |
| --- | --- | --- |
| customerKey | string | transform.ts:43 |
| windowDays | positive integer | transform.ts:44 |
{ "type": "summarize", "customerKey": "customer_id", "windowDays": 30 }
Compiles to summarize("customer_id", 30) at executors/transform.ts:51-52. The op is a placeholder for the customer-360 summary builder — the actual aggregation logic ships once row materialization lands. windowDays must be > 0; non-positive values fail Zod.

W16 external-model ops

The four ops below are W16 round-3 additions (parity-to-9 §3.15 lift). The IR validator accepts them today; the runtime compiles each to a placeholder SQL marker (e.g. vector_embed("text") AS "embedding" /* W16 */) at executors/transform.ts:53-60. The real call lives in lib/flow/runtime/transforms/external-model-call.ts:32-205, which is not yet wired into transformExecutor. Each adapter throws MissingExternalModelEndpointError when its endpoint env var is unset (external-model-call.ts:64-76), so a pipeline that runs after wiring still fails closed when the operator hasn’t configured an endpoint.

vector_embed

Embed a text column with an external embedding model.
| Field | Type | Source |
| --- | --- | --- |
| inputField | string | transform.ts:54 |
| outputField | string (default "embedding") | transform.ts:55 |
| provider | enum: openai \| google \| voyage \| local (optional) | transform.ts:57 |
| model | string (optional) | transform.ts:59 |
| maxChars | positive integer (optional, default 8000) | transform.ts:61 |
{ "type": "vector_embed", "inputField": "review_text", "outputField": "review_vec", "provider": "openai" }
Compiles to vector_embed("review_text") AS "review_vec" /* W16 */ at executors/transform.ts:53-54. Wiring requires VECTOR_EMBED_ENDPOINT (and optionally VECTOR_EMBED_BEARER) — see Honest limits.

geo_resolve

Geocode an address column into a latitude / longitude pair.
| Field | Type | Source |
| --- | --- | --- |
| addressField | string | transform.ts:65 |
| outputLat | string (default "lat") | transform.ts:66 |
| outputLon | string (default "lon") | transform.ts:67 |
| provider | enum: google_maps \| mapbox \| nominatim (optional) | transform.ts:69 |
| cache | boolean (optional) | transform.ts:71 |
{ "type": "geo_resolve", "addressField": "street_address", "provider": "mapbox", "cache": true }
Compiles to geocode("street_address") AS ("lat", "lon") /* W16 */ at executors/transform.ts:55-56. Once the adapter is wired and cache: true is set, repeated lookups are served through the existing Redis layer.

sentiment_score

Score a text column for sentiment.
| Field | Type | Source |
| --- | --- | --- |
| textField | string | transform.ts:75 |
| outputField | string (default "sentiment") | transform.ts:76 |
| method | enum: rule (default) \| llm | transform.ts:78 |
| outputShape | enum: score (default) \| label | transform.ts:80 |
{ "type": "sentiment_score", "textField": "review_text", "method": "rule", "outputShape": "score" }
Compiles to sentiment_rule("review_text") AS "sentiment" /* W16 */ at executors/transform.ts:57-58. method: "rule" is meant to use a built-in lexicon (no external call), and method: "llm" calls the tenant’s configured LLM — both paths route through the W16 adapter once wired.

language_detect

Detect the language of a text column.
| Field | Type | Source |
| --- | --- | --- |
| textField | string | transform.ts:84 |
| outputField | string (default "language") | transform.ts:85 |
| outputCode | enum: bcp47 \| iso639_1 (default iso639_1) | transform.ts:87 |
{ "type": "language_detect", "textField": "review_text", "outputCode": "iso639_1" }
Compiles to lang_detect("review_text") AS "language" /* W16 */ at executors/transform.ts:59-60.

Configuration

| Setting | Effect | Source |
| --- | --- | --- |
| Tenant flowIrEnabled | Required to author IR-mode pipelines that contain transform nodes. Legacy pipelines run without this flag. | Pipeline IR |
| VECTOR_EMBED_ENDPOINT (+ VECTOR_EMBED_BEARER) | Enables vector_embed once the runtime wires the adapter. | lib/flow/runtime/transforms/external-model-call.ts:46-49 |
| GEO_RESOLVE_ENDPOINT (+ GEO_RESOLVE_BEARER) | Enables geo_resolve. | external-model-call.ts:50-53 |
| SENTIMENT_SCORE_ENDPOINT (+ SENTIMENT_SCORE_BEARER) | Enables sentiment_score. | external-model-call.ts:54-57 |
| LANGUAGE_DETECT_ENDPOINT (+ LANGUAGE_DETECT_BEARER) | Enables language_detect. | external-model-call.ts:58-61 |
The four W16 endpoints are independent — wiring one does not enable the others. Outbound calls go through validateAndResolve (lib/security/url-validator.ts) before fetch, so private-IP and DNS-rebound hosts are rejected. Per-call timeout is 5 seconds (external-model-call.ts:34).
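The fail-closed endpoint gating can be sketched as follows — the *_ENDPOINT / *_BEARER env var names match the table above, but the error class shape and helper are illustrative, not the real external-model-call.ts implementation:

```typescript
// Sketch of per-op, fail-closed endpoint resolution: each of the four W16
// ops is gated independently on its own env var.
class MissingExternalModelEndpointError extends Error {
  constructor(op: string, envVar: string) {
    super(`${op}: configure ${envVar} to enable this op`);
    this.name = "MissingExternalModelEndpointError";
  }
}

function resolveEndpoint(
  op: "vector_embed" | "geo_resolve" | "sentiment_score" | "language_detect",
  env: Record<string, string | undefined>,
): { url: string; bearer?: string } {
  const prefix = op.toUpperCase();
  const url = env[`${prefix}_ENDPOINT`];
  if (!url) throw new MissingExternalModelEndpointError(op, `${prefix}_ENDPOINT`);
  return { url, bearer: env[`${prefix}_BEARER`] };
}
```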

Honest limits

  • Row materialization is not yet wired. transformExecutor compiles op SQL into meta.sqlSnippets but does not execute it against a temp table — rowsIn === rowsOut for every transform call (executors/transform.ts:64-75). The cross-cutting materialization work is documented in the Phase 4 spec and listed in Flow Overview → Honest limits. Until it lands, transforms are observable but not row-effective.
  • The four W16 ops compile to placeholder SQL. vector_embed, geo_resolve, sentiment_score, language_detect produce a /* W16 */ marker in meta.sqlSnippets. The real adapter at lib/flow/runtime/transforms/external-model-call.ts is not yet called from transformExecutor; when it is wired, each op throws MissingExternalModelEndpointError until its endpoint env var is set.
  • hash and mask_pii rely on database-side functions. SHA256/SHA1/SHA512 need pgcrypto on the target database; mask_<type> is expected to exist as a user-defined function. The IR layer does not check for either.
  • lookup_join is single-key. The op carries one on.left = on.right pair — composite-key joins use the join node kind instead.
  • map_values has no default branch. Unmatched input values become NULL. Pair with a downstream expression op using COALESCE for a default.
  • aggregate requires a non-empty groupBy. Zod min(1) rejects empty arrays. For a single-row grand total, prepend a constant add_field op and group on that constant.
  • deduplicate.order is captured but not emitted. Postgres DISTINCT ON retains the first row per key by the surrounding ORDER BY, which the upstream source supplies — explicit per-op ordering is a follow-up.
  • count_distinct aggregate compiles to a non-standard COUNT_DISTINCT(…) snippet. A real COUNT(DISTINCT …) rewrite ships with the materialization work; today the snippet is a marker.
Related

  • Pipeline IR — top-level IR shape, the other 7 node kinds, and validation layers.
  • Flow Overview — phase summary and the wider Flow honest-residuals table.
  • AI Pipeline Authoring — how the AI assistant proposes IR diffs, including transform ops.
  • MCP Flow Server — external-agent tools that read and write IR.
  • Pipelines API — POST /api/v1/pipelines and POST /api/v1/pipelines/:id/run endpoints.