Phase 4 closes the four areas the Phase 1 runtime explicitly stubbed:
  • All six target load modes are now real (with cdc_mirror reserved for the streaming runtime in Phase 6).
  • Pre/post hooks fire around the load.
  • Dataset-level validators run as parameterized SQL probes.
  • Row-level rules evaluate in memory, with optional DLQ quarantine.

Target load modes

| Mode | When to use | What runs |
| --- | --- | --- |
| append | Insert new rows alongside existing data | Explicit-column projection from the staging rows into the target table (column-aware projection that nulls out empty strings) |
| truncate | Replace all rows on every run | Truncates the target table and reloads from staging, wrapped in a single Postgres transaction so a failed reload rolls back the truncate. Aborts before any destructive statement runs when upstream produced 0 rows (see Empty-source guard below). |
| upsert | Idempotent merge by primary-like key | INSERT … ON CONFLICT (key) DO UPDATE SET nonkey = EXCLUDED.nonkey (requires upsertKey; throws if every projected column is in upsertKey since there’s nothing to update) |
| blue_green | Atomic swap with zero downtime | Loads into <table>_new, then swaps in three steps: rename <table> to <table>_old, rename <table>_new to <table>, drop <table>_old. Aborts before the load when upstream produced 0 rows. |
| incremental_watermark | Append-only with high-watermark filter | Reads the persisted high-water mark from pipeline_watermarks, then runs the INSERT and the watermark upsert in a single transaction so a failed load rolls back the watermark advance (requires watermarkColumn) |
| cdc_mirror | Streaming change-data-capture mirror | Gated behind FLOW_STREAMING_ENABLED=true. When the env is off, throws “streaming disabled”. When on but no Debezium connector is configured, throws “Debezium connector not configured” with a docs link. |
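To make the upsert row concrete, here is a minimal sketch of how such a statement could be assembled. The helper names (`quoteIdent`, `buildUpsertSql`) and the simplified identifier quoting are illustrative assumptions, not the runtime's actual code.

```typescript
// Hypothetical helper: double-quote an identifier, escaping embedded quotes.
function quoteIdent(name: string): string {
  return '"' + name.replace(/"/g, '""') + '"';
}

// Hypothetical helper: build the upsert statement text for one row.
function buildUpsertSql(
  table: string,
  columns: string[],
  upsertKey: string[],
): string {
  if (upsertKey.length === 0) {
    throw new Error("upsert requires upsertKey");
  }
  const nonKey = columns.filter((c) => upsertKey.indexOf(c) === -1);
  if (nonKey.length === 0) {
    // Every projected column is part of the key, so there is nothing to update.
    throw new Error("upsert: every projected column is in upsertKey");
  }
  const colList = columns.map(quoteIdent).join(", ");
  const params = columns.map((_, i) => "$" + (i + 1)).join(", ");
  const keyList = upsertKey.map(quoteIdent).join(", ");
  const updates = nonKey
    .map((c) => `${quoteIdent(c)} = EXCLUDED.${quoteIdent(c)}`)
    .join(", ");
  return (
    `INSERT INTO ${quoteIdent(table)} (${colList}) VALUES (${params}) ` +
    `ON CONFLICT (${keyList}) DO UPDATE SET ${updates}`
  );
}
```

The early throw mirrors the documented behavior: with no non-key columns, the DO UPDATE SET clause would be empty.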

Empty-source guard

When loadMode is truncate or blue_green and the upstream produced 0 rows, the runtime aborts the load before issuing the destructive statement. The run fails with:
target <schema>.<table>: source is empty (0 rows from <input>); aborting <mode> to prevent data loss.
Set failOnEmptySource:false to override.
This default closes a real footgun: if your scheduled file lands empty or the source connector has a glitch, your live table doesn’t get wiped to zero rows. Existing pipelines that genuinely want “empty file means clear the table” set failOnEmptySource: false on the target node.
A side effect: when an onMissAction: alert source comes up empty, the source executor also emits a warning alert into the System Health feed so operators see it in the topbar widget without having to open the runs page.
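The guard can be pictured as a small pre-check that runs before any destructive statement. The sketch below assumes a simplified target shape (`GuardTargetConfig`) and a hypothetical function name; only the error text and the failOnEmptySource flag come from this page.

```typescript
type GuardLoadMode =
  | "append"
  | "truncate"
  | "upsert"
  | "blue_green"
  | "incremental_watermark"
  | "cdc_mirror";

// Assumed, simplified shape of a target node for illustration.
interface GuardTargetConfig {
  schema: string;
  table: string;
  loadMode: GuardLoadMode;
  failOnEmptySource?: boolean; // guard stays on unless explicitly set to false
}

// Hypothetical pre-check: throws before the destructive statement is issued.
function assertSourceNotEmpty(
  target: GuardTargetConfig,
  input: string,
  rowCount: number,
): void {
  const destructive =
    target.loadMode === "truncate" || target.loadMode === "blue_green";
  const guardOn = target.failOnEmptySource !== false;
  if (destructive && guardOn && rowCount === 0) {
    throw new Error(
      `target ${target.schema}.${target.table}: source is empty ` +
        `(0 rows from ${input}); aborting ${target.loadMode} to prevent data loss. ` +
        `Set failOnEmptySource:false to override.`,
    );
  }
}
```

Non-destructive modes such as append pass through untouched, since loading zero rows into them changes nothing.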

Mode-switch validation

Picking a load mode in the UI surfaces inline errors before save when:
  • upsert is chosen without an upsertKey, or with keys not in the destination schema, or with keys covering every column.
  • incremental_watermark is chosen without a watermarkColumn, or the column doesn’t exist in the destination schema.
  • cdc_mirror is chosen without a cdcSource.
  • truncate is chosen on a “full-refresh-shaped” pipeline (exactly one source + one target). Soft warning recommends blue_green; you can keep truncate if intentional.
The same checks run server-side in parsePipelineIR so the JSON IR tab can’t bypass them.
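As a rough illustration of the hard errors listed above, the following sketch validates a target node against the destination column list. The `TargetNodeConfig` shape and `validateLoadMode` name are assumptions for this example, "covering every column" is interpreted here as covering every destination column, and the soft truncate warning is left out.

```typescript
// Assumed, simplified target-node shape for illustration.
interface TargetNodeConfig {
  loadMode: string;
  upsertKey?: string[];
  watermarkColumn?: string;
  cdcSource?: string;
}

// Hypothetical validator: returns a list of human-readable errors.
function validateLoadMode(
  node: TargetNodeConfig,
  destColumns: string[],
): string[] {
  const errors: string[] = [];
  const exists = (c: string) => destColumns.indexOf(c) !== -1;

  if (node.loadMode === "upsert") {
    const keys = node.upsertKey ?? [];
    if (keys.length === 0) {
      errors.push("upsert requires an upsertKey");
    } else {
      const missing = keys.filter((k) => !exists(k));
      if (missing.length > 0) {
        errors.push("upsertKey not in destination schema: " + missing.join(", "));
      }
      if (destColumns.every((c) => keys.indexOf(c) !== -1)) {
        errors.push("upsertKey covers every column; nothing left to update");
      }
    }
  }

  if (node.loadMode === "incremental_watermark") {
    if (!node.watermarkColumn) {
      errors.push("incremental_watermark requires a watermarkColumn");
    } else if (!exists(node.watermarkColumn)) {
      errors.push("watermarkColumn not in destination schema");
    }
  }

  if (node.loadMode === "cdc_mirror" && !node.cdcSource) {
    errors.push("cdc_mirror requires a cdcSource");
  }
  return errors;
}
```

Returning a list rather than throwing on the first problem lets a UI show every inline error at once.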

Blue-green details

Blue-green loads the new dataset into a sibling <table>_new, runs the post-hook (e.g., a stats-collection command or a materialized-view refresh), then issues three sequential statements, two ALTER TABLE … RENAME operations followed by a DROP TABLE:
target → target_old
target_new → target
DROP target_old
If the INSERT into <table>_new fails, <table>_new is dropped and the upstream error rethrows so the existing target is untouched.
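The sequence can be sketched as follows. This is an illustrative outline only: `exec` is a synchronous stand-in for a database call (a real client would be async), creation of <table>_new and the post-hook are omitted, and the function name is hypothetical.

```typescript
// Synchronous stand-in for a database client call (assumption for brevity).
type ExecSql = (sql: string) => void;

// Hypothetical outline of the blue-green load and swap.
function runBlueGreen(
  exec: ExecSql,
  schema: string,
  table: string,
  insertIntoNewSql: string,
): void {
  const q = (s: string) => '"' + s.replace(/"/g, '""') + '"';
  const fq = (t: string) => `${q(schema)}.${q(t)}`;

  try {
    exec(insertIntoNewSql); // load the sibling table first
  } catch (err) {
    // Load failed: clean up the sibling and leave the live table untouched.
    exec(`DROP TABLE IF EXISTS ${fq(table + "_new")}`);
    throw err;
  }

  // Swap: the live table becomes _old, _new becomes live, then _old is dropped.
  exec(`ALTER TABLE ${fq(table)} RENAME TO ${q(table + "_old")}`);
  exec(`ALTER TABLE ${fq(table + "_new")} RENAME TO ${q(table)}`);
  exec(`DROP TABLE ${fq(table + "_old")}`);
}
```

Because the live table is only renamed after the load succeeds, a failed INSERT never leaves readers looking at partial data.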

Incremental-watermark details

The high-water mark lives in the pipeline_watermarks table keyed by (tenantId, pipelineId, targetSchema, targetTable). Each run:
  1. Reads the persisted watermarkValue (falls back to SELECT MAX(<watermarkColumn>) FROM <target> on the first run after an upgrade so we don’t replay history).
  2. Computes the new MAX from the staging table.
  3. Wraps the INSERT … WHERE col > $1 and the watermark upsert in a single Postgres transaction so a failed INSERT rolls back the watermark advance.
When both the target and source are empty, the watermark defaults to 0001-01-01 (timestamp) or 0 (integer/bigint) so the first non-empty run loads everything.
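The fallback order for the starting watermark can be sketched as below. The function names and the string/number representations are assumptions for illustration; the real runtime reads these values from pipeline_watermarks and the target table.

```typescript
type WatermarkKind = "timestamp" | "integer";

// Hypothetical resolver: persisted value first, then the target's MAX,
// then the documented floor for the column type.
function resolveWatermark(
  persisted: string | null,
  targetMax: string | null,
  kind: WatermarkKind,
): string {
  if (persisted !== null) return persisted;
  if (targetMax !== null) return targetMax; // first run after an upgrade
  return kind === "timestamp" ? "0001-01-01" : "0";
}

// In-memory analogue of the `WHERE col > $1` filter for an integer column.
function valuesPastWatermark(values: number[], watermark: number): number[] {
  return values.filter((v) => v > watermark);
}
```

Note the strict comparison: a row whose value equals the watermark was already loaded by an earlier run and is not picked up again.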

Row-count anomaly detection (optional)

Add expectedRowCountDelta to a target node to receive a warning alert when today’s load is wildly outside the recent average:
"expectedRowCountDelta": {
  "minPct": -20,    // alarm when today < 80% of recent avg
  "maxPct": 200,    // alarm when today > 3× recent avg
  "windowRuns": 7
}
The runtime reads the last windowRuns successful runs (default 7), computes the running mean of rowsProcessed, and emits a warning alert into the System Health feed if today’s load is outside the band. The run still completes — anomaly detection never fails the run on its own. Skipped when there are fewer than 3 prior successful runs.
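A minimal sketch of the band check, assuming prior row counts are passed oldest to newest. The function name, the message format, and the zero-mean shortcut are assumptions for illustration.

```typescript
interface RowCountDeltaConfig {
  minPct: number;
  maxPct: number;
  windowRuns?: number; // defaults to 7
}

// Hypothetical check: returns a warning string, or null when within the band
// or when there is too little history to judge.
function checkRowCountAnomaly(
  todayRows: number,
  priorSuccessfulRuns: number[],
  cfg: RowCountDeltaConfig,
): string | null {
  if (priorSuccessfulRuns.length < 3) return null; // skipped: not enough history
  const size = Math.max(1, cfg.windowRuns ?? 7);
  const recent = priorSuccessfulRuns.slice(-size);
  const mean = recent.reduce((sum, n) => sum + n, 0) / recent.length;
  if (mean === 0) return null; // assumption: no percentage is defined against 0

  const deltaPct = ((todayRows - mean) / mean) * 100;
  if (deltaPct < cfg.minPct || deltaPct > cfg.maxPct) {
    return (
      `row count ${todayRows} is ${deltaPct.toFixed(1)}% off the recent ` +
      `average ${mean.toFixed(1)} (allowed ${cfg.minPct}% to ${cfg.maxPct}%)`
    );
  }
  return null;
}
```

With the example config, an average of 100 rows tolerates anything from 80 to 300 rows; the caller would raise the returned string as a warning and let the run finish.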

Hooks

The IR preHook and postHook slots accept four hook kinds. The pre-hook runs before the load; the post-hook runs only on successful load.
type Hook =
  | { type: "sql", statement: string }            // admin SQL only — no DML
  | { type: "refresh_mat_view", view: string }    // REFRESH MATERIALIZED VIEW
  | { type: "webhook", url: string }              // https-only POST via SSRF guard
  | { type: "custom_function", function: string } // Phase 5 stub

SQL hook safety

Hook SQL is admin-grade. The runtime forbids any statement starting with a data-, schema-, or privilege-modifying verb (drop, delete, update, truncate, insert, merge, copy, grant, revoke, alter). Allowed verbs include analyze, refresh, cluster, vacuum, set, explain, etc. Embedded ; and SQL comments are also rejected.
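A deny-list check along these lines might look like the sketch below. The function name is hypothetical, and treating any `;`, `--`, or `/*` anywhere in the statement as a rejection is an assumption about how strictly "embedded" is applied.

```typescript
const FORBIDDEN_HOOK_VERBS = [
  "drop", "delete", "update", "truncate", "insert",
  "merge", "copy", "grant", "revoke", "alter",
];

// Hypothetical guard: throws when the hook statement is not admin-safe.
function assertSafeHookSql(statement: string): void {
  const trimmed = statement.trim();
  if (trimmed.indexOf(";") !== -1) {
    throw new Error("hook SQL must not contain ';'");
  }
  if (trimmed.indexOf("--") !== -1 || trimmed.indexOf("/*") !== -1) {
    throw new Error("hook SQL must not contain comments");
  }
  // Extract the leading run of letters as the statement verb.
  const verb = trimmed.replace(/^([A-Za-z]*)[\s\S]*$/, "$1").toLowerCase();
  if (FORBIDDEN_HOOK_VERBS.indexOf(verb) !== -1) {
    throw new Error(`hook SQL verb "${verb}" is not allowed`);
  }
}
```

Rejecting separators and comments outright keeps a second statement from riding along behind an allowed verb.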

Webhook safety

Webhook URLs flow through lib/security/url-validator.validateAndResolve — the same SSRF guard the rest of the platform uses. Private IP ranges and DNS rebinding attempts are blocked before the fetch fires.
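For intuition only, the kind of check an SSRF guard performs can be sketched as below. This is not the platform's validateAndResolve; the function names are hypothetical, only IPv4 is covered, and the caller is assumed to pass in the address that DNS actually resolved to.

```typescript
// Hypothetical check: true when an IPv4 literal is malformed or falls in a
// loopback, private, link-local, or "this network" range.
function isBlockedIPv4(ip: string): boolean {
  const parts = ip.split(".");
  if (parts.length !== 4) return true; // refuse anything that is not a.b.c.d
  const octets: number[] = [];
  for (const p of parts) {
    if (!/^\d{1,3}$/.test(p) || Number(p) > 255) return true;
    octets.push(Number(p));
  }
  const a = octets[0] ?? 0;
  const b = octets[1] ?? 0;
  return (
    a === 0 ||                          // 0.0.0.0/8
    a === 10 ||                         // 10.0.0.0/8
    a === 127 ||                        // loopback
    (a === 169 && b === 254) ||         // link-local
    (a === 172 && b >= 16 && b <= 31) || // 172.16.0.0/12
    (a === 192 && b === 168)            // 192.168.0.0/16
  );
}

// Hypothetical gate: https-only, and the resolved address must be public.
function isWebhookAllowed(url: string, resolvedIp: string): boolean {
  const isHttps = url.slice(0, 8).toLowerCase() === "https://";
  return isHttps && !isBlockedIPv4(resolvedIp);
}
```

Checking the resolved address, and then connecting to that same address, is the usual way to blunt DNS rebinding: a hostname cannot pass validation with a public IP and then resolve to a private one at fetch time.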

Dataset validators

"datasetLevel": {
  "rowCount":     { "min": 100, "max": 1000000, "onFail": "abort" },
  "freshness":    { "withinHours": 24, "column": "updated_at", "onFail": "warn" },
  "fkIntegrity":  { "column": "customer_id", "refTable": "starbucks.customers", "refColumn": "id", "onFail": "abort" },
  "cardinality":  { "column": "country", "minDistinct": 5, "onFail": "warn" },
  "duplicateKey": { "columns": ["customer_id", "order_date"], "onFail": "abort" }
}
Each validator runs a parameterized SQL probe and returns a structured { ok, message?, observed? } result. The validate executor aggregates them and applies each check’s onFail:
| onFail | Behavior |
| --- | --- |
| abort | Throws; the run fails |
| warn | Pushes a string into the warnings array; downstream still runs |
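The aggregation step can be sketched like this. The `{ ok, message?, observed? }` result shape comes from the text above; the `DatasetCheckOutcome` wrapper and function name are assumptions for illustration.

```typescript
type DatasetOnFail = "abort" | "warn";

interface DatasetCheckResult {
  ok: boolean;
  message?: string;
  observed?: unknown;
}

// Assumed wrapper pairing a probe result with its configured onFail.
interface DatasetCheckOutcome {
  name: string;
  onFail: DatasetOnFail;
  result: DatasetCheckResult;
}

// Hypothetical aggregator: throws on the first failing abort check,
// otherwise returns the collected warnings.
function applyDatasetOnFail(outcomes: DatasetCheckOutcome[]): string[] {
  const warnings: string[] = [];
  for (const o of outcomes) {
    if (o.result.ok) continue;
    const text = `${o.name}: ${o.result.message ?? "check failed"}`;
    if (o.onFail === "abort") {
      throw new Error(text);
    }
    warnings.push(text);
  }
  return warnings;
}
```

A caller would attach the returned warnings to the run record and continue with downstream nodes.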

Row-level rules

"rowLevel": [
  { "type": "notNull",   "field": "id",     "onFail": "abort" },
  { "type": "regex",     "field": "email",  "pattern": "^.+@.+$", "onFail": "skip" },
  { "type": "range",     "field": "age",    "min": 0, "max": 150, "onFail": "warn" },
  { "type": "maxLength", "field": "name",   "max": 200, "onFail": "skip" },
  { "type": "fieldType", "field": "id",     "expected": "uuid", "onFail": "abort" }
]
Row-level rules evaluate in memory against rows the upstream node produced (upstream.meta.rows). Each rule’s onFail:
| onFail | Behavior |
| --- | --- |
| abort | Throws on first violation |
| warn | Logs; row continues downstream |
| skip | Drops the row; recorded in failed[] for optional DLQ writes |
fieldType.expected supports: string, integer, float, boolean, date, timestamp, json, uuid.
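An in-memory evaluation loop for these rules could look roughly like the sketch below. It covers four of the five rule types (fieldType is omitted for brevity), and the names, the ruleId format, and the choice to treat wrong-typed values as regex or range violations are assumptions.

```typescript
type RowOnFail = "abort" | "warn" | "skip";
type DataRow = { [field: string]: unknown };

type RowRule =
  | { type: "notNull"; field: string; onFail: RowOnFail }
  | { type: "regex"; field: string; pattern: string; onFail: RowOnFail }
  | { type: "range"; field: string; min: number; max: number; onFail: RowOnFail }
  | { type: "maxLength"; field: string; max: number; onFail: RowOnFail };

// True when the row breaks the rule.
function violatesRule(rule: RowRule, row: DataRow): boolean {
  const v = row[rule.field];
  if (rule.type === "notNull") return v === null || v === undefined;
  if (rule.type === "regex") {
    return typeof v !== "string" || !new RegExp(rule.pattern).test(v);
  }
  if (rule.type === "range") {
    return typeof v !== "number" || v < rule.min || v > rule.max;
  }
  return typeof v === "string" && v.length > rule.max; // maxLength
}

// Hypothetical evaluator: abort throws, warn logs and keeps the row,
// skip drops the row and records it in failed[].
function evaluateRowRules(rows: DataRow[], rules: RowRule[]) {
  const passed: DataRow[] = [];
  const failed: { row: DataRow; ruleId: string }[] = [];
  const warnings: string[] = [];
  for (const row of rows) {
    let keep = true;
    for (const rule of rules) {
      if (!violatesRule(rule, row)) continue;
      const ruleId = `${rule.type}:${rule.field}`;
      if (rule.onFail === "abort") throw new Error(`rule ${ruleId} violated`);
      if (rule.onFail === "warn") {
        warnings.push(`rule ${ruleId} violated`);
        continue;
      }
      failed.push({ row, ruleId });
      keep = false;
      break; // row is dropped; no need to check further rules
    }
    if (keep) passed.push(row);
  }
  return { passed, failed, warnings };
}
```

The failed[] entries carry enough context (the row and the rule that rejected it) to be written to a quarantine table later.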

DLQ (quarantine)

"quarantine": { "enabled": true, "table": "dlq.orders" }
When enabled, every row that violates a skip- or warn-mode rule is written to the configured DLQ table. The table is created idempotently on first failed-row write:
CREATE TABLE IF NOT EXISTS "dlq"."orders" (
  id BIGSERIAL PRIMARY KEY,
  "pipelineId" TEXT NOT NULL,
  "runId" TEXT NOT NULL,
  "nodeId" TEXT NOT NULL,
  "ruleId" TEXT NOT NULL,
  "row" JSONB NOT NULL,
  "createdAt" TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS "orders_pipeline_idx" ON "dlq"."orders"("pipelineId", "runId");
The MCP inspectFlowError tool reads this table; the future “Errors” UI will render the same data row by row.
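Writing one quarantined row into the table above amounts to a parameterized INSERT. The sketch below assumes the configured table is given as `schema.table` and uses hypothetical names for the helper and its context object; it is not the runtime's actual writer.

```typescript
interface DlqContext {
  pipelineId: string;
  runId: string;
  nodeId: string;
}

// Hypothetical builder: returns SQL text plus positional parameters.
function buildDlqInsert(
  dlqTable: string,
  ctx: DlqContext,
  ruleId: string,
  row: { [field: string]: unknown },
): { sql: string; params: string[] } {
  const dot = dlqTable.indexOf(".");
  if (dot === -1) {
    throw new Error("quarantine table must be schema-qualified");
  }
  const q = (s: string) => '"' + s.replace(/"/g, '""') + '"';
  const target = `${q(dlqTable.slice(0, dot))}.${q(dlqTable.slice(dot + 1))}`;
  return {
    sql:
      `INSERT INTO ${target} ("pipelineId", "runId", "nodeId", "ruleId", "row") ` +
      `VALUES ($1, $2, $3, $4, $5::jsonb)`,
    // The offending row is serialized once and cast to JSONB by Postgres.
    params: [ctx.pipelineId, ctx.runId, ctx.nodeId, ruleId, JSON.stringify(row)],
  };
}
```

The id and createdAt columns are left to their defaults (BIGSERIAL and NOW()).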

Honest residuals

  • Streaming CDC runtime: cdc_mirror is gated by FLOW_STREAMING_ENABLED. Self-hosters with a real Debezium connector enable streaming and complete the wiring; playground keeps it off.
  • custom_function hook implementation: requires the plugin SDK.
  • Cross-load-mode transactions across hooks: truncate, incremental_watermark, append, and upsert now run their destructive + INSERT statements inside prisma.$transaction. Pre/post hooks still run in their own sessions, so a hook failing after a successful load won’t roll back the load.
  • Backup before destructive load: opt-in backupBeforeLoad flag (lands in PR 5) creates a pre-load snapshot table for paranoid users; not yet shipped.