Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.kaireonai.com/llms.txt

Use this file to discover all available pages before exploring further.

This page is the working-example companion to the platform docs. Every cURL block below was actually executed against a live local platform during the 2026-05-02 testing pass — copy, paste, change the IDs, run.

Setup assumptions

  • Platform running at http://localhost:3000 (or substitute your host).
  • A user is signed in; cookies are persisted in kc.txt (see API Authentication for getting a session cookie programmatically). For server-to-server, swap the -b kc.txt for -H "X-API-Key: krn_...".
  • AWS CLI configured with creds that can aws sts assume-role against the role you’re about to create.

1. Create the IAM role the connector will assume

Trust policy lets your local IAM identity assume the role with an external ID; the policy grants minimal S3 access on one bucket.
cat > /tmp/trust.json <<'JSON'
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": { "AWS": "arn:aws:iam::<ACCOUNT_ID>:user/<YOUR_USER>" },
    "Action": "sts:AssumeRole",
    "Condition": { "StringEquals": { "sts:ExternalId": "kaireon-test-2026-05-02" } }
  }]
}
JSON
cat > /tmp/s3-policy.json <<'JSON'
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": ["s3:GetObject","s3:ListBucket","s3:PutObject","s3:DeleteObject","s3:CopyObject"],
    "Resource": ["arn:aws:s3:::<YOUR_BUCKET>","arn:aws:s3:::<YOUR_BUCKET>/*"]
  }]
}
JSON

aws iam create-role --role-name kaireon-customer-test --assume-role-policy-document file:///tmp/trust.json
aws iam put-role-policy --role-name kaireon-customer-test --policy-name s3-access --policy-document file:///tmp/s3-policy.json

# IAM is eventually consistent; wait ~15s before first AssumeRole.
sleep 15
aws sts assume-role --role-arn arn:aws:iam::<ACCOUNT_ID>:role/kaireon-customer-test \
  --role-session-name verify --external-id kaireon-test-2026-05-02 --duration-seconds 900

2. Upload a sample CSV

aws s3 cp ./customers.csv s3://<YOUR_BUCKET>/customer-test-2026-05-02/customers.csv
The example dataset has the columns customer_id, first_name, last_name, email, phone, address, city, state, zip_code, country, signup_date, segment, status, lifetime_value, orders_count.

3. Create the S3 connector via the API

POST /api/v1/connectors with authMethod: "iam_role" triggers the STS AssumeRole + HeadBucket + ListObjectsV2 verification path.
curl -s -b kc.txt -X POST http://localhost:3000/api/v1/connectors \
  -H "X-Requested-With: XMLHttpRequest" -H "Content-Type: application/json" \
  -d '{
    "name": "Customer S3 (IAM Role)",
    "type": "aws_s3",
    "description": "Reads customer.csv via assumed IAM role",
    "config": {
      "bucket": "<YOUR_BUCKET>",
      "region": "us-east-1",
      "prefix": "customer-test-2026-05-02/"
    },
    "authMethod": "iam_role",
    "authConfig": {
      "roleArn": "arn:aws:iam::<ACCOUNT_ID>:role/kaireon-customer-test",
      "externalId": "kaireon-test-2026-05-02"
    }
  }'
# → { "id": "<CONNECTOR_ID>", "type": "aws_s3", "authMethod": "iam_role", "status": "active", … }
Verify the connection (this exercises STS AssumeRole + HeadBucket end-to-end):
curl -s -b kc.txt -X POST http://localhost:3000/api/v1/connectors/test \
  -H "X-Requested-With: XMLHttpRequest" -H "Content-Type: application/json" \
  -d '{ "id": "<CONNECTOR_ID>" }'
# → { ..., "status": "active" }   (status flips to "error" with lastError if AssumeRole fails)

4. Create the schema with customer_id as the primary key

