Connector status snapshot. 85 connector types are registered in the UI today. The connectors documented on this page are production-ready except for amazon_kinesis and braze, which ship as coming-soon: they expose forms, but pipeline runs that use them no-op in the executor (logged, zero rows). The remaining 26 coming-soon types are the W16 expansion entries documented on Connectors Expanded. Coming-soon connectors are visibly badged and disabled in the connector picker.
Overview
The Data module is the foundation of KaireonAI. Everything the platform decides on — offers, scores, journeys — runs on clean, structured data flowing in from your systems. There is one workflow you’ll repeat for every entity you bring in:
| Step | What you do | Page |
|---|---|---|
| 1. Connect | Create a Connector with credentials + connection details (one connector serves many pipelines). | Connector Reference |
| 2. Define | Create a Schema. The platform auto-creates the matching ds_* PostgreSQL table; adding fields runs ALTER TABLE. Pick a primary key (single or composite). | Data Model |
| 3. Build & Run | Create a Pipeline pointing at the Connector + Schema. The visual editor’s Add-Node toolbar (transform / validate / enrich / branch / archive) edits an in-memory draft; Save Changes versions it; Publish flips the live version the scheduler runs. | Flow Getting Started → Flow Editor UI → Transforms |
| 4. Inspect | Inspect rows in Customer Viewer, traced events in Interaction History, and per-run metrics in Pipeline Runs. | Flow Lineage |
Connectors
Connectors define how KaireonAI reaches your external data. 85 connector types are registered across 8 categories. Each connector has its own dynamic configuration form with typed fields, multiple authentication methods, and a Test Connection button to verify connectivity before saving.
Object Storage
| Connector | Status | Auth Methods | Key Config Fields |
|---|---|---|---|
| Amazon S3 | Ready (runtime ingestion + Test Connection) | IAM Role, Access Key | Bucket, path prefix, AWS region (30 regions), custom endpoint for S3-compatible storage |
| Google Cloud Storage | Ready (runtime ingestion + Test Connection) | Service Account JSON, Access Token, Application Default Credentials | Bucket, path prefix, GCP project ID |
| Azure Blob Storage | Ready (runtime ingestion + Test Connection) | Connection String, Account Key | Storage account, container name, blob prefix |
| SFTP | Ready (runtime ingestion + Test Connection) | Username/Password, SSH Key | Host, port, remote path |
| HTTP Pull | Ready (runtime ingestion via REST API connector) | API Key, Bearer Token | Source URL in node.config.path; SSRF-guarded by validateAndResolve |
| Local Filesystem | Ready (runtime ingestion) | None | Mount path (host-relative) |
All six object storage connectors (s3, gcs, azure_blob, sftp, http_pull, and local_fs) share a common object-store abstraction shipped in Phase 6.2, so a pipeline written against one backend behaves the same way against any of the others. Object storage connectors support file format selection: CSV, JSON, JSON Lines, Parquet, Avro, ORC, TSV, and XML — read end-to-end through the same format-parser layer regardless of source backend.
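For reference, a minimal sketch of registering an S3 connector through the API is shown below. The endpoint and top-level fields follow the Connector Fields reference later on this page; the host, the config keys (bucket, prefix, region), and the authConfig keys are illustrative assumptions rather than the authoritative request schema.

```typescript
// Minimal sketch: registering an S3 connector via POST /api/v1/connectors.
// Host, config key names, and authConfig key names are illustrative.
async function createS3Connector(): Promise<void> {
  const res = await fetch("https://your-kaireonai-host/api/v1/connectors", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      name: "prod-customer-bucket",
      type: "s3",                 // one of the 85 registered connector types
      category: "object_storage",
      authMethod: "access_key",   // or "iam_role"
      authConfig: { accessKeyId: "AKIA...", secretAccessKey: "..." }, // encrypted at rest, never echoed back
      config: { bucket: "acme-customer-data", prefix: "customers/daily/", region: "eu-west-1" },
    }),
  });
  console.log(res.status, await res.json());
}

void createS3Connector();
```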
Streaming
| Connector | Status | Auth Methods | Key Config Fields |
|---|---|---|---|
| Apache Kafka | Ready (batch polling) | Username/Password (SASL), API Key | Bootstrap servers, topic, consumer group, security protocol, schema registry URL, maxMessages (default 1000), wait timeout (default 15s) |
| Confluent Cloud | Ready (batch polling) | API Key | Bootstrap server, topic, schema registry URL + credentials |
| Amazon Kinesis | Coming soon (connection test works) | IAM Role, Access Key | Stream name, AWS region, start position (LATEST or TRIM_HORIZON) |
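The table above lists the Kafka connector's key settings; a hedged sketch of that configuration as a typed object follows. Only maxMessages and the stated defaults come from the table; the other key names are illustrative.

```typescript
// Sketch: Apache Kafka connector settings for batch polling.
// maxMessages and the defaults follow the table above; other key names are illustrative.
interface KafkaConnectorConfig {
  bootstrapServers: string;
  topic: string;
  consumerGroup: string;
  securityProtocol: "SASL_SSL" | "SASL_PLAINTEXT" | "PLAINTEXT";
  schemaRegistryUrl?: string;
  maxMessages: number;    // records pulled per run (default 1000)
  waitTimeoutSec: number; // how long a poll waits for messages (default 15)
}

const kafkaConfig: KafkaConnectorConfig = {
  bootstrapServers: "broker-1.internal:9092,broker-2.internal:9092",
  topic: "customer-events",
  consumerGroup: "kaireonai-ingest",
  securityProtocol: "SASL_SSL",
  maxMessages: 1000,
  waitTimeoutSec: 15,
};
```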
Data Warehouses
| Connector | Status | Auth Methods | Key Config Fields |
|---|---|---|---|
| Snowflake | Ready | Username/Password, OAuth/Key-Pair Token | Account identifier, warehouse, database, schema, role, Source Table, Row Limit |
| Google BigQuery | Ready | Service Account JSON | GCP project ID, dataset, location, Source Table, Row Limit |
| Amazon Redshift | Ready | Username/Password, IAM Role | Cluster endpoint, port, database, schema |
| Databricks | Ready | Token, OAuth2 | Workspace URL, HTTP path, Unity Catalog, schema |
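The note below spells out the sourceTable and rowLimit behavior shared by Snowflake and BigQuery; as a quick reference, a sketch of those settings on a BigQuery connector is shown here. sourceTable and rowLimit are the documented keys; the connection fields around them are illustrative.

```typescript
// Sketch: table-read settings on a warehouse connector (BigQuery shown).
// sourceTable and rowLimit are documented; projectId and dataset are illustrative.
const bigQueryConfig = {
  projectId: "acme-analytics",
  dataset: "crm",
  sourceTable: "customers", // required: the table the executor reads from
  rowLimit: 0,              // 0 removes the 100,000-row default cap (see note below)
};
```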
Snowflake and BigQuery row limits. Both connectors require a sourceTable and accept an optional rowLimit (default 100,000 rows — demo-safe). Set rowLimit to 0 to remove the cap entirely; only do this once you have sized the target database and pipeline run budget for a full-table read. The executor reads from the configured source table and caps the row count at rowLimit (or returns every row when rowLimit is 0).
Databases
| Connector | Auth Methods | Key Config Fields |
|---|---|---|
| PostgreSQL | Username/Password, Connection String | Host, port, database, schema, SSL mode (prefer/require/verify-ca/verify-full/disable) |
| MySQL | Username/Password, Connection String | Host, port, database, SSL mode |
| MongoDB | Connection String, Username/Password | Database, default collection (Atlas or self-hosted) |
CRM
| Connector | Auth Methods | Key Config Fields |
|---|---|---|
| Salesforce | OAuth2, Username/Password + Security Token | Instance URL, API version, objects to sync (Contact, Account, Opportunity, etc.) |
| HubSpot | Private App Token, OAuth2 | Portal ID, objects to sync (contacts, companies, deals) |
Customer Data Platforms
| Connector | Status | Auth Methods | Key Config Fields |
|---|---|---|---|
| Segment | Ready | API Key | Workspace slug, source ID |
Messaging
| Connector | Status | Auth Methods | Key Config Fields |
|---|---|---|---|
| Braze | Coming soon | API Key | REST API endpoint (US-01, US-02, US-03, EU-01) |
APIs and Direct Upload
| Connector | Status | Auth Methods | Key Config Fields |
|---|---|---|---|
| REST API | Ready | API Key, OAuth2, Token, Username/Password | Base URL, HTTP method (GET/POST), pagination type (none/offset/cursor/link), custom headers |
| Webhook | Ready | API Key, Bearer Token | Webhook path, max batch size |
| CSV File Upload | Ready | None | Delimiter (comma/tab/semicolon), header row detection |
| Shopify | Ready | Admin API Access Token | Shop domain |
| Stripe | Ready | Secret Key (sk_live_ or sk_test_) | No additional config required |
| Mailchimp | Ready | API Key | Server prefix, list/audience ID |
Coming-soon connectors (amazon_kinesis, braze) are visibly badged and disabled in the connector picker. You can still view the form definitions in the registry, but creating a pipeline against them will no-op at run time until ingestion is implemented. Connection Test works today for Amazon Kinesis.
Security
Connector credentials are encrypted at rest using the platform encryption layer. The authConfig field is never returned in API responses — the GET endpoint explicitly excludes it from the select clause. Only the connection metadata (name, type, status, last tested timestamp) is exposed.
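As an illustration of what the API does return, here is a hedged sketch of a connector entry as a GET response might expose it. Field names follow the Connector Fields reference below; the values and id format are made up, and authConfig is deliberately absent.

```typescript
// Sketch: a connector as returned by GET /api/v1/connectors.
// Values are illustrative; authConfig is never included in responses.
const connectorListEntry = {
  id: "con_abc123",           // illustrative id format
  name: "prod-customer-bucket",
  type: "s3",
  category: "object_storage",
  status: "connected",
  lastTestedAt: "2025-05-01T09:30:00Z",
  lastError: null,
  // authConfig: excluded from the select clause, so it never appears here
};
```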
Schemas
Schemas define your entity structures. Unlike metadata-only schema systems, KaireonAI schemas are backed by real PostgreSQL tables. Creating a schema executes a create table statement. Adding a field runs alter table add column. Deleting a schema drops the table along with its dependent objects.
Entity Types
Each schema is assigned an entity type that describes what it models:
| Entity Type | Typical Use |
|---|---|
| customer | Customer profiles, demographics, preferences |
| account | Account records, membership details |
| transaction | Purchase history, payment records |
| product | Product catalog, inventory |
| event | Behavioral events, clickstream, interactions |
| interaction | Customer-agent interactions, support tickets |
| segment | Pre-computed audience segments |
| campaign | Campaign metadata and performance |
| subscription | Subscription plans and status |
| custom | Any other entity type |
Field Types and PostgreSQL Mapping
Every field you define maps to a concrete PostgreSQL column type:
| Abstract Type | PostgreSQL Type | Notes |
|---|---|---|
| varchar | varchar (length-bounded) | Default length 255 if not specified |
| text | text | Unlimited length strings |
| integer | integer | 32-bit signed integer |
| bigint | bigint | 64-bit signed integer |
| smallint | smallint | 16-bit signed integer |
| numeric / decimal | numeric (precision, scale) | Arbitrary precision; specify precision and scale |
| float / real | real | 32-bit floating point |
| double | double precision | 64-bit floating point |
| boolean | boolean | true/false |
| date | date | Calendar date |
| timestamp | timestamp | Date and time without timezone |
| timestamptz | timestamptz | Date and time with timezone |
| json | jsonb | Binary JSON for nested/dynamic data |
| uuid | uuid | Universally unique identifier |
DDL Behavior
When you create a schema through the API or UI, the following happens:
- A metadata record is created in the platform’s data-schema registry with field definitions.
- A safe create table if not exists statement is executed against PostgreSQL with an auto-generated table name prefixed with ds_ (e.g., schema “customers” becomes table ds_customers).
- Every table automatically gets created_at (timestamptz) and updated_at (timestamptz) columns.
- Primary key handling: when no field is marked isPrimaryKey, an auto-generated id BIGSERIAL PRIMARY KEY column is added. When any field is marked isPrimaryKey: true (e.g. via the schema-create form’s “Custom primary key column” pane, or by passing isPrimaryKey: true in the API fields[] payload), the auto-id column is skipped and your column becomes the table’s PK.
- Your defined fields are added as additional columns with their mapped PostgreSQL types, nullability, uniqueness constraints, and default values.
Field Constraints
Each field supports the following constraints:
| Constraint | DDL Effect |
|---|---|
| isPrimaryKey | Primary-key constraint on the column |
| isUnique | Uniqueness constraint (skipped if the column is already the primary key) |
| isNullable: false | Not-null constraint |
| defaultValue | Default-value clause (validated against safe literal patterns) |
Default values are validated against safe literal patterns: simple literals (e.g. 'text', 123, null, true, false), the current-timestamp function, and current_timestamp are permitted.
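To make the mapping concrete, here is a hedged sketch of field definitions that exercise each constraint, with the DDL clause each one is documented to produce. The constraint keys (isPrimaryKey, isUnique, isNullable, defaultValue) are the documented ones; anything else about the field-definition shape is an assumption.

```typescript
// Sketch: one field per constraint, with the resulting DDL clause as a comment.
const fields = [
  { name: "account_id", dataType: "varchar", isPrimaryKey: true },   // PRIMARY KEY (auto id BIGSERIAL is skipped)
  { name: "iban", dataType: "varchar", isUnique: true },             // UNIQUE
  { name: "opened_at", dataType: "timestamptz", isNullable: false }, // NOT NULL
  { name: "status", dataType: "varchar", defaultValue: "'active'" }, // DEFAULT 'active' (safe literal)
  { name: "balance", dataType: "numeric", defaultValue: 0 },         // DEFAULT 0
];
```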
CSV Column Inference
When uploading CSV files, the platform can automatically infer column types from sample data:
| Detected Pattern | Inferred Type |
|---|---|
| UUID format | uuid |
| true/false/yes/no/0/1 | boolean |
| Integer values | integer (or bigint if > 2,147,483,647) |
| Decimal values | numeric with auto-sized precision and scale |
| ISO date strings | date or timestamptz (if time component present) |
| JSON objects/arrays | json |
| Short strings (≤255 chars) | varchar with auto-sized length |
| Long strings | text |
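A few sample values and the column type the rules above would assign to them (values illustrative):

```typescript
// Sketch: sample CSV value -> inferred column type, per the table above.
const inferenceExamples: Record<string, string> = {
  "550e8400-e29b-41d4-a716-446655440000": "uuid",
  "yes": "boolean",
  "42": "integer",
  "9876543210": "bigint",                 // exceeds 2,147,483,647
  "19.99": "numeric",
  "2025-05-01": "date",
  "2025-05-01T09:30:00Z": "timestamptz",  // time component present
  '{"tier":"gold"}': "json",
  "short note": "varchar",
};
```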
Schema Types & Relationships
Schemas are classified into three types that determine how they participate in decisioning:
| Schema Type | Purpose | Key Behavior |
|---|---|---|
| Customer | Core customer profile data (demographics, preferences, scores) | Primary entity for enrichment lookups. One row per customer. |
| Collection | Multi-row data linked to a customer (accounts, transactions, products held) | Linked to a Customer schema with an explicit join mapping. Multiple rows per customer. |
| Proposition | Offer/product catalog data used for matching and scoring | Referenced during inventory and scoring stages. |
Join mapping: a Collection schema is linked to its Customer schema with an explicit join key mapping (e.g., collection.customer_id = customer.customer_id). This relationship enables the Enrich node to join collection data at decision time.
Summary columns: Collection schemas support pre-materialized aggregation columns that roll up collection data to the customer level. Supported aggregation functions:
| Function | Description | Example |
|---|---|---|
| count | Number of rows | Total accounts |
| sum | Sum of a numeric column | Total balance across accounts |
| max | Maximum value | Highest credit limit |
| min | Minimum value | Oldest account open date |
| avg | Average value | Average transaction amount |
| bool_or | True if any row is true | Has any premium account |
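As a sketch, summary columns for an accounts collection rolled up to the customer level might be declared like this. The { sourceField, function, outputField } shape mirrors the Summarize transform's aggregations config documented later on this page; treat the surrounding key names as illustrative.

```typescript
// Sketch: summary columns on an accounts Collection schema.
const accountSummaryColumns = [
  { function: "count",   sourceField: "account_id",   outputField: "total_accounts" },
  { function: "sum",     sourceField: "balance",      outputField: "total_balance" },
  { function: "max",     sourceField: "credit_limit", outputField: "highest_credit_limit" },
  { function: "bool_or", sourceField: "is_premium",   outputField: "has_premium_account" },
];
```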
Schema References
Schemas are referenced throughout the platform:
- Enrichment stages in Decision Flows load customer data from schema tables at decision time
- Computed values reference schema data via the customer.* namespace in formulas
- Pipelines use schemas as target destinations for ETL workflows
- Segments define customer cohorts using schema field conditions with SQL-like filters
Pipelines
Pipelines are visual ETL workflows built with a drag-and-drop flow editor powered by React Flow. Each pipeline connects a source connector to a target schema through a chain of transform nodes.
Pipeline Structure
A pipeline consists of:
- Connector — The source data connection (any of the 85 connector types)
- Schema — The target destination table
- Nodes — Visual nodes in the flow editor (source, transform, target) with position and config
- Edges — Connections between nodes defining data flow direction
Transform Types
KaireonAI provides 15 built-in transform types:
Rename Field
Rename columns to standardize naming conventions across data sources. Configure the source field name and the desired target field name.
Config: sourceField, targetField
Cast Type
Convert a field to a different data type. Supported target types: string, integer, bigint, float, numeric, boolean, date, timestamp, json, uuid.
Config: field, targetType
Expression
Compute a new field using SQL-like expressions. Includes a function picker with 50+ built-in functions across 5 categories:
| Category | Functions |
|---|---|
| String | upper, lower, trim, substring, replace, concat, split_part, left, right, lpad, length, regexp_replace |
| Numeric | abs, round, ceil, floor, mod, greatest, least, coalesce, nullif |
| Date/Time | date_trunc, extract, age, to_char, to_date, to_timestamp, date + interval, date - interval, now, current_date |
| Type Cast | Cast expressions to integer, bigint, numeric, text, boolean, date, timestamp, or uuid |
| Conditional | case when, coalesce, nullif, if null |
Config: outputField, expression
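Two hedged examples of Expression node configs using functions from the picker above; outputField and expression are the documented keys, while the expression text itself is illustrative SQL-like syntax.

```typescript
// Sketch: Expression node configs (SQL-like expression text is illustrative).
const fullNameUpper = {
  outputField: "full_name_upper",
  expression: "upper(concat(trim(first_name), ' ', trim(last_name)))",
};

const balanceTier = {
  outputField: "balance_tier",
  expression: "case when balance > 10000 then 'high' else 'standard' end",
};
```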
Filter Rows
Keep only rows matching specified conditions. Supports a visual condition builder with operators:
| Operator | Applies To |
|---|---|
| =, != | string, integer, numeric, boolean |
| >, <, >=, <= | integer, numeric, date, timestamp |
| pattern match (like), starts with, ends with | string |
| in, not in | string, integer |
| is null, is not null | all types |
| between | integer, numeric, date, timestamp |
Config: field, operator, value
Drop Field
Remove unwanted columns from the data flow. Select one or more fields to exclude from downstream processing.
Config: fields (array)
Add Field
Add a new column with a name, data type, and default value or computed expression.
Config: fieldName, fieldType, defaultValue
Map Values
Replace field values using a JSON lookup table. Useful for code-to-label mapping.
Example: {"M": "Male", "F": "Female", "O": "Other"}
Config: field, mappings (JSON object), defaultValue (for unmatched values)
Split Field
Split a single field into multiple output fields by a separator character. For example, split a full name into first and last name fields.
Config: sourceField, separator, outputFields (array)
Merge Fields
Concatenate multiple columns into a single field with an optional separator string.
Config: sourceFields (array), separator, outputField
Deduplicate
Remove duplicate rows based on one or more key columns. Keeps the first occurrence when duplicates are found.
Config: keyFields (array)
Aggregate
Group by one or more columns and apply aggregate functions: sum, count, avg, min, max.
Config: groupByFields (array), aggregations (array of {field, function, alias})
Lookup / Join
LEFT JOIN with another schema table to enrich data. Specify the lookup schema, join key, and which fields to pull from the lookup table.
Config: lookupSchema, joinField, lookupField, selectFields (array)
Hash
Apply cryptographic hashing to field values. Supports SHA-256 and MD5 algorithms. Used for anonymization or generating deduplication keys.
Config: field, algorithm (sha256 or md5), outputField
Mask PII
Detect and mask personally identifiable information. Supports partial masking patterns:
- SSN: ***-**-1234
- Email: j***@example.com
- Phone: ***-***-5678
- Credit card: ****-****-****-1234
Config: field, maskType, preserveLength
Summarize
Aggregates collection data into a customer-level summary table. Runs at pipeline time, not decision time, so it does not add latency to the Recommend API. Use this transform when a Collection schema has summary columns defined.
The Summarize node reads from the collection table, groups by the customer join key, applies the configured aggregation functions (COUNT, SUM, MAX, MIN, AVG, BOOL_OR), and writes the results to the summary table. The Enrich node can then read from the summary table at decision time.
Config: collectionSchemaId, customerSchemaId, joinKey, aggregations (array of { sourceField, function, outputField })
Keyboard Shortcuts
| Shortcut | Action |
|---|---|
| Ctrl/Cmd + Z | Undo last action |
| Ctrl/Cmd + Shift + Z | Redo last undone action |
Execution Config
Pipelines support the following execution modes with configurable resource allocation:
| Mode | Status | Description | Use Case |
|---|---|---|---|
| Batch | Ready | Process all records in configurable batch sizes | Full loads, daily syncs |
| Micro-Batch | Ready | Small frequent batches scheduled via cron | Near real-time with controlled throughput |
| Streaming | Self-host | Continuous record-by-record processing via long-lived consumer | Real-time event streams — Kafka, Kinesis, Pulsar |
Streaming mode spawns a long-lived consumer per pipeline and is gated behind the FLOW_STREAMING_ENABLED=true environment variable on the worker service. The gate exists because a streaming consumer requires a persistent worker container (separate from the request-driven API) that is part of the self-hosted deployment topology, not the hosted playground. Selecting Streaming on the playground returns a clear error pointing to streaming-runtime. For near-real-time ingestion on the hosted playground, use Batch or Micro-Batch with a short cron cadence against a batch-polling connector (e.g., Kafka).
Configuration Options
| Setting | Default | Description |
|---|---|---|
| Batch Size | 10,000 | Records per batch (batch/micro-batch modes) |
| Parallelism | 1 | Concurrent workers (1 to 16) |
| Partitioning | None | Partition key for distributed processing |
| Error Handling | Fail | skip (continue on error), fail (stop pipeline), dlq (route to dead-letter queue) |
| Scheduling | Manual | Cron expression for automatic runs |
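A hedged sketch of an execution configuration using the options above. Allowed values follow the tables in this section; the exact key names inside executionConfig are illustrative.

```typescript
// Sketch: executionConfig plus the pipeline-level schedule (key names illustrative).
const executionConfig = {
  mode: "micro_batch",         // batch | micro_batch | streaming (self-host only)
  batchSize: 10_000,
  parallelism: 4,              // 1 to 16 concurrent workers
  partitioning: "customer_id", // optional partition key
  errorHandling: "dlq",        // skip | fail | dlq
};

const schedule = "*/15 * * * *"; // pipeline-level cron expression; null for manual runs
```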
Loading Strategies
When a pipeline writes to a target schema, you choose how incoming data merges with existing rows. The loading strategy is configured per pipeline and applies at execution time.
| Strategy | Behavior | Use Case |
|---|---|---|
| Append | Insert new rows. Existing rows are untouched. This is the default. | Incremental event streams, transaction logs |
| Truncate & Load | Truncate the target table, then insert all rows. Brief downtime while the table is empty. | Full refresh of small-to-medium lookup tables |
| Upsert | Insert with on-conflict-do-update using a configurable key column. Rows with matching keys are updated; new keys are inserted. | Incremental updates where source rows may change (e.g., customer profiles) |
| Blue-Green Swap | Loads all rows into a staging table (_staging suffix), then performs an atomic table rename to swap staging into production. The production table is untouched until the swap succeeds. If loading fails, the production table is never affected. | Zero-downtime full refresh for production-critical tables. Recommended for high-availability deployments. |
Blue-Green Swap is the safest strategy for production data. The atomic rename means readers see either the old table or the new table — never a partially loaded state.
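A sketch of how the strategy choice might be expressed on a pipeline. Only the strategy names come from the table above; the key names and enum spellings are illustrative.

```typescript
// Sketch: per-pipeline loading strategies (key names and enum spellings illustrative).
const appendLoad = { loadingStrategy: "append" };                             // default
const upsertLoad = { loadingStrategy: "upsert", conflictKey: "customer_id" }; // on-conflict-do-update key
const blueGreen  = { loadingStrategy: "blue_green_swap" };                    // staging table + atomic rename
```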
Row Validation
Every pipeline run validates incoming rows before writing to the target table. Validation catches type mismatches, null violations, and length overflows before they hit the database.
| Mode | Behavior |
|---|---|
| Strict | Fail the entire pipeline on the first validation error. No rows are written. Use when data quality is critical. |
| Skip | Log the error and skip the invalid row. Valid rows are still written. Use when partial loads are acceptable. |
| Coerce | Attempt to fix the value (e.g., cast "123" to integer, trim overlength strings). If coercion fails, the row is skipped. |
- Type checking — Does the value match the target column’s data type?
- Null validation — Is a NOT NULL column receiving a null value?
- Length limits — Does a VARCHAR(n) value exceed its maximum length?
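To illustrate the Coerce mode, here is a minimal sketch of the fix-or-skip behavior it describes. This is illustrative logic, not the platform's implementation.

```typescript
// Sketch: coerce a value to the target type, or signal that the row should be skipped.
function coerceInteger(value: string): number | null {
  const n = Number(value.trim());
  return Number.isInteger(n) ? n : null; // null -> coercion failed, row is skipped
}

function coerceVarchar(value: string, maxLength: number): string {
  return value.length > maxLength ? value.slice(0, maxLength) : value; // trim overlength strings
}

console.log(coerceInteger("123"));   // 123
console.log(coerceInteger("12.5"));  // null (row skipped)
console.log(coerceVarchar("a".repeat(300), 255).length); // 255
```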
Progress Tracking
Pipeline executions display real-time progress in the UI:
- Progress bar with percentage complete based on rows processed vs. estimated total
- Row counters showing loaded, failed, and skipped counts updated in real time
- Validation error accordion that expands to show individual row-level errors with field and reason
- Run history table showing each execution’s loading strategy, duration, row counts, and error summary
Run History
Each pipeline tracks its execution history with status (success, failed, partial, running), timestamps, and error details. The pipeline list view shows the last run status and timing at a glance.
Supported File Formats
When reading from file-based sources (local_fs, s3, gcs, azure_blob, sftp, http_pull — all share the same format-parser layer via the common object-store abstraction shipped in Phase 6.2):
| Format | Extension | Notes |
|---|---|---|
| CSV | .csv | RFC-4180 quoting/escaping via papaparse; header row → object keys; default delimiter , |
| JSON | .json | Array → one row per element; bare object → single row |
| JSON Lines | .jsonl | One JSON object per line; parse error reports line number |
| Parquet | .parquet | hyparquet reader (pure JS); up to ~50 MB per file |
| Avro | .avro | avsc reader; schema-embedded container files only |
| ORC | .orc | Self-host via the @kaireonai/orc-native add-on (not yet published). Workaround: convert ORC → parquet upstream. |
| TSV | .tsv | papaparse with delimiter: "\t"; same quoting/escaping as CSV |
| XML | .xml | fast-xml-parser; configurable recordPath (default: every direct child of root); attribute keys prefixed with @_ |
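A hedged sketch of format-related settings on a file-based source node. The recordPath option and the @_ attribute prefix are documented for the XML parser; the wrapper key names are illustrative.

```typescript
// Sketch: file-format settings for a file-based source node (key names illustrative).
const csvSource = { format: "csv", delimiter: "," };   // RFC-4180 quoting via papaparse
const tsvSource = { format: "tsv", delimiter: "\t" };
const xmlSource = {
  format: "xml",
  recordPath: "customers.customer", // default: every direct child of the root element
};
```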
Field Reference
Connector Fields
| Field | Type | Description |
|---|---|---|
| id | string | Auto-generated unique identifier |
| name | string | Display name for the connector |
| type | enum | One of 85 connector types |
| category | enum | object_storage, streaming, warehouse, database, crm, cdp, messaging, api |
| authMethod | enum | iam_role, access_key, service_account_json, connection_string, oauth2, api_key, username_password, ssh_key, token, none |
| authConfig | object | Encrypted credentials (never returned in API responses) |
| config | object | Connection-specific settings (bucket, host, topic, etc.) |
| status | enum | connected, disconnected, error, testing |
| lastTestedAt | datetime | Timestamp of most recent connection test |
| lastError | string | Error message from last failed test |
Schema Fields
| Field | Type | Description |
|---|---|---|
| id | string | Auto-generated unique identifier |
| name | string | Machine-safe name (lowercase, underscored) used for table naming |
| displayName | string | Human-readable name shown in the UI |
| description | string | Optional description |
| entityType | enum | customer, account, transaction, product, event, interaction, segment, campaign, subscription, custom |
| tableName | string | Auto-generated PostgreSQL table name (prefixed with ds_) |
| fields | array | Ordered list of field definitions (name, dataType, constraints) |
| status | enum | active, draft, archived |
Pipeline Fields
| Field | Type | Description |
|---|---|---|
| id | string | Auto-generated unique identifier |
| name | string | Display name for the pipeline |
| description | string | Optional description |
| connectorId | string | Reference to the source connector |
| schemaId | string | Reference to the target schema |
| nodes | array | Flow editor nodes with type, label, config, and position |
| edges | array | Connections between nodes (sourceNodeId, targetNodeId) |
| schedule | string | Cron expression for scheduled execution (null for manual) |
| executionConfig | object | Batch size, parallelism, partitioning, error handling |
| status | enum | draft, active, paused, archived |
| lastRunAt | datetime | Timestamp of most recent execution |
| lastRunStatus | enum | success, failed, partial, running |
Worked Example
This example walks through creating a customer data pipeline end-to-end: define a schema, connect to S3, build a pipeline with transforms, and execute.
Step 1: Create a Customer Schema
Create a schema for customer profiles (entity type customer). The platform auto-creates the backing table ds_customers with the corresponding DDL.
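A minimal sketch of the create request and the DDL it would generate, assuming an illustrative field list that matches the fields used in Step 3 (email, balance, ssn):

```typescript
// Sketch: Step 1 schema body for POST /api/v1/schemas (field list illustrative).
const customersSchema = {
  name: "customers",
  displayName: "Customers",
  entityType: "customer",
  fields: [
    { name: "customer_id", dataType: "varchar", isPrimaryKey: true },
    { name: "email", dataType: "varchar", isNullable: false },
    { name: "balance", dataType: "numeric" },
    { name: "ssn", dataType: "varchar" },
  ],
};
// Approximate resulting DDL:
//   CREATE TABLE IF NOT EXISTS ds_customers (
//     customer_id varchar(255) PRIMARY KEY,  -- auto id BIGSERIAL skipped
//     email varchar(255) NOT NULL,
//     balance numeric,
//     ssn varchar(255),
//     created_at timestamptz,
//     updated_at timestamptz
//   );
```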
Step 2: Create an S3 Connector
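Register the S3 connector the pipeline will read from. A minimal sketch of the request body follows, with the same caveat as the Object Storage example earlier: the config and authConfig key names are illustrative.

```typescript
// Sketch: Step 2 connector body for POST /api/v1/connectors (key names illustrative).
const s3Connector = {
  name: "customer-csv-s3",
  type: "s3",
  category: "object_storage",
  authMethod: "iam_role",
  authConfig: { roleArn: "arn:aws:iam::123456789012:role/kaireonai-ingest" },
  config: { bucket: "acme-customer-data", prefix: "customers/daily/", region: "eu-west-1" },
};
// Use Test Connection in the UI to verify connectivity before saving.
```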
Step 3: Build a Pipeline with Transforms
Create a pipeline that reads customer CSVs from S3, filters out zero-balance records, renames a field, and masks PII before loading into the schema:
- Reads CSV files from the customers/daily/ prefix in S3
- Filters out rows where balance <= 0
- Renames email_address to email to match the schema field name
- Masks the ssn field to ***-**-1234 format
- Loads the transformed data into the ds_customers table
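A sketch of the corresponding pipeline body. The nodes/edges structure and the config field names follow the transform reference above; node ids, positions, and the transform-type identifiers are illustrative.

```typescript
// Sketch: Step 3 pipeline body for POST /api/v1/pipelines (ids, positions, and
// transform-type identifiers are illustrative).
const customerPipeline = {
  name: "daily-customer-load",
  connectorId: "<connector id from Step 2>",
  schemaId: "<schema id from Step 1>",
  nodes: [
    { id: "src", type: "source", label: "S3 CSVs", config: { path: "customers/daily/", format: "csv" }, position: { x: 0, y: 0 } },
    { id: "t1", type: "transform", label: "Filter zero balances", config: { transform: "filter_rows", field: "balance", operator: ">", value: 0 }, position: { x: 200, y: 0 } },
    { id: "t2", type: "transform", label: "Rename email", config: { transform: "rename_field", sourceField: "email_address", targetField: "email" }, position: { x: 400, y: 0 } },
    { id: "t3", type: "transform", label: "Mask SSN", config: { transform: "mask_pii", field: "ssn", maskType: "ssn", preserveLength: true }, position: { x: 600, y: 0 } },
    { id: "tgt", type: "target", label: "ds_customers", config: {}, position: { x: 800, y: 0 } },
  ],
  edges: [
    { sourceNodeId: "src", targetNodeId: "t1" },
    { sourceNodeId: "t1", targetNodeId: "t2" },
    { sourceNodeId: "t2", targetNodeId: "t3" },
    { sourceNodeId: "t3", targetNodeId: "tgt" },
  ],
};
```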
API Quick Reference
Connectors
| Operation | Method | Endpoint |
|---|---|---|
| List connectors | GET | /api/v1/connectors |
| Create connector | POST | /api/v1/connectors |
| Update connector | PUT | /api/v1/connectors |
| Delete connector | DELETE | /api/v1/connectors?id={id} |
Schemas
| Operation | Method | Endpoint |
|---|---|---|
| List schemas | GET | /api/v1/schemas |
| Create schema (also creates the backing table) | POST | /api/v1/schemas |
| Delete schema (also drops the backing table) | DELETE | /api/v1/schemas?id={id} |
Pipelines
| Operation | Method | Endpoint |
|---|---|---|
| List pipelines | GET | /api/v1/pipelines |
| Create pipeline | POST | /api/v1/pipelines |
| Update pipeline | PUT | /api/v1/pipelines |
| Delete pipeline | DELETE | /api/v1/pipelines?id={id} |
List endpoints support pagination via limit and cursor parameters.
For complete API request/response schemas, see the Connectors API, Schemas API, and Pipelines API.
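A minimal sketch of paging through connectors with limit and cursor. The response-envelope field names (items, nextCursor) are assumptions, not the documented schema.

```typescript
// Sketch: paging with limit/cursor (response envelope field names are illustrative).
async function listAllConnectors(baseUrl: string): Promise<unknown[]> {
  const all: unknown[] = [];
  let cursor: string | undefined;
  do {
    const url = new URL("/api/v1/connectors", baseUrl);
    url.searchParams.set("limit", "50");
    if (cursor) url.searchParams.set("cursor", cursor);
    const page = await (await fetch(url)).json();
    all.push(...page.items);  // illustrative envelope
    cursor = page.nextCursor; // illustrative envelope
  } while (cursor);
  return all;
}
```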
Related
Decision Flows
Use Enrich stages to load schema data at decision time for real-time personalization.
Computed Values
Write formulas that reference customer.* fields from your schema tables.
Core Concepts
Understand how data connects to decisioning and delivery across the platform.