The Pipeline IR is the new source of truth for ETL pipelines starting in
KaireonAI Flow Phase 1. It replaces the legacy PipelineNode + PipelineEdge
relational format with a single typed JSON document that the runtime
interprets directly. A pipeline authored in IR mode is written once and
read by:
- The visual canvas (renders the IR)
- The AI assistant (proposes diffs against the IR)
- The MCP server (accepts IR via tool calls)
- The runtime interpreter (executes the IR)
Same source of truth, four surfaces.
Why IR-first
Generative AI is far more reliable when it produces structured data than
when it produces free-form code. Constrained JSON output validated against
a Zod schema cannot drift from the contract. This is what makes "AI builds
the pipeline" actually work: every proposal is schema-valid by construction.
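To make the idea concrete, here is a hedged sketch of the accept/reject loop for an AI proposal. The real product validates with Zod; `parseProposal` and the checked fields are stand-ins that only illustrate "schema-valid by construction":

```typescript
// Hypothetical sketch of accepting or rejecting an AI-proposed IR document.
// The product uses a Zod schema; this hand-rolled check is illustrative only.
type ProposalResult =
  | { ok: true; doc: { kind: "pipeline"; version: string; id: string } }
  | { ok: false; reason: string };

function parseProposal(raw: string): ProposalResult {
  let doc: any;
  try {
    doc = JSON.parse(raw);
  } catch {
    return { ok: false, reason: "malformed JSON" }; // rejected outright
  }
  // Anything that does not match the contract is rejected, never stored.
  if (doc?.kind !== "pipeline") return { ok: false, reason: "kind must be 'pipeline'" };
  if (typeof doc.version !== "string") return { ok: false, reason: "missing version" };
  if (typeof doc.id !== "string") return { ok: false, reason: "missing id" };
  return { ok: true, doc };
}
```

A rejected proposal can simply be fed back to the model for another attempt, so only schema-valid documents ever leave the loop.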
Top-level shape
```json
{
  "kind": "pipeline",
  "version": "1.0",
  "id": "starbucks-daily-orders",
  "metadata": {
    "name": "Daily orders ingestion",
    "owner": "data-team",
    "tags": ["batch", "orders"]
  },
  "schedule": {
    "kind": "cron",
    "expression": "0 2 * * *",
    "timezone": "America/Los_Angeles"
  },
  "nodes": [ /* see node kinds below */ ],
  "errorHandling": {
    "dlq": { "enabled": false },
    "retry": { "maxAttempts": 3, "backoff": "exponential" }
  }
}
```
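The same shape can be restated as a TypeScript type. This is an illustration, not the canonical schema: field names come from the example above, but which fields are optional, and the "fixed" backoff variant, are assumptions:

```typescript
// Illustrative TypeScript mirror of the top-level IR document.
interface PipelineIR {
  kind: "pipeline";
  version: "1.0";
  id: string;
  metadata: { name: string; owner: string; tags: string[] };
  schedule?: { kind: "cron"; expression: string; timezone: string };
  nodes: unknown[];
  errorHandling?: {
    dlq: { enabled: boolean };
    retry: { maxAttempts: number; backoff: "exponential" | "fixed" };
  };
}

// The example document, restated as a typed value.
const example: PipelineIR = {
  kind: "pipeline",
  version: "1.0",
  id: "starbucks-daily-orders",
  metadata: { name: "Daily orders ingestion", owner: "data-team", tags: ["batch", "orders"] },
  schedule: { kind: "cron", expression: "0 2 * * *", timezone: "America/Los_Angeles" },
  nodes: [],
  errorHandling: { dlq: { enabled: false }, retry: { maxAttempts: 3, backoff: "exponential" } },
};
```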
Node kinds (Phase 1)
| Kind | Purpose |
|---|---|
| source | Read from external system (file/DB/API/stream) |
| transform | Per-row column ops (15 op types) |
| validate | Row-level + dataset-level validation |
| target | Write to internal schema table |
| archive | Post-load file movement |
| branch | Conditional routing |
| join | Multi-input join |
| enrich | Per-row external call (LLM tag, geocode, ML score) |
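The eight kinds above can be sketched as a discriminated-union tag. Only the kind names come from the table; the per-kind config shapes are omitted here, and the helper name is hypothetical:

```typescript
// The eight Phase 1 node kinds as a single source-of-truth constant.
const NODE_KINDS = [
  "source", "transform", "validate", "target",
  "archive", "branch", "join", "enrich",
] as const;

// Union type derived from the constant, so the two can never drift apart.
type NodeKind = (typeof NODE_KINDS)[number];

// Type guard: narrows an arbitrary string to NodeKind when it matches.
function isKnownKind(k: string): k is NodeKind {
  return (NODE_KINDS as readonly string[]).includes(k);
}
```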
Source node example
```json
{
  "id": "src",
  "kind": "source",
  "connector": "local_fs",
  "config": {
    "path": "/data/orders/",
    "pattern": { "type": "glob", "value": "orders_*.csv" },
    "ordering": "latest_by_mtime",
    "waitPolicy": { "maxRetries": 6, "intervalMinutes": 10, "onMissAction": "alert" },
    "atomicity": {
      "stagingFolder": ".processing/",
      "successFolder": ".archive/{YYYY-MM-DD}/",
      "failureFolder": ".failed/"
    },
    "format": "csv"
  }
}
```
Phase 1 scope: the IR schema accepts 7 connector types (s3, gcs,
azure_blob, sftp, ftp, local_fs, http_pull), but only
local_fs has a runtime executor in Phase 1. The rest land in Phase 3
(file-handling overhaul). Pipelines that reference unsupported connectors
pass validation but fail at runtime with a clear scope-defer error.
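A runtime guard matching that scope note might look like the following sketch. The connector names come from the doc; the function name, error text, and `SCOPE_DEFERRED` marker are illustrative, not the runtime's actual identifiers:

```typescript
// All 7 connector types are schema-valid in Phase 1, but only local_fs
// has an executor; the rest defer until the Phase 3 file-handling overhaul.
const SCHEMA_CONNECTORS = new Set([
  "s3", "gcs", "azure_blob", "sftp", "ftp", "local_fs", "http_pull",
]);
const PHASE1_EXECUTORS = new Set(["local_fs"]);

function assertExecutable(connector: string): void {
  if (!SCHEMA_CONNECTORS.has(connector)) {
    // Not in the schema at all: this would already fail validation.
    throw new Error(`schema error: unknown connector "${connector}"`);
  }
  if (!PHASE1_EXECUTORS.has(connector)) {
    // Schema-valid, but execution is out of scope until Phase 3.
    throw new Error(`SCOPE_DEFERRED: no Phase 1 executor for "${connector}"`);
  }
}
```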
Target node load modes
| Mode | Phase 1 status |
|---|---|
| append | Implemented |
| truncate | Implemented |
| upsert | Implemented (requires upsertKey) |
| blue_green | Deferred to Phase 4 |
| incremental_watermark | Deferred to Phase 4 (requires watermarkColumn) |
| cdc_mirror | Deferred to Phase 4 (requires cdcSource) |
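The mode-specific required fields in the table lend themselves to a small lookup check. The mode and field names are from the table; the helper and constant names are hypothetical:

```typescript
// Per-mode required config field, from the load-modes table (null = none).
const REQUIRED_FIELD: Record<string, string | null> = {
  append: null,
  truncate: null,
  upsert: "upsertKey",
  blue_green: null,                          // deferred to Phase 4
  incremental_watermark: "watermarkColumn",  // deferred to Phase 4
  cdc_mirror: "cdcSource",                   // deferred to Phase 4
};

// Returns a list of validation errors for a target node's load config.
function checkLoadMode(mode: string, config: Record<string, unknown>): string[] {
  if (!(mode in REQUIRED_FIELD)) return [`unknown load mode: ${mode}`];
  const field = REQUIRED_FIELD[mode];
  if (field !== null && config[field] === undefined) {
    return [`load mode "${mode}" requires ${field}`];
  }
  return [];
}
```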
Opting in
Pipeline IR is opt-in per tenant via the flowIrEnabled field on
tenant_settings. When enabled, new pipelines created via
POST /api/v1/pipelines with irVersion: "1.0" and an ir body field
are stored in the new pipeline_ir_versions table. Legacy pipelines
keep working unchanged.
API
POST /api/v1/pipelines — send { name, connectorId, schemaId, irVersion: "1.0", ir: { ... } }
to create an IR-native pipeline. See Pipelines API
for the full shape.
POST /api/v1/pipelines/:id/run — when the pipeline has irVersion
set, the request is handed to the in-process batch interpreter and
returns the per-node result synchronously. Legacy pipelines continue
going to the BullMQ worker queue.
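The two calls above can be sketched as request builders. The endpoint paths and body fields come from the doc; the base URL and the connectorId/schemaId values are placeholders:

```typescript
const BASE = "https://flow.example.com"; // placeholder host

// POST /api/v1/pipelines — create an IR-native pipeline.
function buildCreatePipeline(name: string, ir: object) {
  return {
    url: `${BASE}/api/v1/pipelines`,
    method: "POST",
    body: JSON.stringify({
      name,
      connectorId: "local_fs", // placeholder value
      schemaId: "orders",      // placeholder value
      irVersion: "1.0",
      ir,
    }),
  };
}

// POST /api/v1/pipelines/:id/run — for IR pipelines this runs the
// in-process batch interpreter and returns per-node results synchronously;
// legacy pipelines go to the BullMQ worker queue instead.
function buildRunPipeline(id: string) {
  return { url: `${BASE}/api/v1/pipelines/${id}/run`, method: "POST" };
}
```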
Validation layers
The IR is enforced at three layers:
- Authoring time — Zod schemas in the UI / AI prompt
- Save time — server re-validates on POST and stores as
pipeline_ir_versions row
- Runtime — interpreter re-checks before executing
This makes it structurally impossible for AI-generated invalid IR to be
persisted or executed.
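The key property of the three layers is that they can share one validator, so the contract cannot diverge between them. A sketch, where `validateIR` stands in for parsing with the shared Zod schema and everything except the layer names is illustrative:

```typescript
// Stand-in for the shared Zod schema's parse step: returns a list of errors.
function validateIR(doc: any): string[] {
  const errors: string[] = [];
  if (doc?.kind !== "pipeline") errors.push("kind must be 'pipeline'");
  if (doc?.version !== "1.0") errors.push("version must be '1.0'");
  if (!Array.isArray(doc?.nodes)) errors.push("nodes must be an array");
  return errors;
}

type Layer = "authoring" | "save" | "runtime";

// The same check runs at all three layers: authoring rejects an AI proposal,
// save guards the pipeline_ir_versions insert, runtime re-checks before the
// interpreter executes.
function assertValidIR(doc: any, layer: Layer): void {
  const errors = validateIR(doc);
  if (errors.length > 0) {
    throw new Error(`[${layer}] invalid IR: ${errors.join("; ")}`);
  }
}
```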