Phase 4 closes the four areas the Phase 1 runtime explicitly stubbed:
  • Five of the six target load modes are now real; cdc_mirror stays a stub reserved for the streaming runtime in Phase 6.
  • Pre/post hooks fire around the load.
  • Dataset-level validators run as parameterized SQL probes.
  • Row-level rules evaluate in memory, with optional DLQ quarantine.

Target load modes

| Mode | When to use | What runs |
| --- | --- | --- |
| append | Insert new rows alongside existing data | INSERT INTO target SELECT * FROM source |
| truncate | Replace all rows on every run | TRUNCATE then INSERT |
| upsert | Idempotent merge by primary-like key | INSERT … ON CONFLICT (...) DO UPDATE (requires upsertKey) |
| blue_green | Atomic swap with zero downtime | Load to <table>_new, rename swap, drop <table>_old |
| incremental_watermark | Append-only with high-watermark filter | SELECT MAX(<watermarkColumn>) then INSERT … WHERE … > $1 (requires watermarkColumn) |
| cdc_mirror | Streaming change-data-capture mirror | Phase 6 stub — requires Flink/Debezium runtime; current runtime throws a clear scope-defer error |
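As a sketch of how the simpler modes translate into statements, the following builder covers append, truncate, and upsert. Identifier quoting/validation is omitted, and the upsert SET list is a stand-in (the real runtime would derive it from the source columns):

```typescript
// Sketch only: builds the load statements for three of the modes above.
// Table/column identifiers are assumed pre-validated; not production code.
type LoadMode = "append" | "truncate" | "upsert";

function buildLoadSql(
  mode: LoadMode,
  target: string,
  source: string,
  upsertKey?: string
): string[] {
  switch (mode) {
    case "append":
      return [`INSERT INTO ${target} SELECT * FROM ${source}`];
    case "truncate":
      // Two statements: clear, then reload.
      return [`TRUNCATE ${target}`, `INSERT INTO ${target} SELECT * FROM ${source}`];
    case "upsert":
      if (!upsertKey) throw new Error("upsert requires upsertKey");
      // "col = EXCLUDED.col" stands in for a generated per-column SET list.
      return [
        `INSERT INTO ${target} SELECT * FROM ${source} ` +
          `ON CONFLICT (${upsertKey}) DO UPDATE SET col = EXCLUDED.col`,
      ];
  }
}
```

The modes with runtime state (blue_green, incremental_watermark) are covered in their own sections below.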

Blue-green details

Blue-green loads the new dataset into a sibling <table>_new, runs the post-hook (e.g., ANALYZE or REFRESH MATERIALIZED VIEW), then performs the swap in three sequential statements: two ALTER TABLE ... RENAME operations followed by a DROP:
1. target → target_old
2. target_new → target
3. DROP target_old
If the INSERT into <table>_new fails, <table>_new is dropped and the upstream error is rethrown, so the existing target is untouched.
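The swap order above can be sketched as statement generation (identifier quoting and validation are omitted for brevity; names are illustrative):

```typescript
// Sketch: emits the blue-green swap statements in the order described above.
function blueGreenSwapSql(target: string): string[] {
  return [
    `ALTER TABLE ${target} RENAME TO ${target}_old`, // park the live table
    `ALTER TABLE ${target}_new RENAME TO ${target}`, // promote the freshly loaded table
    `DROP TABLE ${target}_old`,                      // discard the parked copy
  ];
}
```

Keeping the DROP last means a failure mid-swap leaves both copies on disk for manual recovery rather than losing data.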

Incremental-watermark details

SELECT MAX(<watermarkColumn>) AS max FROM <target> produces the high water; INSERT INTO <target> SELECT * FROM <source> WHERE <watermarkColumn> > $1 appends only newer rows. When the target is empty, the watermark defaults to '0001-01-01' so the first run loads everything.
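The defaulting rule and the filtered insert can be sketched as follows (the database round-trip is elided; only the SQL shapes come from the description above):

```typescript
// Sketch: derive the watermark bound as $1 for the filtered INSERT.
// A null MAX(...) (empty target) falls back to the documented default.
function resolveWatermark(maxFromTarget: string | null): string {
  return maxFromTarget ?? "0001-01-01";
}

// Sketch: the append statement filtered by the watermark column.
function incrementalInsertSql(target: string, source: string, col: string): string {
  return `INSERT INTO ${target} SELECT * FROM ${source} WHERE ${col} > $1`;
}
```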

Hooks

The IR preHook and postHook slots accept four hook kinds. The pre-hook runs before the load; the post-hook runs only on successful load.
```typescript
type Hook =
  | { type: "sql", statement: string }            // admin SQL only — no DML
  | { type: "refresh_mat_view", view: string }    // REFRESH MATERIALIZED VIEW
  | { type: "webhook", url: string }              // https-only POST via SSRF guard
  | { type: "custom_function", function: string } // Phase 5 stub
```

SQL hook safety

Hook SQL is admin-grade. The runtime forbids any statement starting with a data-modifying verb (DROP, DELETE, UPDATE, TRUNCATE, INSERT, MERGE, COPY, GRANT, REVOKE, ALTER). Allowed verbs include ANALYZE, REFRESH, CLUSTER, VACUUM, SET, EXPLAIN, etc. Embedded ; and SQL comments are also rejected.
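A minimal sketch of that gate, mirroring the verb list and the semicolon/comment rejection described above (the actual runtime implementation may differ in detail):

```typescript
// Sketch of the hook-SQL safety check: reject data-modifying verbs,
// embedded semicolons (statement chaining), and SQL comments.
const FORBIDDEN_VERBS = [
  "DROP", "DELETE", "UPDATE", "TRUNCATE", "INSERT",
  "MERGE", "COPY", "GRANT", "REVOKE", "ALTER",
];

function isHookSqlAllowed(sql: string): boolean {
  const trimmed = sql.trim();
  if (trimmed.includes(";")) return false;                              // no chaining
  if (trimmed.includes("--") || trimmed.includes("/*")) return false;   // no comments
  const verb = (trimmed.split(/\s+/)[0] ?? "").toUpperCase();
  return !FORBIDDEN_VERBS.includes(verb);
}
```

Checking only the leading verb is enough here because the semicolon rule already prevents smuggling a second statement behind an allowed one.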

Webhook safety

Webhook URLs flow through lib/security/url-validator.validateAndResolve — the same SSRF guard the rest of the platform uses. Private IP ranges and DNS rebinding attempts are blocked before the fetch fires.

Dataset validators

"datasetLevel": {
  "rowCount":     { "min": 100, "max": 1000000, "onFail": "abort" },
  "freshness":    { "withinHours": 24, "column": "updated_at", "onFail": "warn" },
  "fkIntegrity":  { "column": "customer_id", "refTable": "starbucks.customers", "refColumn": "id", "onFail": "abort" },
  "cardinality":  { "column": "country", "minDistinct": 5, "onFail": "warn" },
  "duplicateKey": { "columns": ["customer_id", "order_date"], "onFail": "abort" }
}
Each validator runs a parameterized SQL probe and returns a structured { ok, message?, observed? } result. The validate executor aggregates them and applies each check’s onFail:
| onFail | Behavior |
| --- | --- |
| abort | Throws; the run fails |
| warn | Pushes a string into the warnings array; downstream still runs |
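As a sketch of the structured result shape, here is a rowCount check applied to an already-fetched count (the parameterized SQL round-trip is elided; names are illustrative):

```typescript
// Sketch: the { ok, message?, observed? } result shape described above,
// applied to a rowCount validator's min/max bounds.
interface CheckResult {
  ok: boolean;
  message?: string;
  observed?: number;
}

function checkRowCount(observed: number, min?: number, max?: number): CheckResult {
  if (min !== undefined && observed < min)
    return { ok: false, message: `rowCount ${observed} below min ${min}`, observed };
  if (max !== undefined && observed > max)
    return { ok: false, message: `rowCount ${observed} above max ${max}`, observed };
  return { ok: true, observed };
}
```

The validate executor would then map a failing result through the check's onFail: abort throws, warn appends the message to the warnings array.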

Row-level rules

"rowLevel": [
  { "type": "notNull",   "field": "id",     "onFail": "abort" },
  { "type": "regex",     "field": "email",  "pattern": "^.+@.+$", "onFail": "skip" },
  { "type": "range",     "field": "age",    "min": 0, "max": 150, "onFail": "warn" },
  { "type": "maxLength", "field": "name",   "max": 200, "onFail": "skip" },
  { "type": "fieldType", "field": "id",     "expected": "uuid", "onFail": "abort" }
]
Row-level rules evaluate in memory against rows the upstream node produced (upstream.meta.rows). Each rule’s onFail:
| onFail | Behavior |
| --- | --- |
| abort | Throws on first violation |
| warn | Logs; row continues downstream |
| skip | Drops the row; recorded in failed[] for optional DLQ writes |
fieldType.expected supports: string, integer, float, boolean, date, timestamp, json, uuid.
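The skip path can be sketched as a pure in-memory partition. This trims the rule set to notNull and regex for brevity; field names and the helper names are illustrative:

```typescript
// Sketch: partition upstream rows into passed/failed by skip-mode rules.
type Row = Record<string, unknown>;

interface SkipRule {
  type: "notNull" | "regex";
  field: string;
  pattern?: string; // required for regex rules
}

function violates(rule: SkipRule, row: Row): boolean {
  const value = row[rule.field];
  if (rule.type === "notNull") return value === null || value === undefined;
  // regex: non-strings and non-matching strings both violate
  return typeof value !== "string" || !new RegExp(rule.pattern!).test(value);
}

function applySkipRules(rows: Row[], rules: SkipRule[]): { passed: Row[]; failed: Row[] } {
  const passed: Row[] = [];
  const failed: Row[] = []; // candidates for the DLQ write described below
  for (const row of rows) {
    (rules.some((r) => violates(r, row)) ? failed : passed).push(row);
  }
  return { passed, failed };
}
```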

DLQ (quarantine)

"quarantine": { "enabled": true, "table": "dlq.orders" }
When enabled, every row that violates a skip- or warn-mode rule is written to the configured DLQ table. The table is created idempotently on first failed-row write:
```sql
CREATE TABLE IF NOT EXISTS "dlq"."orders" (
  id BIGSERIAL PRIMARY KEY,
  "pipelineId" TEXT NOT NULL,
  "runId" TEXT NOT NULL,
  "nodeId" TEXT NOT NULL,
  "ruleId" TEXT NOT NULL,
  "row" JSONB NOT NULL,
  "createdAt" TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS "orders_pipeline_idx" ON "dlq"."orders"("pipelineId", "runId");
```
The MCP inspectFlowError tool reads this table; the future “Errors” UI will render the same data row by row.

Out of scope

  • Real source materialization — the existing tests stub Prisma; production runs against a live Postgres still require the source executor to actually CREATE + INSERT rows, which is a separate cross-cutting concern.
  • Streaming CDC runtime — cdc_mirror keeps its scope-defer error until Phase 6.
  • custom_function hook implementation — requires the Phase 5 plugin SDK.
  • Cross-session transaction coordination — pre/load/post hooks each run in their own session today; coordinated transactions land in Phase 6.