When any field is marked isPrimaryKey: true, the auto-generated id BIGSERIAL column is skipped and your column becomes the table’s PK. See Data Model.
curl -s -b kc.txt -X POST http://localhost:3000/api/v1/schemas \
  -H "X-Requested-With: XMLHttpRequest" -H "Content-Type: application/json" \
  -d '{
    "name": "customer",
    "displayName": "Customer",
    "entityType": "custom",
    "schemaType": "customer",
    "fields": [
      { "name": "customer_id",    "displayName": "Customer Id",    "dataType": "varchar", "isPrimaryKey": true, "isNullable": false, "isUnique": true, "ordinal": 1 },
      { "name": "first_name",     "displayName": "First Name",     "dataType": "varchar", "isNullable": true, "ordinal": 2 },
      { "name": "last_name",      "displayName": "Last Name",      "dataType": "varchar", "isNullable": true, "ordinal": 3 },
      { "name": "email",          "displayName": "Email",          "dataType": "varchar", "isNullable": true, "ordinal": 4 },
      { "name": "phone",          "displayName": "Phone",          "dataType": "varchar", "isNullable": true, "ordinal": 5 },
      { "name": "address",        "displayName": "Address",        "dataType": "varchar", "isNullable": true, "ordinal": 6 },
      { "name": "city",           "displayName": "City",           "dataType": "varchar", "isNullable": true, "ordinal": 7 },
      { "name": "state",          "displayName": "State",          "dataType": "varchar", "isNullable": true, "ordinal": 8 },
      { "name": "zip_code",       "displayName": "Zip Code",       "dataType": "varchar", "isNullable": true, "ordinal": 9 },
      { "name": "country",        "displayName": "Country",        "dataType": "varchar", "isNullable": true, "ordinal": 10 },
      { "name": "signup_date",    "displayName": "Signup Date",    "dataType": "date",    "isNullable": true, "ordinal": 11 },
      { "name": "segment",        "displayName": "Segment",        "dataType": "varchar", "isNullable": true, "ordinal": 12 },
      { "name": "status",         "displayName": "Status",         "dataType": "varchar", "isNullable": true, "ordinal": 13 },
      { "name": "lifetime_value", "displayName": "Lifetime Value", "dataType": "numeric", "precision": 12, "scale": 2, "isNullable": true, "ordinal": 14 },
      { "name": "orders_count",   "displayName": "Orders Count",   "dataType": "integer", "isNullable": true, "ordinal": 15 }
    ]
  }'
# → { "id": "<SCHEMA_ID>", "tableName": "ds_customer", "fields": [ ... 15 ... ] }
Inspect the resulting Postgres table to confirm customer_id is the PK and there is no auto id column:
psql "$DATABASE_URL" -c '\d ds_customer'
# Indexes:
#   "ds_customer_pkey" PRIMARY KEY, btree (customer_id)

5. Create the pipeline IR

The IR has four nodes: sourcetransformvalidatetarget (upsert keyed on customer_id, with the safety bundle: empty-source guard on, validate quarantine to a DLQ table). Note the required irVersion: "1.0" and the errorHandling block.
curl -s -b kc.txt -X POST http://localhost:3000/api/v1/pipelines \
  -H "X-Requested-With: XMLHttpRequest" -H "Content-Type: application/json" \
  -d '{
    "name": "Customer File Test",
    "description": "S3 → transform → validate → upsert with full safety bundle",
    "connectorId": "<CONNECTOR_ID>",
    "schemaId": "<SCHEMA_ID>",
    "irVersion": "1.0",
    "ir": {
      "kind": "pipeline",
      "version": "1.0",
      "id": "customer-file-test",
      "metadata": { "name": "Customer File Test", "owner": "you" },
      "errorHandling": { "dlq": { "enabled": false }, "retry": { "maxAttempts": 3, "backoff": "exponential" } },
      "nodes": [
        {
          "id": "src",
          "kind": "source",
          "connector": "s3",
          "config": {
            "path": "customer-test-2026-05-02/",
            "bucket": "<YOUR_BUCKET>",
            "format": "csv",
            "pattern": { "type": "glob", "value": "*.csv" },
            "ordering": "latest_by_mtime",
            "atomicity": {
              "stagingFolder": ".processing/",
              "successFolder": ".archive/{YYYY-MM-DD}/",
              "failureFolder": ".failed/"
            },
            "waitPolicy": { "maxRetries": 2, "intervalMinutes": 1, "onMissAction": "alert" }
          }
        },
        {
          "id": "uppercase_state",
          "kind": "transform",
          "input": "src",
          "ops": [
            { "type": "expression", "outputField": "state", "formula": "concat(state)" },
            { "type": "rename", "from": "zip_code", "to": "zip_code" }
          ]
        },
        {
          "id": "validate_email",
          "kind": "validate",
          "input": "uppercase_state",
          "rowLevel": [
            { "type": "regex", "field": "email", "pattern": "^[^@]+@[^@]+\\.[^@]+$", "onFail": "warn" },
            { "type": "notNull", "field": "customer_id", "onFail": "abort" }
          ],
          "datasetLevel": { "rowCount": { "min": 1, "max": 1000000, "onFail": "abort" } },
          "quarantine": { "enabled": true, "table": "dlq.customer_rejected" }
        },
        {
          "id": "tgt",
          "kind": "target",
          "input": "validate_email",
          "schema": "public.ds_customer",
          "loadMode": "upsert",
          "upsertKey": ["customer_id"],
          "failOnEmptySource": true
        }
      ]
    }
  }'
