The Pipeline IR is the new source of truth for ETL pipelines starting in KaireonAI Flow Phase 1. It replaces the legacy PipelineNode + PipelineEdge relational format with a single typed JSON document that the runtime interprets directly. A pipeline is authored once and read by:
  • The visual canvas (renders the IR)
  • The AI assistant (proposes diffs against the IR)
  • The MCP server (accepts IR via tool calls)
  • The runtime interpreter (executes the IR)
Same source of truth, four surfaces.

Why IR-first

Generative AI is reliable when it produces structured data, not code. Constrained JSON output validated against a Zod schema cannot drift from the contract. This is what makes “AI builds the pipeline” actually work — every proposal is schema-valid by construction.

Top-level shape

{
  "kind": "pipeline",
  "version": "1.0",
  "id": "starbucks-daily-orders",
  "metadata": {
    "name": "Daily orders ingestion",
    "owner": "data-team",
    "tags": ["batch", "orders"]
  },
  "schedule": {
    "kind": "cron",
    "expression": "0 2 * * *",
    "timezone": "America/Los_Angeles"
  },
  "nodes": [ /* see node kinds below */ ],
  "errorHandling": {
    "dlq": { "enabled": false },
    "retry": { "maxAttempts": 3, "backoff": "exponential" }
  }
}
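The top-level shape above can be expressed as a TypeScript interface with a structural guard. This is a minimal sketch based only on the fields in the example; `PipelineIr` and `isPipelineIr` are illustrative names, not part of the product API, and the real Zod schema will be stricter.

```typescript
// Illustrative mirror of the top-level IR document shown above.
interface PipelineIr {
  kind: "pipeline";
  version: string;
  id: string;
  metadata: { name: string; owner?: string; tags?: string[] };
  schedule?: { kind: "cron"; expression: string; timezone?: string };
  nodes: unknown[];
  errorHandling?: {
    dlq?: { enabled: boolean };
    retry?: { maxAttempts: number; backoff: string };
  };
}

// Structural check covering the required fields only.
function isPipelineIr(value: unknown): value is PipelineIr {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    v.kind === "pipeline" &&
    typeof v.version === "string" &&
    typeof v.id === "string" &&
    typeof v.metadata === "object" && v.metadata !== null &&
    Array.isArray(v.nodes)
  );
}
```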

Node kinds (Phase 1)

Kind        Purpose
source      Read from external system (file/DB/API/stream)
transform   Per-row column ops (15 op types)
validate    Row-level + dataset-level validation
target      Write to internal schema table
archive     Post-load file movement
branch      Conditional routing
join        Multi-input join
enrich      Per-row external call (LLM tag, geocode, ML score)
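The eight kinds above form a natural discriminated union, which is how an interpreter would dispatch each node to its executor. A sketch, with type and function names that are illustrative rather than taken from the product:

```typescript
// The Phase 1 node kinds as a string-literal union.
type NodeKind =
  | "source" | "transform" | "validate" | "target"
  | "archive" | "branch" | "join" | "enrich";

interface PipelineNode {
  id: string;
  kind: NodeKind;
  config?: Record<string, unknown>;
}

// Narrowing on `kind` lets a dispatcher route each node; the switch is
// exhaustive, so adding a ninth kind fails type-checking until handled.
function describe(node: PipelineNode): string {
  switch (node.kind) {
    case "source": return "read from external system";
    case "transform": return "per-row column ops";
    case "validate": return "row/dataset validation";
    case "target": return "write to internal schema table";
    case "archive": return "post-load file movement";
    case "branch": return "conditional routing";
    case "join": return "multi-input join";
    case "enrich": return "per-row external call";
  }
}
```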

Source node example

{
  "id": "src",
  "kind": "source",
  "connector": "local_fs",
  "config": {
    "path": "/data/orders/",
    "pattern": { "type": "glob", "value": "orders_*.csv" },
    "ordering": "latest_by_mtime",
    "waitPolicy": { "maxRetries": 6, "intervalMinutes": 10, "onMissAction": "alert" },
    "atomicity": {
      "stagingFolder": ".processing/",
      "successFolder": ".archive/{YYYY-MM-DD}/",
      "failureFolder": ".failed/"
    },
    "format": "csv"
  }
}
Phase 1 scope: the IR schema accepts 7 connector types (s3, gcs, azure_blob, sftp, ftp, local_fs, http_pull), but only local_fs has a runtime executor in Phase 1. The rest land in Phase 3 (file-handling overhaul). Pipelines that reference unsupported connectors will validate but fail at runtime with a clear scope-defer error.

Target node load modes

Mode                    Phase 1 status
append                  Implemented
truncate                Implemented
upsert                  Implemented (requires upsertKey)
blue_green              Deferred to Phase 4
incremental_watermark   Deferred to Phase 4 (requires watermarkColumn)
cdc_mirror              Deferred to Phase 4 (requires cdcSource)
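The mode-specific requirements in the table lend themselves to a small validation function. Field names (`upsertKey`, `watermarkColumn`, `cdcSource`) come from the table; the config shape and function are a sketch, not the product's actual validator.

```typescript
type LoadMode =
  | "append" | "truncate" | "upsert"
  | "blue_green" | "incremental_watermark" | "cdc_mirror";

interface TargetConfig {
  mode: LoadMode;
  upsertKey?: string[];
  watermarkColumn?: string;
  cdcSource?: string;
}

// Returns a list of problems; empty means the config is acceptable in Phase 1.
function validateTarget(cfg: TargetConfig): string[] {
  const errors: string[] = [];
  const deferred: LoadMode[] = ["blue_green", "incremental_watermark", "cdc_mirror"];
  if (deferred.includes(cfg.mode)) {
    errors.push(`mode "${cfg.mode}" is deferred to Phase 4`);
  }
  if (cfg.mode === "upsert" && (!cfg.upsertKey || cfg.upsertKey.length === 0)) {
    errors.push("upsert requires upsertKey");
  }
  if (cfg.mode === "incremental_watermark" && !cfg.watermarkColumn) {
    errors.push("incremental_watermark requires watermarkColumn");
  }
  if (cfg.mode === "cdc_mirror" && !cfg.cdcSource) {
    errors.push("cdc_mirror requires cdcSource");
  }
  return errors;
}
```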

Opting in

Pipeline IR is opt-in per tenant via the flowIrEnabled field on tenant_settings. When enabled, new pipelines created via POST /api/v1/pipelines with irVersion: "1.0" and an ir body field are stored in the new pipeline_ir_versions table. Legacy pipelines keep working unchanged.

API

POST /api/v1/pipelines — send { name, connectorId, schemaId, irVersion: "1.0", ir: { ... } } to create an IR-native pipeline. See Pipelines API for the full shape.

POST /api/v1/pipelines/:id/run — when the pipeline has irVersion set, the request is handed to the in-process batch interpreter, which returns the per-node result synchronously. Legacy pipelines continue going to the BullMQ worker queue.
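A minimal client-side sketch of the create call. Only the fields named above are included; the base URL, auth, and helper name are assumptions.

```typescript
// Request body for POST /api/v1/pipelines, per the fields listed above.
interface CreatePipelineRequest {
  name: string;
  connectorId: string;
  schemaId: string;
  irVersion: "1.0";
  ir: Record<string, unknown>;
}

function buildCreatePipelineRequest(
  name: string,
  connectorId: string,
  schemaId: string,
  ir: Record<string, unknown>,
): CreatePipelineRequest {
  return { name, connectorId, schemaId, irVersion: "1.0", ir };
}

// Usage (base URL and IDs are placeholders):
// await fetch("https://flow.example.test/api/v1/pipelines", {
//   method: "POST",
//   headers: { "Content-Type": "application/json" },
//   body: JSON.stringify(buildCreatePipelineRequest("Daily orders", "conn-1", "schema-1", irDoc)),
// });
```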

Validation layers

The IR is enforced at three layers:
  1. Authoring time — Zod schemas in the UI / AI prompt
  2. Save time — server re-validates on POST and stores as pipeline_ir_versions row
  3. Runtime — interpreter re-checks before executing
This makes “AI generated invalid IR” structurally impossible.