
Connector status snapshot. 85 connector types are registered in the UI today. The connectors documented on this page are production-ready except for amazon_kinesis and braze, which ship as coming-soon: they expose forms but pipeline runs that use them no-op in the executor (logged, zero rows). The remaining 26 coming-soon types are the W16 expansion entries documented on Connectors Expanded. Coming-soon connectors are visibly badged and disabled in the connector picker.

Overview

The Data module is the foundation of KaireonAI. Everything the platform decides on — offers, scores, journeys — runs on clean, structured data flowing in from your systems. There is one workflow you’ll repeat for every entity you bring in:
Connect (Connector) → Define (Schema) → Build & Run (Pipeline) → Inspect (Aggregates)
| Step | What you do | Page |
| --- | --- | --- |
| 1. Connect | Create a Connector with credentials + connection details (one connector serves many pipelines). | Connector Reference |
| 2. Define | Create a Schema. The platform auto-creates the matching ds_* PostgreSQL table; adding fields runs ALTER TABLE. Pick a primary key (single or composite). | Data Model |
| 3. Build & Run | Create a Pipeline pointing at the Connector + Schema. The visual editor's Add-Node toolbar (transform / validate / enrich / branch / archive) edits an in-memory draft; Save Changes versions it; Publish flips the live version the scheduler runs. | Flow Getting Started, Flow Editor UI, Transforms |
| 4. Inspect | Inspect rows in Customer Viewer, traced events in Interaction History, and per-run metrics in Pipeline Runs. | Flow Lineage |
Navigate to Data in the sidebar to access Connectors, Schemas, Pipelines, Pipeline Runs, Sources, Segments, and Aggregates (Customer Viewer + Interaction History).
One connector → many pipelines. The connector holds the bucket / host / credentials only. Each pipeline source node holds its own path, file mask, and format — so you can ingest different files into different schemas with different schedules from a single set of credentials. See the connector-reuse note on the Data → Connectors page.
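In practice the split looks like this. A minimal sketch reusing the aws_s3 connector shape from the Worked Example below; the path and fileFormat keys follow this page's descriptions, while the fileMask key name is an assumption for illustration:

Connector (credentials + bucket only):

{ "type": "aws_s3", "authMethod": "iam_role", "config": { "bucket": "my-data-lake", "region": "us-east-1" } }

Pipeline A source node (daily customer CSVs):

{ "nodeType": "source", "config": { "path": "customers/daily/", "fileMask": "*.csv", "fileFormat": "csv" } }

Pipeline B source node (hourly transactions, different schema and schedule):

{ "nodeType": "source", "config": { "path": "transactions/hourly/", "fileMask": "*.jsonl", "fileFormat": "jsonl" } }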

Connectors

Connectors define how KaireonAI reaches your external data. 85 connector types are registered across 8 categories. Each connector has its own dynamic configuration form with typed fields, multiple authentication methods, and a Test Connection button to verify connectivity before saving.

Object Storage

| Connector | Status | Auth Methods | Key Config Fields |
| --- | --- | --- | --- |
| Amazon S3 | Ready (runtime ingestion + Test Connection) | IAM Role, Access Key | Bucket, path prefix, AWS region (30 regions), custom endpoint for S3-compatible storage |
| Google Cloud Storage | Ready (runtime ingestion + Test Connection) | Service Account JSON, Access Token, Application Default Credentials | Bucket, path prefix, GCP project ID |
| Azure Blob Storage | Ready (runtime ingestion + Test Connection) | Connection String, Account Key | Storage account, container name, blob prefix |
| SFTP | Ready (runtime ingestion + Test Connection) | Username/Password, SSH Key | Host, port, remote path |
| HTTP Pull | Ready (runtime ingestion via REST API connector) | API Key, Bearer Token | Source URL in node.config.path; SSRF-guarded by validateAndResolve |
| Local Filesystem | Ready (runtime ingestion) | None | Mount path (host-relative) |
All five cloud backends and local_fs share a common object-store abstraction shipped in Phase 6.2, so a pipeline written against one backend behaves the same way against any of the others. Object storage connectors support file format selection: CSV, JSON, JSON Lines, Parquet, Avro, ORC, TSV, and XML — read end-to-end through the same format-parser layer regardless of source backend.

Streaming

| Connector | Status | Auth Methods | Key Config Fields |
| --- | --- | --- | --- |
| Apache Kafka | Ready (batch polling) | Username/Password (SASL), API Key | Bootstrap servers, topic, consumer group, security protocol, schema registry URL, maxMessages (default 1000), wait timeout (default 15s) |
| Confluent Cloud | Ready (batch polling) | API Key | Bootstrap server, topic, schema registry URL + credentials |
| Amazon Kinesis | Coming soon (connection test works) | IAM Role, Access Key | Stream name, AWS region, start position (LATEST or TRIM_HORIZON) |
Kafka and Confluent are batch-polling consumers, not true streaming. Each pipeline run opens a Kafka consumer, polls up to maxMessages records with the configured wait timeout, commits offsets, and closes. The consumer does not stay alive between runs. If you need long-lived, always-on streaming ingestion, that requires a persistent worker that is not yet implemented — schedule Kafka pipelines on a cron at whatever cadence matches your freshness target in the meantime.
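For intuition, here is what a single batch-polling run does, sketched in TypeScript with the kafkajs client. This is illustrative only, not the platform's executor code; the broker address, topic, and group ID are placeholders:

// Sketch of the poll-commit-close cycle described above (kafkajs client).
import { Kafka } from "kafkajs";

export async function pollOnce(maxMessages = 1000, waitTimeoutMs = 15_000): Promise<string[]> {
  const kafka = new Kafka({ clientId: "pipeline-run", brokers: ["localhost:9092"] });
  const consumer = kafka.consumer({ groupId: "kaireon-pipeline" });
  const rows: string[] = [];

  await consumer.connect();
  await consumer.subscribe({ topic: "customer-events" });
  await consumer.run({
    // Offsets are auto-committed as messages are processed.
    eachMessage: async ({ message }) => {
      if (rows.length < maxMessages) rows.push(message.value?.toString() ?? "");
    },
  });

  // Stop at the message cap or the wait timeout, whichever comes first.
  const deadline = Date.now() + waitTimeoutMs;
  while (rows.length < maxMessages && Date.now() < deadline) {
    await new Promise((resolve) => setTimeout(resolve, 100));
  }

  await consumer.disconnect(); // nothing stays alive between pipeline runs
  return rows;
}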

Data Warehouses

| Connector | Status | Auth Methods | Key Config Fields |
| --- | --- | --- | --- |
| Snowflake | Ready | Username/Password, OAuth/Key-Pair Token | Account identifier, warehouse, database, schema, role, Source Table, Row Limit |
| Google BigQuery | Ready | Service Account JSON | GCP project ID, dataset, location, Source Table, Row Limit |
| Amazon Redshift | Ready | Username/Password, IAM Role | Cluster endpoint, port, database, schema |
| Databricks | Ready | Token, OAuth2 | Workspace URL, HTTP path, Unity Catalog, schema |
Snowflake and BigQuery row limits. Both connectors require a sourceTable and accept an optional rowLimit (default 100,000 rows — demo-safe). Set rowLimit to 0 to remove the cap entirely; only do this once you have sized the target database and pipeline run budget for a full-table read. The executor reads from the configured source table and caps the row count at rowLimit (or returns every row when rowLimit is 0).
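An uncapped Snowflake read might be configured like this. A sketch only: the field names follow the table above, and the exact nesting of sourceTable and rowLimit inside the config object is an assumption:

{
  "type": "snowflake",
  "config": {
    "account": "acme-xy12345",
    "warehouse": "ANALYTICS_WH",
    "database": "CRM",
    "schema": "PUBLIC",
    "role": "KAIREON_READER",
    "sourceTable": "CUSTOMERS",
    "rowLimit": 0
  }
}

Here rowLimit: 0 removes the 100,000-row default cap, so the full table is read.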

Databases

| Connector | Auth Methods | Key Config Fields |
| --- | --- | --- |
| PostgreSQL | Username/Password, Connection String | Host, port, database, schema, SSL mode (prefer/require/verify-ca/verify-full/disable) |
| MySQL | Username/Password, Connection String | Host, port, database, SSL mode |
| MongoDB | Connection String, Username/Password | Database, default collection (Atlas or self-hosted) |

CRM

| Connector | Auth Methods | Key Config Fields |
| --- | --- | --- |
| Salesforce | OAuth2, Username/Password + Security Token | Instance URL, API version, objects to sync (Contact, Account, Opportunity, etc.) |
| HubSpot | Private App Token, OAuth2 | Portal ID, objects to sync (contacts, companies, deals) |

Customer Data Platforms

| Connector | Status | Auth Methods | Key Config Fields |
| --- | --- | --- | --- |
| Segment | Ready | API Key | Workspace slug, source ID |

Messaging

| Connector | Status | Auth Methods | Key Config Fields |
| --- | --- | --- | --- |
| Braze | Coming soon | API Key | REST API endpoint (US-01, US-02, US-03, EU-01) |

APIs and Direct Upload

| Connector | Status | Auth Methods | Key Config Fields |
| --- | --- | --- | --- |
| REST API | Ready | API Key, OAuth2, Token, Username/Password | Base URL, HTTP method (GET/POST), pagination type (none/offset/cursor/link), custom headers |
| Webhook | Ready | API Key, Bearer Token | Webhook path, max batch size |
| CSV File Upload | Ready | None | Delimiter (comma/tab/semicolon), header row detection |
| Shopify | Ready | Admin API Access Token | Shop domain |
| Stripe | Ready | Secret Key (sk_live_ or sk_test_) | No additional config required |
| Mailchimp | Ready | API Key | Server prefix, list/audience ID |
Coming-soon connectors (amazon_kinesis, braze) are visibly badged and disabled in the connector picker. You can still view the form definitions in the registry, but creating a pipeline against them will no-op at run time until ingestion is implemented. Connection Test works today for Amazon Kinesis.

Security

Connector credentials are encrypted at rest using the platform encryption layer. The authConfig field is never returned in API responses — the GET endpoint explicitly excludes it from the select clause. Only the connection metadata (name, type, status, last tested timestamp) is exposed.

Schemas

Schemas define your entity structures. Unlike metadata-only schema systems, KaireonAI schemas are backed by real PostgreSQL tables. Creating a schema executes a create table statement. Adding a field runs alter table add column. Deleting a schema drops the table along with its dependent objects.

Entity Types

Each schema is assigned an entity type that describes what it models:
| Entity Type | Typical Use |
| --- | --- |
| customer | Customer profiles, demographics, preferences |
| account | Account records, membership details |
| transaction | Purchase history, payment records |
| product | Product catalog, inventory |
| event | Behavioral events, clickstream, interactions |
| interaction | Customer-agent interactions, support tickets |
| segment | Pre-computed audience segments |
| campaign | Campaign metadata and performance |
| subscription | Subscription plans and status |
| custom | Any other entity type |

Field Types and PostgreSQL Mapping

Every field you define maps to a concrete PostgreSQL column type:
| Abstract Type | PostgreSQL Type | Notes |
| --- | --- | --- |
| varchar | varchar (length-bounded) | Default length 255 if not specified |
| text | text | Unlimited length strings |
| integer | integer | 32-bit signed integer |
| bigint | bigint | 64-bit signed integer |
| smallint | smallint | 16-bit signed integer |
| numeric / decimal | numeric(precision, scale) | Arbitrary precision; specify precision and scale |
| float / real | real | 32-bit floating point |
| double | double precision | 64-bit floating point |
| boolean | boolean | true/false |
| date | date | Calendar date |
| timestamp | timestamp | Date and time without timezone |
| timestamptz | timestamptz | Date and time with timezone |
| json | jsonb | Binary JSON for nested/dynamic data |
| uuid | uuid | Universally unique identifier |

DDL Behavior

When you create a schema through the API or UI, the following happens:
  1. A metadata record is created in the platform’s data-schema registry with the field definitions.
  2. A safe create table if not exists statement is executed against PostgreSQL with an auto-generated table name prefixed with ds_ (e.g., schema “customers” becomes table ds_customers).
  3. Every table automatically gets created_at (timestamptz) and updated_at (timestamptz) columns.
  4. Primary key handling: when no field is marked isPrimaryKey, an auto-generated id BIGSERIAL PRIMARY KEY column is added. When any field is marked isPrimaryKey: true (e.g. via the schema-create form’s “Custom primary key column” pane, or by passing isPrimaryKey: true in the API fields[] payload), the auto-id column is skipped and your column becomes the table’s PK.
  5. Your defined fields are added as additional columns with their mapped PostgreSQL types, nullability, uniqueness constraints, and default values.
If the DDL fails, the metadata record is rolled back to prevent orphaned metadata without a backing table.
Schema operations execute real DDL statements against your database. Creating a schema creates a table, adding a field alters the table, and deleting a schema drops the table with CASCADE. These operations are not reversible through the UI.
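For example, to promote customer_id to the table's primary key instead of the auto-generated id column (payload shape follows the Worked Example below; the field list is illustrative):

curl -X POST /api/v1/schemas \
  -H "Content-Type: application/json" \
  -d '{
    "name": "customers_keyed",
    "entityType": "customer",
    "fields": [
      { "name": "customer_id", "dataType": "uuid", "isPrimaryKey": true },
      { "name": "email", "dataType": "varchar", "length": 255 }
    ]
  }'

Per rule 4 above, the resulting ds_customers_keyed table uses customer_id as its primary key and skips the BIGSERIAL id column.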

Field Constraints

Each field supports the following constraints:
| Constraint | DDL Effect |
| --- | --- |
| isPrimaryKey | Primary-key constraint on the column |
| isUnique | Uniqueness constraint (skipped if the column is already the primary key) |
| isNullable: false | Not-null constraint |
| defaultValue | Default-value clause (validated against safe literal patterns) |
Default values are validated to prevent SQL injection: only literals ('text', 123, null, true, false), the current-timestamp function now(), and current_timestamp are permitted.
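A few defaultValue settings that should pass that validation (field shapes follow the Worked Example below; whether string literals carry their own quotes, as in "'active'", is an assumption):

{ "name": "status", "dataType": "varchar", "length": 20, "defaultValue": "'active'" }
{ "name": "retry_count", "dataType": "integer", "defaultValue": "0" }
{ "name": "is_deleted", "dataType": "boolean", "defaultValue": "false" }
{ "name": "created_on", "dataType": "timestamptz", "defaultValue": "current_timestamp" }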

CSV Column Inference

When uploading CSV files, the platform can automatically infer column types from sample data:
| Detected Pattern | Inferred Type |
| --- | --- |
| UUID format | uuid |
| true/false/yes/no/0/1 | boolean |
| Integer values | integer (or bigint if > 2,147,483,647) |
| Decimal values | numeric with auto-sized precision and scale |
| ISO date strings | date or timestamptz (if time component present) |
| JSON objects/arrays | json |
| Short strings (≤255 chars) | varchar with auto-sized length |
| Long strings | text |
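For instance, given a sample like this (illustrative data):

customer_id,is_active,balance,signup_date,tags
6f1c2a34-9b1d-4e2a-8f00-3c2d1e4b5a66,true,1024.50,2024-03-01T09:30:00Z,"[""vip"",""beta""]"

the columns would infer as uuid (customer_id), boolean (is_active), numeric (balance), timestamptz (signup_date, since a time component is present), and json (tags).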

Schema Types & Relationships

Schemas are classified into three types that determine how they participate in decisioning:
| Schema Type | Purpose | Key Behavior |
| --- | --- | --- |
| Customer | Core customer profile data (demographics, preferences, scores) | Primary entity for enrichment lookups. One row per customer. |
| Collection | Multi-row data linked to a customer (accounts, transactions, products held) | Linked to a Customer schema with an explicit join mapping. Multiple rows per customer. |
| Proposition | Offer/product catalog data used for matching and scoring | Referenced during inventory and scoring stages. |
Collection-to-Customer linking. When creating a Collection schema, you specify which Customer schema it belongs to and the join key mapping (e.g., collection.customer_id = customer.customer_id). This relationship enables the Enrich node to join collection data at decision time.

Summary columns. Collection schemas support pre-materialized aggregation columns that roll up collection data to the customer level. Supported aggregation functions:
| Function | Description | Example |
| --- | --- | --- |
| count | Number of rows | Total accounts |
| sum | Sum of a numeric column | Total balance across accounts |
| max | Maximum value | Highest credit limit |
| min | Minimum value | Oldest account open date |
| avg | Average value | Average transaction amount |
| bool_or | True if any row is true | Has any premium account |
Summary columns are materialized by the Summarize pipeline transform at pipeline execution time (not decision time), keeping decision-time latency low. The Enrich node can then read from the summary table instead of querying the full collection.

Auto-enrich toggle. Schemas with auto-enrich enabled are automatically included in the Enrich stage of Decision Flows without manual configuration. This is useful for core customer profile schemas that should always be available during decisioning.
If a Collection schema has summary columns defined but no pipeline includes a Summarize transform targeting it, the summary table will be empty. Add a Summarize node to your pipeline to materialize the aggregated data.

Schema References

Schemas are referenced throughout the platform:
  • Enrichment stages in Decision Flows load customer data from schema tables at decision time
  • Computed values reference schema data via the customer.* namespace in formulas
  • Pipelines use schemas as target destinations for ETL workflows
  • Segments define customer cohorts using schema field conditions with SQL-like filters

Pipelines

Pipelines are visual ETL workflows built with a drag-and-drop flow editor powered by React Flow. Each pipeline connects a source connector to a target schema through a chain of transform nodes.

Pipeline Structure

A pipeline consists of:
  • Connector — The source data connection (any of the 85 connector types)
  • Schema — The target destination table
  • Nodes — Visual nodes in the flow editor (source, transform, target) with position and config
  • Edges — Connections between nodes defining data flow direction

Transform Types

KaireonAI provides 15 built-in transform types:
Rename Field
Rename columns to standardize naming conventions across data sources. Configure the source field name and the desired target field name.
Config: sourceField, targetField

Cast Type
Convert a field to a different data type. Supported target types: string, integer, bigint, float, numeric, boolean, date, timestamp, json, uuid.
Config: field, targetType

Computed Field
Compute a new field using SQL-like expressions. Includes a function picker with 50+ built-in functions across 5 categories:

| Category | Functions |
| --- | --- |
| String | upper, lower, trim, substring, replace, concat, split_part, left, right, lpad, length, regexp_replace |
| Numeric | abs, round, ceil, floor, mod, greatest, least, coalesce, nullif |
| Date/Time | date_trunc, extract, age, to_char, to_date, to_timestamp, date + interval, date - interval, now, current_date |
| Type Cast | Cast expressions to integer, bigint, numeric, text, boolean, date, timestamp, or uuid |
| Conditional | case when, coalesce, nullif, if null |

Config: outputField, expression

Filter Rows
Keep only rows matching specified conditions. Supports a visual condition builder with operators:

| Operator | Applies To |
| --- | --- |
| =, != | string, integer, numeric, boolean |
| >, <, >=, <= | integer, numeric, date, timestamp |
| pattern match (like), starts with, ends with | string |
| in, not in | string, integer |
| is null, is not null | all types |
| between | integer, numeric, date, timestamp |

Config: field, operator, value

Drop Fields
Remove unwanted columns from the data flow. Select one or more fields to exclude from downstream processing.
Config: fields (array)

Add Field
Add a new column with a name, data type, and default value or computed expression.
Config: fieldName, fieldType, defaultValue

Map Values
Replace field values using a JSON lookup table. Useful for code-to-label mapping. Example: {"M": "Male", "F": "Female", "O": "Other"}
Config: field, mappings (JSON object), defaultValue (for unmatched values)

Split Field
Split a single field into multiple output fields by a separator character. For example, split a full name into first and last name fields.
Config: sourceField, separator, outputFields (array)

Concatenate Fields
Concatenate multiple columns into a single field with an optional separator string.
Config: sourceFields (array), separator, outputField

Deduplicate
Remove duplicate rows based on one or more key columns. Keeps the first occurrence when duplicates are found.
Config: keyFields (array)

Aggregate
Group by one or more columns and apply aggregate functions: sum, count, avg, min, max.
Config: groupByFields (array), aggregations (array of {field, function, alias})

Lookup Join
LEFT JOIN with another schema table to enrich data. Specify the lookup schema, join key, and which fields to pull from the lookup table.
Config: lookupSchema, joinField, lookupField, selectFields (array)

Hash Field
Apply cryptographic hashing to field values. Supports SHA-256 and MD5 algorithms. Used for anonymization or generating deduplication keys.
Config: field, algorithm (sha256 or md5), outputField

Mask PII
Detect and mask personally identifiable information. Supports partial masking patterns:
  • SSN: ***-**-1234
  • Email: j***@example.com
  • Phone: ***-***-5678
  • Credit card: ****-****-****-1234
Config: field, maskType, preserveLength

Summarize
Aggregates collection data into a customer-level summary table. Runs at pipeline time, not decision time, so it does not add latency to the Recommend API. Use this transform when a Collection schema has summary columns defined. The Summarize node reads from the collection table, groups by the customer join key, applies the configured aggregation functions (COUNT, SUM, MAX, MIN, AVG, BOOL_OR), and writes the results to the summary table. The Enrich node can then read from the summary table at decision time. A config sketch follows this list.
Config: collectionSchemaId, customerSchemaId, joinKey, aggregations (array of { sourceField, function, outputField })
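A sketch of a Summarize node using those config keys. The IDs are placeholders, and the summarize nodeType string is an assumption inferred from the snake_case node types in the Worked Example below:

{
  "id": "summarize-1",
  "nodeType": "summarize",
  "label": "Roll Up Accounts",
  "config": {
    "collectionSchemaId": "<accounts-schema-id>",
    "customerSchemaId": "<customers-schema-id>",
    "joinKey": "customer_id",
    "aggregations": [
      { "sourceField": "balance", "function": "sum", "outputField": "total_balance" },
      { "sourceField": "is_premium", "function": "bool_or", "outputField": "has_premium_account" }
    ]
  }
}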

Keyboard Shortcuts

| Shortcut | Action |
| --- | --- |
| Ctrl/Cmd + Z | Undo last action |
| Ctrl/Cmd + Shift + Z | Redo last undone action |
Undo/redo buttons are also available in the editor toolbar.

Execution Config

Pipelines support the following execution modes with configurable resource allocation:
| Mode | Status | Description | Use Case |
| --- | --- | --- | --- |
| Batch | Ready | Process all records in configurable batch sizes | Full loads, daily syncs |
| Micro-Batch | Ready | Small frequent batches scheduled via cron | Near real-time with controlled throughput |
| Streaming | Self-host | Continuous record-by-record processing via long-lived consumer | Real-time event streams (Kafka, Kinesis, Pulsar) |
Streaming mode spawns a long-lived consumer per pipeline and is gated behind the FLOW_STREAMING_ENABLED=true environment variable on the worker service. The gate exists because a streaming consumer requires a persistent worker container (separate from the request-driven API) that is part of the self-hosted deployment topology, not the hosted playground. Selecting Streaming on the playground returns a clear error pointing to streaming-runtime. For near-real-time ingestion on the hosted playground, use Batch or Micro-Batch with a short cron cadence against a batch-polling connector (e.g., Kafka).
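A near-real-time setup on the hosted playground per that guidance: micro-batch on a one-minute cron against the batch-polling Kafka connector. A sketch only; the micro_batch mode string is an assumption, and the other keys follow the Pipeline Fields table below:

{
  "schedule": "* * * * *",
  "executionConfig": {
    "mode": "micro_batch",
    "batchSize": 1000,
    "parallelism": 1,
    "errorHandling": "dlq"
  }
}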

Configuration Options

| Setting | Default | Description |
| --- | --- | --- |
| Batch Size | 10,000 | Records per batch (batch/micro-batch modes) |
| Parallelism | 1 | Concurrent workers (1 to 16) |
| Partitioning | None | Partition key for distributed processing |
| Error Handling | fail | skip (continue on error), fail (stop pipeline), dlq (route to dead-letter queue) |
| Scheduling | Manual | Cron expression for automatic runs |

Loading Strategies

When a pipeline writes to a target schema, you choose how incoming data merges with existing rows. The loading strategy is configured per pipeline and applies at execution time.
| Strategy | Behavior | Use Case |
| --- | --- | --- |
| Append | Insert new rows. Existing rows are untouched. This is the default. | Incremental event streams, transaction logs |
| Truncate & Load | Truncate the target table, then insert all rows. Brief downtime while the table is empty. | Full refresh of small-to-medium lookup tables |
| Upsert | Insert with on-conflict-do-update using a configurable key column. Rows with matching keys are updated; new keys are inserted. | Incremental updates where source rows may change (e.g., customer profiles) |
| Blue-Green Swap | Loads all rows into a staging table (_staging suffix), then performs an atomic table rename to swap staging into production. The production table is untouched until the swap succeeds. If loading fails, the production table is never affected. | Zero-downtime full refresh for production-critical tables. Recommended for high-availability deployments. |
Blue-Green Swap is the safest strategy for production data. The atomic rename means readers see either the old table or the new table — never a partially loaded state.
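Conceptually the swap is a pair of renames inside one transaction. A sketch of the idea in SQL, not the platform's exact statements:

BEGIN;
ALTER TABLE "ds_customers" RENAME TO "ds_customers_old";
ALTER TABLE "ds_customers_staging" RENAME TO "ds_customers";
COMMIT;
DROP TABLE "ds_customers_old";

Because both renames commit together, a reader always resolves "ds_customers" to either the complete old table or the complete new one.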

Row Validation

Every pipeline run validates incoming rows before writing to the target table. Validation catches type mismatches, null violations, and length overflows before they hit the database.
| Mode | Behavior |
| --- | --- |
| Strict | Fail the entire pipeline on the first validation error. No rows are written. Use when data quality is critical. |
| Skip | Log the error and skip the invalid row. Valid rows are still written. Use when partial loads are acceptable. |
| Coerce | Attempt to fix the value (e.g., cast "123" to integer, trim overlength strings). If coercion fails, the row is skipped. |
Validation checks include:
  • Type checking — Does the value match the target column’s data type?
  • Null validation — Is a NOT NULL column receiving a null value?
  • Length limits — Does a VARCHAR(n) value exceed its maximum length?
Validation errors are surfaced in the UI with row number, column name, and error description, so you can trace exactly which source rows failed and why.
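A hypothetical shape for one such error record. The row/column/description fields are as described above, but these JSON key names are illustrative, not the platform's actual payload:

{ "row": 1042, "column": "credit_score", "error": "value \"n/a\" cannot be cast to integer" }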

Progress Tracking

Pipeline executions display real-time progress in the UI:
  • Progress bar with percentage complete based on rows processed vs. estimated total
  • Row counters showing loaded, failed, and skipped counts updated in real time
  • Validation error accordion that expands to show individual row-level errors with field and reason
  • Run history table showing each execution’s loading strategy, duration, row counts, and error summary

Run History

Each pipeline tracks its execution history with status (success, failed, partial, running), timestamps, and error details. The pipeline list view shows the last run status and timing at a glance.

Supported File Formats

When reading from file-based sources (local_fs, s3, gcs, azure_blob, sftp, http_pull — all share the same format-parser layer via the common object-store abstraction shipped in Phase 6.2):
| Format | Extension | Notes |
| --- | --- | --- |
| CSV | .csv | RFC-4180 quoting/escaping via papaparse; header row → object keys; default delimiter , |
| JSON | .json | Array → one row per element; bare object → single row |
| JSON Lines | .jsonl | One JSON object per line; parse error reports line number |
| Parquet | .parquet | hyparquet reader (pure JS); up to ~50 MB per file |
| Avro | .avro | avsc reader; schema-embedded container files only |
| ORC | .orc | Self-host via the @kaireonai/orc-native add-on (not yet published). Workaround: convert ORC → Parquet upstream. |
| TSV | .tsv | papaparse with delimiter: "\t"; same quoting/escaping as CSV |
| XML | .xml | fast-xml-parser; configurable recordPath (default: every direct child of root); attribute keys prefixed with @_ |
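For example, a source node reading XML with an explicit record path might look like this. A sketch: recordPath comes from the table above, but its dot-path syntax and placement inside the node config are assumptions:

{ "nodeType": "source", "config": { "fileFormat": "xml", "recordPath": "customers.customer" } }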

Field Reference

Connector Fields

| Field | Type | Description |
| --- | --- | --- |
| id | string | Auto-generated unique identifier |
| name | string | Display name for the connector |
| type | enum | One of 85 connector types |
| category | enum | object_storage, streaming, warehouse, database, crm, cdp, messaging, api |
| authMethod | enum | iam_role, access_key, service_account_json, connection_string, oauth2, api_key, username_password, ssh_key, token, none |
| authConfig | object | Encrypted credentials (never returned in API responses) |
| config | object | Connection-specific settings (bucket, host, topic, etc.) |
| status | enum | connected, disconnected, error, testing |
| lastTestedAt | datetime | Timestamp of most recent connection test |
| lastError | string | Error message from last failed test |

Schema Fields

| Field | Type | Description |
| --- | --- | --- |
| id | string | Auto-generated unique identifier |
| name | string | Machine-safe name (lowercase, underscored) used for table naming |
| displayName | string | Human-readable name shown in the UI |
| description | string | Optional description |
| entityType | enum | customer, account, transaction, product, event, interaction, segment, campaign, subscription, custom |
| tableName | string | Auto-generated PostgreSQL table name (prefixed with ds_) |
| fields | array | Ordered list of field definitions (name, dataType, constraints) |
| status | enum | active, draft, archived |

Pipeline Fields

| Field | Type | Description |
| --- | --- | --- |
| id | string | Auto-generated unique identifier |
| name | string | Display name for the pipeline |
| description | string | Optional description |
| connectorId | string | Reference to the source connector |
| schemaId | string | Reference to the target schema |
| nodes | array | Flow editor nodes with type, label, config, and position |
| edges | array | Connections between nodes (sourceNodeId, targetNodeId) |
| schedule | string | Cron expression for scheduled execution (null for manual) |
| executionConfig | object | Batch size, parallelism, partitioning, error handling |
| status | enum | draft, active, paused, archived |
| lastRunAt | datetime | Timestamp of most recent execution |
| lastRunStatus | enum | success, failed, partial, running |

Worked Example

This example walks through creating a customer data pipeline end-to-end: define a schema, connect to S3, build a pipeline with transforms, and execute.

Step 1: Create a Customer Schema

curl -X POST /api/v1/schemas \
  -H "Content-Type: application/json" \
  -d '{
    "name": "customers",
    "displayName": "Customer Profiles",
    "description": "Core customer entity for decisioning",
    "entityType": "customer",
    "fields": [
      { "name": "name", "dataType": "varchar", "length": 100, "isNullable": false },
      { "name": "email", "dataType": "varchar", "length": 255, "isNullable": false, "isUnique": true },
      { "name": "credit_score", "dataType": "integer", "isNullable": true },
      { "name": "balance", "dataType": "numeric", "precision": 12, "scale": 2, "isNullable": true }
    ]
  }'
This creates a PostgreSQL table named ds_customers with the following DDL:
CREATE TABLE IF NOT EXISTS "ds_customers" (
  "id" BIGSERIAL PRIMARY KEY,
  "created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  "updated_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  "name" VARCHAR(100) NOT NULL,
  "email" VARCHAR(255) NOT NULL UNIQUE,
  "credit_score" INTEGER,
  "balance" NUMERIC(12,2)
);

Step 2: Create an S3 Connector

curl -X POST /api/v1/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Customer Data Lake",
    "type": "aws_s3",
    "config": {
      "bucket": "my-data-lake",
      "prefix": "customers/daily/",
      "region": "us-east-1"
    },
    "authMethod": "iam_role",
    "authConfig": {
      "roleArn": "arn:aws:iam::123456789012:role/KaireonDataAccess"
    }
  }'

Step 3: Build a Pipeline with Transforms

Create a pipeline that reads customer CSVs from S3, filters out zero-balance records, renames a field, and masks PII before loading into the schema:
curl -X POST /api/v1/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Customer Daily Load",
    "description": "Ingest customer data with PII masking",
    "connectorId": "<connector-id>",
    "schemaId": "<schema-id>",
    "executionConfig": {
      "mode": "batch",
      "batchSize": 5000,
      "parallelism": 4,
      "errorHandling": "dlq"
    },
    "nodes": [
      {
        "id": "source-1",
        "nodeType": "source",
        "label": "S3 CSV Source",
        "config": { "fileFormat": "csv", "hasHeader": true },
        "positionX": 0, "positionY": 100
      },
      {
        "id": "filter-1",
        "nodeType": "filter_rows",
        "label": "Filter Active Customers",
        "config": { "field": "balance", "operator": ">", "value": "0" },
        "positionX": 300, "positionY": 100
      },
      {
        "id": "rename-1",
        "nodeType": "rename_field",
        "label": "Standardize Email Field",
        "config": { "sourceField": "email_address", "targetField": "email" },
        "positionX": 600, "positionY": 100
      },
      {
        "id": "mask-1",
        "nodeType": "mask_pii",
        "label": "Mask SSN",
        "config": { "field": "ssn", "maskType": "ssn" },
        "positionX": 900, "positionY": 100
      },
      {
        "id": "target-1",
        "nodeType": "target",
        "label": "Customer Schema",
        "config": {},
        "positionX": 1200, "positionY": 100
      }
    ],
    "edges": [
      { "source": "source-1", "target": "filter-1" },
      { "source": "filter-1", "target": "rename-1" },
      { "source": "rename-1", "target": "mask-1" },
      { "source": "mask-1", "target": "target-1" }
    ]
  }'
This pipeline:
  1. Reads CSV files from the customers/daily/ prefix in S3
  2. Filters out rows where balance <= 0
  3. Renames email_address to email to match the schema field name
  4. Masks the ssn field to ***-**-1234 format
  5. Loads the transformed data into the ds_customers table

API Quick Reference

Connectors

| Operation | Method | Endpoint |
| --- | --- | --- |
| List connectors | GET | /api/v1/connectors |
| Create connector | POST | /api/v1/connectors |
| Update connector | PUT | /api/v1/connectors |
| Delete connector | DELETE | /api/v1/connectors?id={id} |

Schemas

| Operation | Method | Endpoint |
| --- | --- | --- |
| List schemas | GET | /api/v1/schemas |
| Create schema (also creates the backing table) | POST | /api/v1/schemas |
| Delete schema (also drops the backing table) | DELETE | /api/v1/schemas?id={id} |

Pipelines

| Operation | Method | Endpoint |
| --- | --- | --- |
| List pipelines | GET | /api/v1/pipelines |
| Create pipeline | POST | /api/v1/pipelines |
| Update pipeline | PUT | /api/v1/pipelines |
| Delete pipeline | DELETE | /api/v1/pipelines?id={id} |
All endpoints require authentication and tenant context. Responses use cursor-based pagination with limit and cursor parameters.
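Paging through a list with those parameters (a sketch; the token and cursor values are placeholders, and the name of the response field that carries the next cursor is not specified here):

curl "/api/v1/connectors?limit=50" -H "Authorization: Bearer <token>"
curl "/api/v1/connectors?limit=50&cursor=<next-cursor>" -H "Authorization: Bearer <token>"

The second request passes the cursor returned by the first response to fetch the next page.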
For complete API request/response schemas, see the Connectors API, Schemas API, and Pipelines API.

Decision Flows

Use Enrich stages to load schema data at decision time for real-time personalization.

Computed Values

Write formulas that reference customer.* fields from your schema tables.

Core Concepts

Understand how data connects to decisioning and delivery across the platform.