# → { "id": "<PIPELINE_ID>", "irVersion": "1.0", "status": "draft", ... }

6. Run the pipeline

curl -s -b kc.txt -X POST http://localhost:3000/api/v1/pipelines/<PIPELINE_ID>/run \
  -H "X-Requested-With: XMLHttpRequest" -H "Content-Type: application/json" -d '{}'
Expected response on success:
{
  "runId": "9f71b01c-b90d-4dab-8fc0-6e2d05d417a0",
  "irVersion": 1,
  "ok": true,
  "nodeResults": [
    { "id": "src", "kind": "source", "rowsOut": 100, "meta": { "matchedFiles": 1, "archivedTo": ".archive/2026-05-02/" } },
    { "id": "uppercase_state", "kind": "transform", "rowsOut": 100 },
    { "id": "validate_email", "kind": "validate", "rowsOut": 100, "meta": { "rowLevelRules": 2, "rowLevelFailed": 0 } },
    { "id": "tgt", "kind": "target", "rowsLoaded": 100, "meta": { "missingFromSource": ["created_at", "updated_at"] } }
  ]
}

7. Switch the load mode and re-run

The IR is updated via POST /api/v1/pipelines/:id/ir (creates a new version atomically). Pattern: GET the current IR, mutate the target node, POST the new envelope. Re-upload the source file each run because the source executor archives it after success.

Switch to truncate (with the empty-source guard)

# Get current IR
curl -s -b kc.txt http://localhost:3000/api/v1/pipelines/<PIPELINE_ID>/ir \
  -H "X-Requested-With: XMLHttpRequest" > /tmp/ir.json

# Mutate the target node with jq (or your favorite JSON tool)
jq '.ir.nodes |= map(if .kind == "target" then .loadMode = "truncate" | del(.upsertKey) else . end) | { ir, comment: "switch to truncate" }' \
  /tmp/ir.json > /tmp/ir-truncate.json

curl -s -b kc.txt -X POST http://localhost:3000/api/v1/pipelines/<PIPELINE_ID>/ir \
  -H "X-Requested-With: XMLHttpRequest" -H "Content-Type: application/json" \
  -d @/tmp/ir-truncate.json
Re-upload the file and run:
aws s3 cp ./customers.csv s3://<YOUR_BUCKET>/customer-test-2026-05-02/customers.csv
curl -s -b kc.txt -X POST http://localhost:3000/api/v1/pipelines/<PIPELINE_ID>/run \
  -H "X-Requested-With: XMLHttpRequest" -H "Content-Type: application/json" -d '{}'

Test the empty-source guard

Don’t re-upload the file. The next run produces an empty source, and the runtime aborts truncate BEFORE the destructive statement so your table is preserved:
curl -s -b kc.txt -X POST http://localhost:3000/api/v1/pipelines/<PIPELINE_ID>/run \
  -H "X-Requested-With: XMLHttpRequest" -H "Content-Type: application/json" -d '{}'
# → { "ok": false, "failedNodeId": "tgt",
#     "error": "target public.ds_customer: source is empty (0 rows from validate_email);
#               aborting truncate to prevent data loss. Set failOnEmptySource:false to override." }
A warning System Health alert (“Pipeline source had no files”) and an error alert (“Pipeline run failed: Customer File Test”) fire automatically. Read them via GET /api/v1/system-health (see System Health).

Switch to blue_green

jq '.ir.nodes |= map(if .kind == "target" then .loadMode = "blue_green" | del(.upsertKey, .watermarkColumn) else . end) | { ir, comment: "switch to blue_green" }' \
  /tmp/ir.json > /tmp/ir-bg.json
curl -s -b kc.txt -X POST http://localhost:3000/api/v1/pipelines/<PIPELINE_ID>/ir \
  -H "X-Requested-With: XMLHttpRequest" -H "Content-Type: application/json" -d @/tmp/ir-bg.json
