Phase 4 closes the four areas the Phase 1 runtime explicitly stubbed:

- All six target load modes are now real (with `cdc_mirror` reserved for the streaming runtime in Phase 6).
- Pre/post hooks fire around the load.
- Dataset-level validators run as parameterized SQL probes.
- Row-level rules evaluate in memory, with optional DLQ quarantine.
## Target load modes

| Mode | When to use | What runs |
|---|---|---|
| `append` | Insert new rows alongside existing data | Explicit-column projection from the staging rows into the target table (column-aware projection that nulls out empty strings) |
| `truncate` | Replace all rows on every run | Truncates the target table and reloads from staging, wrapped in a single Postgres transaction so a failed reload rolls back the truncate. Aborts before any destructive statement runs when upstream produced 0 rows (see Empty-source guard below). |
| `upsert` | Idempotent merge by primary-like key | `INSERT … ON CONFLICT (key) DO UPDATE SET nonkey = EXCLUDED.nonkey` (requires `upsertKey`; throws if every projected column is in `upsertKey`, since there’s nothing to update). See the example below the table. |
| `blue_green` | Atomic swap with zero downtime | Loads into `<table>_new`, then swaps: rename the live table to `<table>_old`, rename `<table>_new` into place, drop `<table>_old` (see Blue-green details below). Aborts before the load when upstream produced 0 rows. |
| `incremental_watermark` | Append-only with high-watermark filter | Reads the persisted high-water mark from `pipeline_watermarks`; the INSERT and the watermark upsert are wrapped in a single transaction so a failed load rolls back the watermark advance (requires `watermarkColumn`) |
| `cdc_mirror` | Streaming change-data-capture mirror | Gated behind `FLOW_STREAMING_ENABLED=true`. When the env var is off, throws “streaming disabled”. When on but no Debezium connector is configured, throws “Debezium connector not configured” with a docs link. |
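For orientation, here is how an `upsert` target might be configured. `loadMode` and `upsertKey` are documented above; the `schema`/`table` envelope and overall node shape are illustrative assumptions and may differ in your IR version:

```json
{
  "schema": "sales",           // assumed envelope field
  "table": "orders",           // assumed envelope field
  "loadMode": "upsert",
  "upsertKey": ["order_id"]    // required; must not cover every projected column
}
```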
## Empty-source guard

When `loadMode` is `truncate` or `blue_green` and the upstream produced 0 rows, the runtime aborts the load before issuing the destructive statement. The run fails with:

```
target <schema>.<table>: source is empty (0 rows from <input>); aborting <mode> to prevent data loss.
```

Set `failOnEmptySource: false` to override.
This default closes a real footgun: if your scheduled file lands empty or the source connector has a glitch, your live table doesn’t get wiped to zero rows. Existing pipelines that genuinely want “empty file means clear the table” set `failOnEmptySource: false` on the target node.
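A minimal sketch of the override, under the same assumed envelope as the example above:

```json
{
  "schema": "sales",
  "table": "orders",
  "loadMode": "truncate",
  "failOnEmptySource": false   // an empty source now clears the table instead of failing the run
}
```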
A side effect: when an `onMissAction: alert` source comes up empty, the source executor also emits a warning alert into the System Health feed, so operators see it in the topbar widget without having to open the runs page.
## Mode-switch validation

Picking a load mode in the UI surfaces inline errors before save when:

- `upsert` is chosen without an `upsertKey`, or with keys not in the destination schema, or with keys covering every column.
- `incremental_watermark` is chosen without a `watermarkColumn`, or the column doesn’t exist in the destination schema.
- `cdc_mirror` is chosen without a `cdcSource`.
- `truncate` is chosen on a “full-refresh-shaped” pipeline (exactly one source + one target). This one is a soft warning that recommends `blue_green`; you can keep `truncate` if it’s intentional.

The same checks run server-side in `parsePipelineIR`, so the JSON IR tab can’t bypass them.
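For instance, a node like this would be rejected in both places (envelope fields assumed as in the earlier examples):

```json
{
  "schema": "sales",
  "table": "orders",
  "loadMode": "upsert"
  // rejected: upsert requires an upsertKey
}
```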
## Blue-green details

Blue-green loads the new dataset into a sibling `<table>_new`, runs the post-hook (e.g., a stats-collection command or a materialized-view refresh), then issues a three-statement swap: two sequential `ALTER TABLE … RENAME` statements followed by a `DROP`:

1. rename `target` → `target_old`
2. rename `target_new` → `target`
3. drop `target_old`

If the INSERT into `<table>_new` fails, `<table>_new` is dropped and the upstream error is rethrown, so the existing target is untouched.
## Incremental-watermark details

The high-water mark lives in the `pipeline_watermarks` table, keyed by (`tenantId`, `pipelineId`, `targetSchema`, `targetTable`). Each run:

- Reads the persisted `watermarkValue` (falling back to `SELECT MAX(<watermarkColumn>) FROM <target>` on the first run after an upgrade, so we don’t replay history).
- Computes the new MAX from the staging table.
- Wraps the `INSERT … WHERE col > $1` and the watermark upsert in a single Postgres transaction, so a failed INSERT rolls back the watermark advance.

When both the target and source are empty, the watermark defaults to `0001-01-01` (timestamp) or `0` (integer/bigint), so the first non-empty run loads everything.
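A watermark-mode target, under the same assumed envelope as the earlier examples, might look like:

```json
{
  "schema": "events",
  "table": "clicks",
  "loadMode": "incremental_watermark",
  "watermarkColumn": "updated_at"   // required; must exist in the destination schema
}
```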
## Row-count anomaly detection (optional)

Add `expectedRowCountDelta` to a target node to receive a warning alert when today’s load is wildly outside the recent average:

```json
"expectedRowCountDelta": {
  "minPct": -20,      // alarm when today < 80% of recent avg
  "maxPct": 200,      // alarm when today > 3× recent avg
  "windowRuns": 7
}
```

The runtime reads the last `windowRuns` successful runs (default 7), computes the running mean of `rowsProcessed`, and emits a warning alert into the System Health feed if today’s load is outside the band. The run still completes; anomaly detection never fails the run on its own. It is skipped when there are fewer than 3 prior successful runs.
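With the sample config above and a recent average of 10,000 rows, a load under 8,000 rows (`minPct: -20`, i.e. below 80% of the average) or over 30,000 rows (`maxPct: 200`, i.e. above 3× the average) emits the warning.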
## Hooks

The IR `preHook` and `postHook` slots accept four hook kinds. The pre-hook runs before the load; the post-hook runs only on a successful load.

```typescript
type Hook =
  | { type: "sql", statement: string }              // admin SQL only; no DML
  | { type: "refresh_mat_view", view: string }      // REFRESH MATERIALIZED VIEW
  | { type: "webhook", url: string }                // https-only POST via SSRF guard
  | { type: "custom_function", function: string }   // Phase 5 stub
```
### SQL hook safety

Hook SQL is admin-grade. The runtime forbids any statement starting with a data-modifying verb (`drop`, `delete`, `update`, `truncate`, `insert`, `merge`, `copy`, `grant`, `revoke`, `alter`). Allowed verbs include `analyze`, `refresh`, `cluster`, `vacuum`, `set`, `explain`, etc. Embedded `;` and SQL comments are also rejected.
### Webhook safety

Webhook URLs flow through `lib/security/url-validator.validateAndResolve`, the same SSRF guard the rest of the platform uses. Private IP ranges and DNS-rebinding attempts are blocked before the fetch fires.
## Dataset validators

```json
"datasetLevel": {
  "rowCount": { "min": 100, "max": 1000000, "onFail": "abort" },
  "freshness": { "withinHours": 24, "column": "updated_at", "onFail": "warn" },
  "fkIntegrity": { "column": "customer_id", "refTable": "starbucks.customers", "refColumn": "id", "onFail": "abort" },
  "cardinality": { "column": "country", "minDistinct": 5, "onFail": "warn" },
  "duplicateKey": { "columns": ["customer_id", "order_date"], "onFail": "abort" }
}
```

Each validator runs a parameterized SQL probe and returns a structured `{ ok, message?, observed? }` result. The validate executor aggregates them and applies each check’s `onFail`:

| onFail | Behavior |
|---|---|
| `abort` | Throws; the run fails |
| `warn` | Pushes a string into the `warnings` array; downstream still runs |
## Row-level rules

```json
"rowLevel": [
  { "type": "notNull", "field": "id", "onFail": "abort" },
  { "type": "regex", "field": "email", "pattern": "^.+@.+$", "onFail": "skip" },
  { "type": "range", "field": "age", "min": 0, "max": 150, "onFail": "warn" },
  { "type": "maxLength", "field": "name", "max": 200, "onFail": "skip" },
  { "type": "fieldType", "field": "id", "expected": "uuid", "onFail": "abort" }
]
```

Row-level rules evaluate in memory against the rows the upstream node produced (`upstream.meta.rows`). Each rule’s `onFail`:

| onFail | Behavior |
|---|---|
| `abort` | Throws on the first violation |
| `warn` | Logs; the row continues downstream |
| `skip` | Drops the row; recorded in `failed[]` for optional DLQ writes |

`fieldType.expected` supports: `string`, `integer`, `float`, `boolean`, `date`, `timestamp`, `json`, `uuid`.
## DLQ (quarantine)

```json
"quarantine": { "enabled": true, "table": "dlq.orders" }
```

When enabled, every row that violates a skip- or warn-mode rule is written to the configured DLQ table. The table is created idempotently on the first failed-row write:

```sql
CREATE TABLE IF NOT EXISTS "dlq"."orders" (
  id BIGSERIAL PRIMARY KEY,
  "pipelineId" TEXT NOT NULL,
  "runId" TEXT NOT NULL,
  "nodeId" TEXT NOT NULL,
  "ruleId" TEXT NOT NULL,
  "row" JSONB NOT NULL,
  "createdAt" TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS "orders_pipeline_idx" ON "dlq"."orders"("pipelineId", "runId");
```
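Given that schema, a quarantined record would look roughly like this (all values illustrative, including the `ruleId` format):

```json
{
  "id": 42,
  "pipelineId": "pl_7f3a",
  "runId": "run_2024_06_01",
  "nodeId": "validate_orders",
  "ruleId": "regex:email",
  "row": { "id": "a1b2", "email": "not-an-email" },
  "createdAt": "2024-06-01T04:00:12Z"
}
```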
The MCP `inspectFlowError` tool reads this table; the future “Errors” UI will render the same data row by row.
## Honest residuals

- Streaming CDC runtime: `cdc_mirror` is gated by `FLOW_STREAMING_ENABLED`. Self-hosters with a real Debezium connector enable streaming and complete the wiring; the playground keeps it off.
- `custom_function` hook implementation: requires the plugin SDK.
- Cross-load-mode transactions across hooks: `truncate`, `incremental_watermark`, `append`, and `upsert` now run their destructive + INSERT statements inside `prisma.$transaction`. Pre/post hooks still run in their own sessions, so a hook failing after a successful load won’t roll back the load.
- Backup before destructive load: the opt-in `backupBeforeLoad` flag (lands in PR 5) will create a pre-load snapshot table for paranoid users; not yet shipped.