AI Pipeline Mode lives in the right-side AI panel. Open it on any /data/pipelines/* page, describe what you want (“add a daily CSV ingestion of /tmp/orders.csv into starbucks.orders”), and the assistant produces a typed IR patch. The panel renders the patch as a colored diff with three actions: Apply, Show full IR, Reject (reason).

How the AI stays honest

Three validation layers, all enforced server-side:
  1. Constrained JSON generation — the provider’s output is bound by a Zod schema via Vercel AI SDK generateObject. Invalid shapes are rejected at the wire.
  2. Patch application — the AI emits an RFC 6902 JSON Patch, not a full IR. Our applier rejects paths that do not exist in the current IR.
  3. Validate-then-regenerate — the patched IR is fed through the Phase 1 parsePipelineIR validator. If structural checks fail, the server re-prompts the AI with the exact error and retries (up to 3 times). You only ever see IR that the runtime can execute.
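Layer 2 above can be sketched in a few lines. This is an illustrative, minimal applier for the RFC 6902 "add" operation that refuses paths whose parent does not exist in the current IR; the function names and IR shape here are assumptions, not the real implementation.

```typescript
// Minimal sketch of an "add"-only JSON Patch applier that rejects
// paths whose parent is missing from the current document.
type AddOp = { op: "add"; path: string; value: unknown };

function resolveParent(doc: unknown, path: string): { parent: any; key: string } {
  // Split the JSON Pointer and unescape ~1 (/) and ~0 (~) per RFC 6901.
  const parts = path.split("/").slice(1)
    .map(p => p.replace(/~1/g, "/").replace(/~0/g, "~"));
  const key = parts.pop()!;
  let parent: any = doc;
  for (const part of parts) {
    if (parent == null || typeof parent !== "object" || !(part in parent)) {
      throw new Error(`patch path does not exist in current IR: ${path}`);
    }
    parent = parent[part];
  }
  return { parent, key };
}

function applyAddOp(doc: unknown, op: AddOp): unknown {
  const { parent, key } = resolveParent(doc, op.path);
  if (Array.isArray(parent)) {
    // "-" appends; a numeric key inserts at that index.
    key === "-" ? parent.push(op.value) : parent.splice(Number(key), 0, op.value);
  } else {
    parent[key] = op.value;
  }
  return doc;
}
```

With this check, a patch like `{"op": "add", "path": "/nodes/-"}` succeeds against an IR that has a `nodes` array, while a path rooted at a nonexistent key is rejected before the IR ever reaches the validator.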
Every proposal — success, failure, error, or rejection — is written to AuditLog with entityType = 'pipeline_ai_proposal': full patch, rationale, retry count, token count. Rejections capture your reason text.

Prerequisites

  • Tenant setting flowIrEnabled must be on.
  • AI provider configured in platform settings. Any provider that supports structured output works — Anthropic, OpenAI, Google, AWS Bedrock. Providers that lack constrained-JSON support will return a 400 with provider_unsupported_structured.

Rate limits

  • 100 proposals per tenant per hour. Each proposal may use up to 3 internal retries against your provider. Your provider’s per-request rate limits still apply.

Phase 1 scope

The AI is explicitly warned to stay inside the runtime’s current capabilities:
  • Sources — only local_fs has a runtime executor in Phase 1; the other 6 connector kinds (s3, gcs, azure_blob, sftp, ftp, http_pull) validate but are deferred to Phase 3.
  • Target load modes — append, truncate, and upsert are implemented; blue_green, incremental_watermark, and cdc_mirror are deferred to Phase 4 and will throw at runtime if proposed.
  • Validate node — dataset-level rowCount is enforced; other dataset checks (freshness, fkIntegrity, cardinality, duplicateKey) are recorded but not enforced yet.
If your prompt asks for something outside Phase 1 scope, the AI will respond with an empty patch and a rationale explaining the limit.
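For example, a request for a deferred load mode ("mirror this table with CDC") might come back like this (illustrative values, not captured output):

{
  "ok": true,
  "rationale": "cdc_mirror is deferred to Phase 4, so no change was proposed.",
  "patch": [],
  "resultingIr": { /* unchanged pipeline IR */ },
  "warnings": [],
  "retries": 0,
  "tokensUsed": 310
}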

API

POST /api/v1/ai/pipeline-chat

Request body:
{
  "pipelineId": "optional-uuid-or-name",
  "message": "describe the change you want"
}
Response (200, valid):
{
  "ok": true,
  "rationale": "Adds a validate node that aborts when fewer than 100 rows arrive.",
  "patch": [
    {
      "op": "add",
      "path": "/nodes/-",
      "value": {
        "id": "v",
        "kind": "validate",
        "input": "src",
        "datasetLevel": { "rowCount": { "min": 100, "onFail": "abort" } }
      }
    }
  ],
  "resultingIr": { /* full pipeline IR after the patch */ },
  "warnings": [],
  "retries": 0,
  "tokensUsed": 1842
}
Response (200, retries exhausted):
{
  "ok": false,
  "rationale": "...",
  "errors": ["...", "..."],
  "retries": 2,
  "tokensUsed": 4012
}
Other statuses:
  • 400 provider_unsupported_structured — Tenant’s AI provider lacks constrained JSON
  • 403 flow_ir_disabled — Enable flowIrEnabled in tenant settings
  • 429 rate_limited — 100/h budget exhausted; check Retry-After header
  • 503 provider_unavailable — AI provider down
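On a 429, the Retry-After header tells you how long to back off. Per RFC 9110 it may be delta-seconds or an HTTP-date; a small sketch of parsing it into a wait in milliseconds (the helper name and the 60 s fallback are assumptions, not part of this API):

```typescript
// Turn a Retry-After header value into a wait in milliseconds.
// Accepts delta-seconds ("120") or an HTTP-date; falls back to an
// assumed 60 s when the header is absent or unparseable.
function retryAfterMs(header: string | null, now: Date = new Date()): number {
  if (!header) return 60_000;
  const seconds = Number(header);
  if (Number.isFinite(seconds)) return Math.max(0, seconds * 1000);
  const date = Date.parse(header);
  return Number.isNaN(date) ? 60_000 : Math.max(0, date - now.getTime());
}
```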
POST /api/v1/ai/pipeline-chat/feedback

Captures rejection reasons:
{
  "pipelineId": "p1",
  "rationale": "<the rationale that was rejected>",
  "patch": [ /* the patch that was rejected */ ],
  "reason": "wrong target table"
}
See also

  • Pipeline IR — the underlying typed document the AI edits.
  • Pipelines API — how to apply an IR via POST /api/v1/pipelines with irVersion: "1.0".