Run as before. The runtime loads to ds_customer_new, then issues the atomic 3-step rename (target → target_old, target_new → target, DROP target_old). Empty-source guard applies — same protection as truncate.

Switch to incremental_watermark

jq '.ir.nodes |= map(if .kind == "target" then .loadMode = "incremental_watermark" | .watermarkColumn = "signup_date" | del(.upsertKey) else . end) | { ir, comment: "switch to watermark" }' \
  /tmp/ir.json > /tmp/ir-wm.json
curl -s -b kc.txt -X POST http://localhost:3000/api/v1/pipelines/<PIPELINE_ID>/ir \
  -H "X-Requested-With: XMLHttpRequest" -H "Content-Type: application/json" -d @/tmp/ir-wm.json
The first run reads MAX(signup_date) from the target and writes the high-water to pipeline_watermarks. Subsequent runs only load rows where signup_date > <persisted watermark>. The INSERT and watermark upsert run inside a single Postgres transaction so a failed load rolls back the watermark advance.

Try cdc_mirror (env-gated)

cdc_mirror requires FLOW_STREAMING_ENABLED=true and a configured Debezium connector. Without those, the runtime returns a clear “streaming disabled” error — useful as a smoke test that the gate is wired:
jq '.ir.nodes |= map(if .kind == "target" then .loadMode = "cdc_mirror" | .cdcSource = "debezium://orders-cdc" | del(.upsertKey, .watermarkColumn) else . end) | { ir }' \
  /tmp/ir.json > /tmp/ir-cdc.json
# IR save will fail with the validation message:
# → { "errors": ["cdc_mirror is not yet wired in this build (FLOW_STREAMING_ENABLED is off)"] }

8. Verify rows landed

psql "$DATABASE_URL" -c "SELECT count(*), max(signup_date) FROM ds_customer;"
For lineage:
curl -s -b kc.txt 'http://localhost:3000/api/v1/lineage?schema=public&table=ds_customer&limit=5' \
  -H "X-Requested-With: XMLHttpRequest"
# → { "rows": [ { "row": { "customer_id": "CUST-00001", "lifetime_value": "10576.95", ... },
#                 "lineage": { "runId": "...", "pipelineId": "...", "sourceNodeId": "src" } } ], ... }
The lifetime_value arrives as a string (“10576.95”) because the route runs through the safeJson helper that converts Postgres numeric / Decimal to its string form (preserves precision past Number.MAX_SAFE_INTEGER). See Flow Lineage.

9. Read the System Health feed

Every operational error from the run path lands here:
curl -s -b kc.txt 'http://localhost:3000/api/v1/system-health?limit=10' \
  -H "X-Requested-With: XMLHttpRequest"
Mark one read:
curl -s -b kc.txt -X PATCH 'http://localhost:3000/api/v1/system-health/<ALERT_ID>/read' \
  -H "X-Requested-With: XMLHttpRequest"
See System Health for the full API surface (read-all, dismiss, severity / source filtering, retention purge cron).

10. Optional: drive the same pipeline via MCP

The MCP server exposes the same operations as tools. From an MCP-aware client (Claude Desktop, Cursor):
ToolWhat it does
listFlowPipelinesReturns the same data as GET /api/v1/pipelines
runFlowPipelineSame as POST /api/v1/pipelines/:id/run
getFlowPipelineIrSame as GET /api/v1/pipelines/:id/ir
updateFlowPipelineIrPOST a new IR version
inspectFlowErrorReturns DLQ rows for the latest failed run
replayFlowRunRe-runs a pipeline from a specific run id
Start the server: cd platform && npm run mcp (set KAIREONAI_API_KEY and KAIREONAI_TENANT_ID in env). See MCP Flow Server.

Cleanup

# Drop the schema (cascades to ds_customer)
curl -s -b kc.txt -X DELETE 'http://localhost:3000/api/v1/schemas?id=<SCHEMA_ID>' \
  -H "X-Requested-With: XMLHttpRequest"
# Delete the connector
curl -s -b kc.txt -X DELETE 'http://localhost:3000/api/v1/connectors?id=<CONNECTOR_ID>' \
  -H "X-Requested-With: XMLHttpRequest"
# Tear down the IAM role
aws iam delete-role-policy --role-name kaireon-customer-test --policy-name s3-access
aws iam delete-role --role-name kaireon-customer-test