The Pipeline IR is the new source of truth for ETL pipelines starting in KaireonAI Flow Phase 1. It replaces the legacy PipelineNode + PipelineEdge relational format with a single typed JSON document that the runtime interprets directly. A pipeline in IR mode is authored once and read by:
  • The visual canvas (renders the IR)
  • The AI assistant (proposes diffs against the IR)
  • The MCP server (accepts IR via tool calls)
  • The runtime interpreter (executes the IR)
Same source of truth, four surfaces. See Transforms for the full reference of the column ops a transform node can carry. The IR schema accepts 19 op types; the visual editor exposes the 14 most-used ones in its toolbar (complex ops like summarize, vector_embed, geo_resolve, sentiment_score, language_detect are authored via the JSON IR tab or the AI assistant).
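
For orientation, a transform node might look like the sketch below. The op names sentiment_score and language_detect come from the list above; the simpler op and the config field names (ops, op, column, target) are assumptions for illustration, since the canonical op shapes live in the Transforms reference.

const transformNode = {
  id: "clean",
  kind: "transform",
  // Field names below are illustrative, not the canonical op shapes.
  ops: [
    { op: "trim", column: "customer_name" },                               // typical toolbar-style op (assumed name)
    { op: "sentiment_score", column: "review_text", target: "sentiment" }, // complex op, authored via the JSON IR tab or the AI assistant
    { op: "language_detect", column: "review_text", target: "lang" },      // complex op, same authoring path
  ],
};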

Why IR-first

Generative AI is reliable when it produces structured data, not code. Constrained JSON output validated against a Zod schema cannot drift from the contract. This is what makes “AI builds the pipeline” actually work — every proposal is schema-valid by construction.
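
As a minimal sketch of the idea (not the actual KaireonAI schema), a Zod contract mirroring the top-level shape below can reject any proposal that drifts from it; the schema and function names here are hypothetical.

import { z } from "zod";

// Illustrative subset of the IR contract; the real schema covers much more.
const pipelineIrSchema = z
  .object({
    kind: z.literal("pipeline"),
    version: z.literal("1.0"),
    id: z.string(),
    metadata: z.object({
      name: z.string(),
      owner: z.string().optional(),
      tags: z.array(z.string()).optional(),
    }),
    nodes: z.array(z.object({ id: z.string(), kind: z.string() }).passthrough()),
  })
  .passthrough();

// A structured-output proposal from the assistant is parsed and checked
// before it reaches the canvas, the store, or the runtime.
export function acceptProposal(aiResponseText: string) {
  const result = pipelineIrSchema.safeParse(JSON.parse(aiResponseText));
  if (!result.success) {
    throw new Error(`Rejected IR proposal: ${result.error.message}`);
  }
  return result.data; // schema-valid by construction
}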

Top-level shape

{
  "kind": "pipeline",
  "version": "1.0",
  "id": "starbucks-daily-orders",
  "metadata": {
    "name": "Daily orders ingestion",
    "owner": "data-team",
    "tags": ["batch", "orders"]
  },
  "schedule": {
    "kind": "cron",
    "expression": "0 2 * * *",
    "timezone": "America/Los_Angeles"
  },
  "nodes": [ /* see node kinds below */ ],
  "errorHandling": {
    "dlq": { "enabled": false },
    "retry": { "maxAttempts": 3, "backoff": "exponential" }
  }
}

Node kinds (Phase 1)

Kind | Purpose
source | Read from external system (file/DB/API/stream)
transform | Per-row column ops — 19 op types (Transforms)
validate | Row-level + dataset-level validation
target | Write to internal schema table
archive | Post-load file movement
branch | Conditional routing
join | Multi-input join
enrich | Per-row external call (LLM tag, geocode, ML score)

Source node example

{
  "id": "src",
  "kind": "source",
  "connector": "local_fs",
  "config": {
    "path": "/data/orders/",
    "pattern": { "type": "glob", "value": "orders_*.csv" },
    "ordering": "latest_by_mtime",
    "waitPolicy": { "maxRetries": 6, "intervalMinutes": 10, "onMissAction": "alert" },
    "atomicity": {
      "stagingFolder": ".processing/",
      "successFolder": ".archive/{YYYY-MM-DD}/",
      "failureFolder": ".failed/"
    },
    "format": "csv"
  }
}
The IR schema source.kind field accepts a fixed set of cloud-store and filesystem kinds: s3, gcs, azure_blob, sftp, ftp, local_fs, http_pull. Runtime executors for local_fs, s3, gcs, azure_blob, sftp, and http_pull are all live. ftp is documented as deprecated (plaintext) and never reaches a runtime executor. Streaming kinds (kafka, kinesis, pulsar) are gated by FLOW_STREAMING_ENABLED and are only available when self-hosters provide their own broker. Note that source.kind is a separate concept from the full connector registry (85 types): pipelines reference a saved Connector record by connectorId, not by kind directly.
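
As a rough sketch, an s3 variant of the source node above might look like this. The bucket and prefix keys are assumptions for illustration (the documented local_fs example uses path); pattern, ordering, and format mirror the example above.

const s3Source = {
  id: "src",
  kind: "source",
  connector: "s3",
  config: {
    bucket: "acme-raw-orders",                        // assumed key for the s3 kind
    prefix: "orders/",                                // assumed key for the s3 kind
    pattern: { type: "glob", value: "orders_*.csv" },
    ordering: "latest_by_mtime",
    format: "csv",
  },
};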

Target node load modes

Mode | Status | Notes
append | Live | Explicit-column INSERT with NULLIF + per-column CAST
truncate | Live | TRUNCATE + INSERT wrapped in prisma.$transaction. Empty-source guard (failOnEmptySource: true default) aborts before the destructive statement when upstream produced 0 rows
upsert | Live | Requires upsertKey; throws if every projected column is in upsertKey (no updatable columns)
blue_green | Live | Loads to <table>_new, atomic 3-step rename, drops <table>_old. Empty-source guard applies
incremental_watermark | Live | High-water mark persisted in pipeline_watermarks; INSERT + watermark upsert run in a single transaction. Requires watermarkColumn
cdc_mirror | Gated | Requires FLOW_STREAMING_ENABLED=true plus a configured Debezium connector. Runtime returns a clear “streaming disabled” error otherwise
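
The sketch below shows what an incremental_watermark target could look like. watermarkColumn and the mode name are documented above; the surrounding field names (config, table, mode) are assumptions, since this page does not show a full target node.

const watermarkTarget = {
  id: "load",
  kind: "target",
  config: {
    table: "orders",                  // assumed field name
    mode: "incremental_watermark",    // documented load mode
    watermarkColumn: "updated_at",    // required by incremental_watermark
  },
};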

Optional safety fields on target nodes

Field | Type | Behavior
failOnEmptySource | boolean (default true) | Aborts truncate / blue_green BEFORE the destructive statement when upstream produced 0 rows. Set false only when “empty file means clear the table” is the genuine intent
expectedRowCountDelta | { minPct?, maxPct?, windowRuns? } | Emits a warning System Health alert when today’s rowsLoaded is outside the band vs. the running mean over the last N successful runs (default 7). Run still completes
backupBeforeLoad | { enabled, retainCount? } | Snapshots the target via CREATE TABLE … AS TABLE … before destructive load. Only acts on truncate and blue_green. Backups older than retainCount (default 3) are pruned post-success. On failure, the backup is left in place and the run record’s meta.backupTable carries the snapshot table name
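
Putting the safety fields together, a guarded truncate load might be configured as below. The three field names and their sub-keys match the table above; the numeric values and the surrounding node shape are illustrative.

const guardedTarget = {
  id: "load",
  kind: "target",
  config: {
    table: "orders",
    mode: "truncate",
    failOnEmptySource: true,                                            // abort before TRUNCATE when upstream produced 0 rows
    expectedRowCountDelta: { minPct: -20, maxPct: 20, windowRuns: 7 },  // illustrative band vs. the 7-run mean; outside it, a warning alert fires
    backupBeforeLoad: { enabled: true, retainCount: 3 },                // snapshot the table before the destructive load, keep 3 backups
  },
};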

Tenant gate

Pipeline IR is on by default: tenant_settings.flowIrEnabled defaults to true. New pipelines created via POST /api/v1/pipelines with irVersion: "1.0" and an ir body field are stored in the pipeline_ir_versions table. The flag remains in the schema as an explicit kill-switch: setting flowIrEnabled = false makes pipeline create/run/AI-author endpoints return 403 flow_ir_disabled. The legacy ETL editor and runtime were removed in the 2026-04-28 cleanup, so disabling the flag effectively disables pipelines entirely.
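
From a client’s point of view, the kill-switch surfaces as a 403. A small check might look like this; the 403 status and the flow_ir_disabled code are documented above, while the response body shape ({ error: string }) is an assumption for the sketch.

async function isFlowIrDisabled(res: Response): Promise<boolean> {
  // True when the tenant has flowIrEnabled = false and the IR endpoints refuse the call.
  if (res.status !== 403) return false;
  const body = (await res.json()) as { error?: string };
  return body.error === "flow_ir_disabled";
}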

API

POST /api/v1/pipelines — send { name, connectorId, schemaId, irVersion: "1.0", ir: { ... } } to create an IR-native pipeline. See Pipelines API for the full shape.

POST /api/v1/pipelines/:id/run — when the pipeline has irVersion set, the request is handed to the in-process batch interpreter and returns the per-node result synchronously. Legacy pipelines continue going to the BullMQ worker queue.
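
A minimal client sketch of the two calls, assuming a fetch-based client and that the create response echoes the new pipeline’s id; the exact request and response shapes are in the Pipelines API reference, and the connectorId / schemaId values are placeholders.

async function createAndRunPipeline(ir: object) {
  const createRes = await fetch("/api/v1/pipelines", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      name: "Daily orders ingestion",
      connectorId: "conn-orders-fs",   // placeholder saved-Connector id
      schemaId: "schema-orders",       // placeholder internal-schema id
      irVersion: "1.0",
      ir,
    }),
  });
  const { id } = (await createRes.json()) as { id: string }; // assumed response field

  // With irVersion set, the run request is handled by the in-process batch
  // interpreter and returns per-node results synchronously.
  const runRes = await fetch(`/api/v1/pipelines/${id}/run`, { method: "POST" });
  return runRes.json();
}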

Validation layers

The IR is enforced at three layers:
  1. Authoring time — Zod schemas in the UI / AI prompt
  2. Save time — server re-validates on POST and stores as pipeline_ir_versions row
  3. Runtime — interpreter re-checks before executing
This makes “AI generated invalid IR” structurally impossible.
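
One way to read the three layers is as a single shared schema applied at three checkpoints; the sketch below assumes a hypothetical shared module and helper name, but the layering follows the list above.

// 1. Authoring time: the UI / AI prompt validate before proposing a save.
// 2. Save time: the server re-validates the POST body before writing the
//    pipeline_ir_versions row.
// 3. Runtime: the interpreter re-checks the stored IR before executing.
import { pipelineIrSchema } from "./pipeline-ir-schema"; // hypothetical shared module

export function assertValidIr(candidate: unknown) {
  const parsed = pipelineIrSchema.safeParse(candidate);
  if (!parsed.success) {
    throw new Error(`Invalid pipeline IR: ${parsed.error.message}`);
  }
  return parsed.data;
}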