

Pipeline execution modes. batch and micro_batch are fully implemented. streaming is a planned placeholder: it is disabled in the UI and does not spawn a long-lived consumer at the API layer. See Data Platform → Execution Config for details.
Coming-soon connectors. Pipelines whose source connector is amazon_kinesis or braze no-op at execution time: the executor logs a message and returns zero rows until ingestion support is wired up. See Connectors for the full status table.
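The two notes above can be combined into a single pre-flight check on the client side. The helper below is purely illustrative (it is not part of the Kaireon API); the mode and connector names are the ones documented on this page:

```python
# Hypothetical pre-flight check mirroring the gating described above.
IMPLEMENTED_MODES = {"batch", "micro_batch"}   # "streaming" is a disabled placeholder
NOOP_CONNECTORS = {"amazon_kinesis", "braze"}  # ingestion not yet wired; runs return zero rows

def will_produce_rows(execution_mode: str, connector_type: str) -> bool:
    """Return False when a run would be a no-op or use an unimplemented mode."""
    if execution_mode not in IMPLEMENTED_MODES:
        return False
    if connector_type in NOOP_CONNECTORS:
        return False
    return True
```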

GET /api/v1/pipelines

List all pipelines with their nodes, edges, connector, and schema references.

Query Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| limit | integer | 20 | Max results per page |
| cursor | string | (none) | Cursor for pagination |

Response

{
  "data": [
    {
      "id": "pipe_001",
      "tenantId": "tenant_001",
      "name": "Customer Import",
      "description": "Daily customer data sync from Snowflake",
      "status": "draft",
      "schedule": "0 2 * * *",
      "connectorId": "conn_001",
      "schemaId": "schema_001",
      "executionConfig": { "batchSize": 5000, "parallelism": 4 },
      "connector": { "id": "conn_001", "name": "Production Snowflake", "type": "snowflake" },
      "schema": { "id": "schema_001", "name": "customers", "displayName": "Customers" },
      "nodes": [],
      "edges": [],
      "lastRunAt": null,
      "lastRunStatus": null,
      "createdAt": "2026-01-15T09:00:00.000Z",
      "updatedAt": "2026-01-15T09:00:00.000Z"
    }
  ],
  "pagination": {
    "total": 3,
    "hasMore": false,
    "limit": 50,
    "cursor": null
  }
}

POST /api/v1/pipelines

Create a new pipeline with optional nodes and edges.

Request Body

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | Yes | Pipeline name |
| connectorId | string | Yes | Source connector ID |
| schemaId | string | Yes | Target schema ID |
| description | string | No | Description |
| schedule | string | No | Cron expression for scheduled execution |
| executionConfig | object | No | Execution settings (batchSize, parallelism, partitioning) |
| nodes | array | No | Pipeline flow nodes (see Node Object); legacy format |
| edges | array | No | Connections between nodes; legacy format |
| irVersion | string \| null | No | When "1.0", the request is interpreted as IR-native. Omitting this preserves the legacy node/edge format. |
| ir | object | Conditional | Required when irVersion === "1.0". The full Pipeline IR document. See Pipeline IR. |

IR-native creation

When the request includes irVersion: "1.0" and an ir body, the legacy nodes / edges fields are ignored. The IR is validated (Zod + structural) and stored as version 1 in the pipeline_ir_versions table. Subsequent runs via POST /pipelines/:id/run are dispatched to the in-process batch interpreter rather than the BullMQ worker queue.
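The field contract above reduces to a small dispatch rule. The guard below mirrors it for illustration; it is a client-side sketch, not the server's Zod validation:

```python
def classify_create_request(body: dict) -> str:
    """Mirror the documented dispatch rule for POST /api/v1/pipelines.

    Returns "ir" when the request is IR-native and "legacy" otherwise;
    raises when irVersion is "1.0" but no ir document is supplied.
    """
    if body.get("irVersion") == "1.0":
        if "ir" not in body:
            raise ValueError('ir is required when irVersion === "1.0"')
        return "ir"  # legacy nodes/edges fields, if present, are ignored
    return "legacy"
```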

Node Object

| Field | Type | Description |
| --- | --- | --- |
| id | string | Temporary node ID (used for edge mapping) |
| nodeType | string | Node type: "source", "transform", "filter", "target", etc. |
| label | string | Display label |
| config | object | Type-specific configuration |
| position | object | { x: number, y: number } for the visual editor |

Edge Object

| Field | Type | Description |
| --- | --- | --- |
| source | string | Source node temp ID |
| target | string | Target node temp ID |
| label | string | Edge label |
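These tables imply one structural invariant a client can check before POSTing: every edge's source and target must name a declared node temp ID. A hypothetical client-side validator:

```python
def validate_flow(nodes: list[dict], edges: list[dict]) -> list[str]:
    """Return a list of problems with a nodes/edges payload.

    Checks that every edge endpoint references a declared node id.
    Illustrative client-side validation only.
    """
    ids = {n["id"] for n in nodes}
    problems = []
    for e in edges:
        for end in ("source", "target"):
            if e[end] not in ids:
                problems.append(f'edge {end} "{e[end]}" does not match any node id')
    return problems
```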

Example

curl -X POST https://playground.kaireonai.com/api/v1/pipelines \
  -H "Content-Type: application/json" \
  -H "X-Tenant-Id: my-tenant" \
  -d '{
    "name": "Customer Import",
    "connectorId": "conn_001",
    "schemaId": "schema_001",
    "schedule": "0 2 * * *",
    "executionConfig": { "batchSize": 5000, "parallelism": 4 },
    "nodes": [
      { "id": "n1", "nodeType": "source", "label": "Snowflake Source", "config": {} },
      { "id": "n2", "nodeType": "transform", "label": "Rename Fields", "config": { "type": "rename_field" } },
      { "id": "n3", "nodeType": "target", "label": "Customer Table", "config": {} }
    ],
    "edges": [
      { "source": "n1", "target": "n2" },
      { "source": "n2", "target": "n3" }
    ]
  }'
Response: 201 Created

PUT /api/v1/pipelines

Update a pipeline. When nodes are provided, existing nodes and edges are replaced entirely.

Request Body

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| id | string | Yes | Pipeline ID |
| name | string | No | Updated name |
| description | string | No | Updated description |
| schedule | string | No | Updated cron schedule |
| executionConfig | object | No | Updated execution config |
| nodes | array | No | Replaces all nodes |
| edges | array | No | Replaces all edges (requires nodes) |
Response: 200 OK
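The replace-entirely semantics imply one rule worth enforcing before a PUT: edges can only be sent together with nodes, since providing nodes replaces both stored collections. A hypothetical client-side guard:

```python
def check_update_body(body: dict) -> None:
    """Sketch of the documented PUT constraints (not the server's code).

    - id is always required
    - edges may only accompany nodes, because a nodes payload replaces
      the stored nodes AND edges entirely
    """
    if "id" not in body:
        raise ValueError("id is required")
    if "edges" in body and "nodes" not in body:
        raise ValueError("edges requires nodes in the same request")
```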

DELETE /api/v1/pipelines

Delete a pipeline and its nodes/edges.
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| id | string | Yes | Pipeline ID (query parameter) |
Response: 204 No Content

POST /api/v1/pipelines/{id}/run

Trigger a pipeline execution. Creates a PipelineRun record and enqueues the job via BullMQ for worker pod processing. (IR-native pipelines are dispatched to the in-process batch interpreter instead; see IR-native creation above.)

Response

{
  "id": "run_001",
  "pipelineId": "pipe_001",
  "status": "pending",
  "createdAt": "2026-03-16T14:30:00.000Z"
}
Response: 201 Created
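Because the run is created in status "pending" and processed asynchronously, callers typically poll until it reaches a terminal state. The sketch below takes an injected get_run callable in place of a real HTTP client; the non-terminal status set ("pending", "running") is an assumption beyond the statuses shown on this page, so check your deployment's actual values:

```python
import time
from typing import Callable

def wait_for_run(get_run: Callable[[], dict], poll_seconds: float = 2.0,
                 timeout_seconds: float = 600.0) -> dict:
    """Poll a run until it leaves the assumed in-flight states.

    get_run() stands in for re-fetching the run (e.g. via
    GET /api/v1/pipelines/{id}/runs) and must return a run object
    with a "status" field.
    """
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        run = get_run()
        if run["status"] not in ("pending", "running"):
            return run
        time.sleep(poll_seconds)
    raise TimeoutError("pipeline run did not finish in time")
```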

GET /api/v1/pipelines/{id}/runs

List recent execution runs for a pipeline (last 20 runs, newest first).

Response

[
  {
    "id": "run_001",
    "pipelineId": "pipe_001",
    "status": "completed",
    "createdAt": "2026-03-16T02:00:00.000Z",
    "completedAt": "2026-03-16T02:05:30.000Z"
  }
]

GET /api/v1/pipelines/{id}/ir/versions

List the IR version history for a pipeline, descending by version number. Backed by listPipelineIrVersions in src/lib/flow/repo/pipeline-ir-repo.ts:78. Each save through POST /api/v1/pipelines/{id}/ir writes a new row to pipeline_ir_versions with the next version number; this endpoint reads those rows back for diff/rollback UIs. The route at src/app/api/v1/pipelines/[id]/ir/versions/route.ts:26-30 runs a tenant-scoped prisma.pipeline.findFirst before the version lookup, so a caller from another tenant receives a 404 rather than an empty [] with 200, which would otherwise let them probe for valid pipeline IDs.

Path Parameters

ParameterTypeDescription
idstringPipeline.id — must belong to the tenant.

Response

Returned at versions/route.ts:33-41. The full IR is intentionally not included; the response carries only version metadata. Fetch a specific IR via GET /api/v1/pipelines/{id}/ir?version=N.
[
  {
    "version": 7,
    "authoredBy": "alice@example.com",
    "comment": "Add filter for high-value customers",
    "createdAt": "2026-04-30T14:00:00.000Z"
  },
  {
    "version": 6,
    "authoredBy": "bob@example.com",
    "comment": null,
    "createdAt": "2026-04-29T10:30:00.000Z"
  }
]
| Field | Type | Description |
| --- | --- | --- |
| version | number | Monotonically increasing per pipeline. The first save produces version: 1. |
| authoredBy | string \| null | Operator ID supplied to savePipelineIr at the time of the save (pipeline-ir-repo.ts:18). |
| comment | string \| null | Optional human-readable comment attached at save time. |
| createdAt | string | ISO timestamp of when the version row was inserted. |

Status codes

| Code | When | Source |
| --- | --- | --- |
| 200 | Returns the version array (possibly empty for pipelines with no IR yet) | versions/route.ts:33 |
| 401 | Caller is not authenticated | requireRole |
| 403 | Caller is not viewer, editor, or admin | versions/route.ts:20 |
| 404 | Pipeline not found for tenant | versions/route.ts:30 |

Allowed roles: admin, editor, viewer.

Roles

| Endpoint | Allowed Roles |
| --- | --- |
| GET /pipelines | admin, editor, viewer |
| POST /pipelines | admin, editor |
| PUT /pipelines | admin, editor |
| DELETE /pipelines | admin, editor |
| POST /pipelines/{id}/run | admin, editor |
| GET /pipelines/{id}/runs | any authenticated |
| GET /pipelines/{id}/ir/versions | admin, editor, viewer |
See also: Data Platform