Documentation Index
Fetch the complete documentation index at: https://docs.kaireonai.com/llms.txt
Use this file to discover all available pages before exploring further.
Pipeline execution modes. batch and micro_batch are fully
implemented. streaming is a planned placeholder — it is disabled in the UI
and will not spawn a long-lived consumer at the API layer. See
Data Platform → Execution Config for details.
Coming-soon connectors. Pipelines whose source connector is
amazon_kinesis or braze will no-op at execution time. The
executor logs a message and returns zero rows until ingestion support
is wired. See Connectors for the full
status table.
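Since runs against these connectors silently return zero rows, a client may want to check the connector type before triggering a run. A minimal sketch; `runnable` is a hypothetical helper, and the two type strings are taken from the note above:

```python
# Connector types that currently no-op at execution time (per the note above).
COMING_SOON = {"amazon_kinesis", "braze"}

def runnable(pipeline: dict) -> bool:
    """Return False when the pipeline's source connector is not yet
    supported by the executor and a run would produce zero rows."""
    return pipeline.get("connector", {}).get("type") not in COMING_SOON

# Shapes mirror the GET /api/v1/pipelines response below:
pipe = {"id": "pipe_001", "connector": {"id": "conn_001", "type": "snowflake"}}
assert runnable(pipe)
assert not runnable({"connector": {"type": "braze"}})
```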
GET /api/v1/pipelines
List all pipelines with their nodes, edges, connector, and schema references.
Query Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| limit | integer | 20 | Max results per page |
| cursor | string | — | Cursor for pagination |
Response
{
"data": [
{
"id": "pipe_001",
"tenantId": "tenant_001",
"name": "Customer Import",
"description": "Daily customer data sync from Snowflake",
"status": "draft",
"schedule": "0 2 * * *",
"connectorId": "conn_001",
"schemaId": "schema_001",
"executionConfig": { "batchSize": 5000, "parallelism": 4 },
"connector": { "id": "conn_001", "name": "Production Snowflake", "type": "snowflake" },
"schema": { "id": "schema_001", "name": "customers", "displayName": "Customers" },
"nodes": [],
"edges": [],
"lastRunAt": null,
"lastRunStatus": null,
"createdAt": "2026-01-15T09:00:00.000Z",
"updatedAt": "2026-01-15T09:00:00.000Z"
}
],
"pagination": {
"total": 3,
"hasMore": false,
"limit": 50,
"cursor": null
}
}
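The `pagination` object above supports cursor-based paging: pass `pagination.cursor` back as the `cursor` query parameter until `hasMore` is false. A sketch of that loop; `iter_pipelines` is a hypothetical helper and `fetch_page` stands in for the actual HTTP call:

```python
from typing import Callable, Iterator

def iter_pipelines(fetch_page: Callable[[dict], dict], limit: int = 20) -> Iterator[dict]:
    """Walk GET /api/v1/pipelines using the cursor/hasMore contract above."""
    cursor = None
    while True:
        params = {"limit": limit}
        if cursor:
            params["cursor"] = cursor
        page = fetch_page(params)          # would be an HTTP GET in practice
        yield from page["data"]
        if not page["pagination"]["hasMore"]:
            break
        cursor = page["pagination"]["cursor"]

# Stubbed two-page walk:
pages = iter([
    {"data": [{"id": "pipe_001"}], "pagination": {"hasMore": True, "cursor": "c1"}},
    {"data": [{"id": "pipe_002"}], "pagination": {"hasMore": False, "cursor": None}},
])
ids = [p["id"] for p in iter_pipelines(lambda params: next(pages))]
assert ids == ["pipe_001", "pipe_002"]
```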
POST /api/v1/pipelines
Create a new pipeline with optional nodes and edges.
Request Body
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Pipeline name |
| connectorId | string | Yes | Source connector ID |
| schemaId | string | Yes | Target schema ID |
| description | string | No | Description |
| schedule | string | No | Cron expression for scheduled execution |
| executionConfig | object | No | Execution settings (batchSize, parallelism, partitioning) |
| nodes | array | No | Pipeline flow nodes (see node object) — legacy format |
| edges | array | No | Connections between nodes — legacy format |
| irVersion | string \| null | No | When "1.0", the request is interpreted as IR-native. Omitting this preserves the legacy node/edge format. |
| ir | object | Conditional | Required when irVersion === "1.0". The full Pipeline IR document. See Pipeline IR. |
IR-native creation
When the request includes irVersion: "1.0" and an ir body, the legacy
nodes / edges fields are ignored. The IR is validated (Zod + structural)
and stored as version 1 in the pipeline_ir_versions table. Subsequent
runs via POST /pipelines/:id/run are dispatched to the in-process batch
interpreter rather than the BullMQ worker queue.
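An IR-native create request therefore only needs the scalar fields plus `irVersion` and `ir`. A sketch of assembling that body; `build_create_request` is a hypothetical helper, and the IR document's internal shape is defined in the Pipeline IR docs, not here:

```python
import json

def build_create_request(name: str, connector_id: str, schema_id: str, ir: dict) -> dict:
    """Assemble an IR-native POST /api/v1/pipelines body. With irVersion
    "1.0", legacy nodes/edges are ignored even if present, so they are
    simply omitted here."""
    return {
        "name": name,
        "connectorId": connector_id,
        "schemaId": schema_id,
        "irVersion": "1.0",
        "ir": ir,  # full Pipeline IR document (see Pipeline IR)
    }

body = build_create_request("Customer Import", "conn_001", "schema_001", ir={})
assert body["irVersion"] == "1.0" and "nodes" not in body
print(json.dumps(body, indent=2))
```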
Node Object
| Field | Type | Description |
|---|---|---|
| id | string | Temporary node ID (used for edge mapping) |
| nodeType | string | Node type: "source", "transform", "filter", "target", etc. |
| label | string | Display label |
| config | object | Type-specific configuration |
| position | object | { x: number, y: number } for visual editor |
Edge Object
| Field | Type | Description |
|---|---|---|
| source | string | Source node temp ID |
| target | string | Target node temp ID |
| label | string | Edge label |
Example
curl -X POST https://playground.kaireonai.com/api/v1/pipelines \
-H "Content-Type: application/json" \
-H "X-Tenant-Id: my-tenant" \
-d '{
"name": "Customer Import",
"connectorId": "conn_001",
"schemaId": "schema_001",
"schedule": "0 2 * * *",
"executionConfig": { "batchSize": 5000, "parallelism": 4 },
"nodes": [
{ "id": "n1", "nodeType": "source", "label": "Snowflake Source", "config": {} },
{ "id": "n2", "nodeType": "transform", "label": "Rename Fields", "config": { "type": "rename_field" } },
{ "id": "n3", "nodeType": "target", "label": "Customer Table", "config": {} }
],
"edges": [
{ "source": "n1", "target": "n2" },
{ "source": "n2", "target": "n3" }
]
}'
Response: 201 Created
PUT /api/v1/pipelines
Update a pipeline. When nodes are provided, existing nodes and edges are replaced entirely.
Request Body
| Field | Type | Required | Description |
|---|---|---|---|
| id | string | Yes | Pipeline ID |
| name | string | No | Updated name |
| description | string | No | Updated description |
| schedule | string | No | Updated cron schedule |
| executionConfig | object | No | Updated execution config |
| nodes | array | No | Replaces all nodes |
| edges | array | No | Replaces all edges (requires nodes) |
Response: 200 OK
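Because an update that includes `nodes` replaces the whole graph, a client should omit `nodes`/`edges` for metadata-only changes and never send `edges` alone. A sketch of building such a body; `build_update` is a hypothetical helper:

```python
def build_update(pipeline_id: str, **fields) -> dict:
    """Build a PUT /api/v1/pipelines body. Including `nodes` replaces the
    entire node/edge graph, and `edges` is only accepted alongside `nodes`."""
    if "edges" in fields and "nodes" not in fields:
        raise ValueError("edges requires nodes: the update replaces both wholesale")
    return {"id": pipeline_id, **fields}

# Metadata-only update leaves the existing graph untouched:
body = build_update("pipe_001", name="Customer Import v2", schedule="0 3 * * *")
assert body == {"id": "pipe_001", "name": "Customer Import v2", "schedule": "0 3 * * *"}
```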
DELETE /api/v1/pipelines
Delete a pipeline and its nodes/edges.
| Parameter | Type | Required | Description |
|---|---|---|---|
| id | string | Yes | Pipeline ID (query parameter) |
Response: 204 No Content
POST /api/v1/pipelines/{id}/run
Trigger a pipeline execution. Creates a PipelineRun record and enqueues the job via BullMQ for worker pod processing.
Response
{
"id": "run_001",
"pipelineId": "pipe_001",
"status": "pending",
"createdAt": "2026-03-16T14:30:00.000Z"
}
Response: 201 Created
GET /api/v1/pipelines/{id}/runs
List recent execution runs for a pipeline (last 20 runs, newest first).
Response
[
{
"id": "run_001",
"pipelineId": "pipe_001",
"status": "completed",
"createdAt": "2026-03-16T02:00:00.000Z",
"completedAt": "2026-03-16T02:05:30.000Z"
}
]
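A run starts as "pending" and can be tracked by polling the runs listing until it reaches a terminal status. A sketch of that loop; `wait_for_run` is a hypothetical helper, the terminal status set is an assumption (only "pending" and "completed" appear in the examples here), and `list_runs` stands in for the HTTP call:

```python
from typing import Callable

TERMINAL = {"completed", "failed"}  # assumed terminal statuses

def wait_for_run(run_id: str, list_runs: Callable[[], list],
                 max_polls: int = 60) -> dict:
    """Poll GET /api/v1/pipelines/{id}/runs until the triggered run
    reaches a terminal status; in practice, sleep between polls."""
    for _ in range(max_polls):
        for run in list_runs():
            if run["id"] == run_id and run["status"] in TERMINAL:
                return run
    raise TimeoutError(f"run {run_id} did not finish in {max_polls} polls")

# Stub: first poll still pending, second poll completed.
responses = iter([
    [{"id": "run_001", "status": "pending"}],
    [{"id": "run_001", "status": "completed"}],
])
run = wait_for_run("run_001", lambda: next(responses))
assert run["status"] == "completed"
```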
GET /api/v1/pipelines/{id}/ir/versions
List the IR version history for a pipeline, descending by version number. Backed by listPipelineIrVersions from src/lib/flow/repo/pipeline-ir-repo.ts:78. Each save through POST /api/v1/pipelines/{id}/ir writes a new row to pipeline_ir_versions with the next version number — this endpoint reads them back for diff/rollback UIs.
The route at src/app/api/v1/pipelines/[id]/ir/versions/route.ts:26-30 runs a tenant-scoped prisma.pipeline.findFirst BEFORE the version lookup, so a caller from a different tenant cannot probe pipeline IDs by observing an empty [] with a 200 — they receive a 404 instead.
Path Parameters
| Parameter | Type | Description |
|---|---|---|
| id | string | Pipeline.id — must belong to the tenant. |
Response
Returned at versions/route.ts:33-41. The full IR is intentionally NOT included — only the version metadata. Fetch a specific IR via GET /api/v1/pipelines/{id}/ir?version=N.
[
{
"version": 7,
"authoredBy": "alice@example.com",
"comment": "Add filter for high-value customers",
"createdAt": "2026-04-30T14:00:00.000Z"
},
{
"version": 6,
"authoredBy": "bob@example.com",
"comment": null,
"createdAt": "2026-04-29T10:30:00.000Z"
}
]
| Field | Description |
|---|---|
| version | Monotonically increasing per pipeline. The first save produces version: 1. |
| authoredBy | Operator id supplied to savePipelineIr at the time of the save (pipeline-ir-repo.ts:18). |
| comment | Optional human-readable comment attached at save time. |
| createdAt | ISO timestamp of when the version row was inserted. |
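Since this listing omits the IR body, a rollback or diff UI would pick a version here and then fetch its document separately. A sketch of building that second request's URL; `ir_version_url` is a hypothetical helper:

```python
def ir_version_url(base: str, pipeline_id: str, version: int) -> str:
    """Build the URL for GET /api/v1/pipelines/{id}/ir?version=N, which
    returns the full IR document that the versions listing omits."""
    return f"{base}/api/v1/pipelines/{pipeline_id}/ir?version={version}"

versions = [{"version": 7}, {"version": 6}]  # newest first, as returned above
latest = max(v["version"] for v in versions)
url = ir_version_url("https://playground.kaireonai.com", "pipe_001", latest)
assert url.endswith("/api/v1/pipelines/pipe_001/ir?version=7")
```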
Status codes
| Code | When | Source |
|---|---|---|
| 200 | Returns the version array (possibly empty for pipelines with no IR yet) | versions/route.ts:33 |
| 401 | Caller is not authenticated | requireRole |
| 403 | Caller is not viewer, editor, or admin | versions/route.ts:20 |
| 404 | Pipeline not found for tenant | versions/route.ts:30 |
Roles: admin, editor, viewer.
Roles
| Endpoint | Allowed Roles |
|---|---|
| GET /pipelines | admin, editor, viewer |
| POST /pipelines | admin, editor |
| PUT /pipelines | admin, editor |
| DELETE /pipelines | admin, editor |
| POST /pipelines/{id}/run | admin, editor |
| GET /pipelines/{id}/runs | any authenticated |
| GET /pipelines/{id}/ir/versions | admin, editor, viewer |
See also: Data Platform