The Pipeline IR is the new source of truth for ETL pipelines starting in KaireonAI Flow Phase 1. It replaces the legacy PipelineNode + PipelineEdge relational format with a single typed JSON document that the runtime interprets directly. A pipeline authored in IR mode is written once and read by:
- The visual canvas (renders the IR)
- The AI assistant (proposes diffs against the IR)
- The MCP server (accepts IR via tool calls)
- The runtime interpreter (executes the IR)
Transforms
A transform node carries a list of per-row column ops. The IR schema accepts 19 op types; the visual editor exposes the 14 most-used ones in its toolbar (complex ops like summarize, vector_embed, geo_resolve, sentiment_score, and language_detect are authored via the JSON IR tab or the AI assistant).
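As an illustration only (the node and op field names below are assumptions, not the published schema; language_detect and sentiment_score are among the documented JSON-only ops), a transform node mixing toolbar and JSON-authored ops might look like:

```json
{
  "id": "clean_contacts",
  "kind": "transform",
  "ops": [
    { "op": "trim", "column": "email" },
    { "op": "language_detect", "column": "bio", "target": "bio_lang" },
    { "op": "sentiment_score", "column": "bio", "target": "bio_sentiment" }
  ]
}
```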
Why IR-first
Generative AI is reliable when it produces structured data, not code. Constrained JSON output validated against a Zod schema cannot drift from the contract. This is what makes “AI builds the pipeline” actually work: every proposal is schema-valid by construction.
Top-level shape
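The authoritative top-level schema is the Zod contract. As a hedged sketch (irVersion, the node kinds, and connectorId are documented; nodes, edges, from, to, and the id values are assumptions for illustration), a minimal IR document could look like:

```json
{
  "irVersion": "1.0",
  "nodes": [
    { "id": "src", "kind": "source", "connectorId": "conn_abc123" },
    { "id": "tgt", "kind": "target", "mode": "append" }
  ],
  "edges": [
    { "from": "src", "to": "tgt" }
  ]
}
```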
Node kinds (Phase 1)
| Kind | Purpose |
|---|---|
| source | Read from external system (file/DB/API/stream) |
| transform | Per-row column ops — 19 op types (Transforms) |
| validate | Row-level + dataset-level validation |
| target | Write to internal schema table |
| archive | Post-load file movement |
| branch | Conditional routing |
| join | Multi-input join |
| enrich | Per-row external call (LLM tag, geocode, ML score) |
Source node example
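A plausible source node, sketched under the caveat that only the kind values and the connectorId convention are documented (the node shape, path field, and id values are illustrative assumptions):

```json
{
  "id": "orders_s3",
  "kind": "source",
  "connectorId": "conn_s3_orders",
  "source": { "kind": "s3", "path": "incoming/orders/*.csv" }
}
```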
The IR schema's source.kind field accepts a fixed set of cloud-store and filesystem kinds: s3, gcs, azure_blob, sftp, ftp, local_fs, http_pull. Runtime executors for local_fs, s3, gcs, azure_blob, sftp, and http_pull are all live. ftp is documented as deprecated (plaintext) and never reaches a runtime executor. Streaming kinds (kafka, kinesis, pulsar) are gated by FLOW_STREAMING_ENABLED and only land when self-hosters provide their own broker. Note that source.kind is a separate concept from the full connector registry (85 types); pipelines reference a saved Connector record by connectorId, not by kind directly.
Target node load modes
| Mode | Status | Notes |
|---|---|---|
| append | Live | Explicit-column INSERT with NULLIF + per-column CAST |
| truncate | Live | TRUNCATE + INSERT wrapped in prisma.$transaction. Empty-source guard (failOnEmptySource: true default) aborts before the destructive statement when upstream produced 0 rows |
| upsert | Live | Requires upsertKey; throws if every projected column is in upsertKey (no updatable columns) |
| blue_green | Live | Loads to <table>_new, atomic 3-step rename, drops <table>_old. Empty-source guard applies |
| incremental_watermark | Live | High-water persisted in pipeline_watermarks; INSERT + watermark-upsert run in a single transaction. Requires watermarkColumn |
| cdc_mirror | Gated | Requires FLOW_STREAMING_ENABLED=true env + a configured Debezium connector. Runtime returns a clear “streaming disabled” error otherwise |
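For example, an upsert target must name at least one key column while leaving at least one projected column updatable; the runtime throws if every projected column is in upsertKey. A sketch with illustrative field names (only mode and upsertKey are documented above):

```json
{
  "id": "load_customers",
  "kind": "target",
  "mode": "upsert",
  "table": "customers",
  "columns": ["customer_id", "email", "updated_at"],
  "upsertKey": ["customer_id"]
}
```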
Optional safety fields on target nodes
| Field | Type | Behavior |
|---|---|---|
| failOnEmptySource | boolean (default true) | Aborts truncate / blue_green BEFORE the destructive statement when upstream produced 0 rows. Set false only when “empty file means clear the table” is the genuine intent |
| expectedRowCountDelta | { minPct?, maxPct?, windowRuns? } | Emits a warning System Health alert when today’s rowsLoaded is outside the band vs. the running mean over the last N successful runs (default 7). Run still completes |
| backupBeforeLoad | { enabled, retainCount? } | Snapshots the target via CREATE TABLE … AS TABLE … before destructive load. Only acts on truncate and blue_green. Backups older than retainCount (default 3) are pruned post-success. On failure, the backup is left in place and the run record’s meta.backupTable carries the snapshot table name |
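Combining all three safety fields on a destructive load might look like the following sketch (the three field names and their sub-keys come from the table above; the surrounding node shape and values are illustrative):

```json
{
  "id": "load_daily_snapshot",
  "kind": "target",
  "mode": "truncate",
  "table": "daily_snapshot",
  "failOnEmptySource": true,
  "expectedRowCountDelta": { "minPct": -20, "maxPct": 20, "windowRuns": 7 },
  "backupBeforeLoad": { "enabled": true, "retainCount": 3 }
}
```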
Tenant gate
Pipeline IR is on by default: tenant_settings.flowIrEnabled defaults to true. New pipelines created via POST /api/v1/pipelines with irVersion: "1.0" and an ir body field are stored in the pipeline_ir_versions table.
The flag remains in the schema as an explicit kill-switch: setting
flowIrEnabled = false makes pipeline create/run/AI-author endpoints
return 403 flow_ir_disabled. The legacy ETL editor and runtime were
removed in the 2026-04-28 cleanup, so disabling the flag effectively
disables pipelines entirely.
API
POST /api/v1/pipelines — send { name, connectorId, schemaId, irVersion: "1.0", ir: { ... } }
to create an IR-native pipeline. See Pipelines API
for the full shape.
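Expanding that shape into a concrete request body (a sketch only: the id values and the ir contents are illustrative, and the real ir payload follows the Pipelines API schema):

```json
{
  "name": "orders-daily",
  "connectorId": "conn_s3_orders",
  "schemaId": "sch_orders",
  "irVersion": "1.0",
  "ir": {
    "irVersion": "1.0",
    "nodes": []
  }
}
```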
POST /api/v1/pipelines/:id/run — when the pipeline has irVersion
set, the request is handed to the in-process batch interpreter and
returns the per-node result synchronously. Legacy pipelines continue
going to the BullMQ worker queue.
Validation layers
The IR is enforced at three layers:
- Authoring time — Zod schemas in the UI / AI prompt
- Save time — server re-validates on POST and stores the result as a pipeline_ir_versions row
- Runtime — interpreter re-checks before executing