
Overview

The Composable Pipeline is the next-generation execution model for Decision Flows. Instead of a fixed sequence of stages, you assemble a pipeline from typed node blocks arranged across three ordered phases. Each node performs one job — load inventory, filter candidates, score offers, rank results — and you choose which nodes to include and how to configure them. Existing v1 flows continue to work unchanged. When you save a flow with version: 2 in its config, KaireonAI switches to the composable pipeline engine automatically.

Three Phases

Every composable pipeline organizes its nodes into three phases. Nodes within a phase run in position order; phases always execute in sequence.
| Phase | Name | Purpose | Allowed Nodes |
| --- | --- | --- | --- |
| 1 | Narrow | Load candidates and reduce the set | inventory, filter, match_creatives, enrich, qualify, contact_policy, call_flow |
| 2 | Score & Rank | Score, rank, and group survivors | score, rank, group, call_flow |
| 3 | Output | Compute final values and format response | compute, set_properties, response |
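As a rough illustration of the ordering rule, the sketch below sorts nodes by phase and then by position within the phase. The `execution_order` helper is hypothetical and not part of KaireonAI; the node fields mirror the v2 config format.

```python
from typing import Any


def execution_order(nodes: list[dict[str, Any]]) -> list[str]:
    """Return node ids in the order the three-phase rule implies."""
    # Phases run in sequence; within a phase, nodes run in position order.
    ordered = sorted(nodes, key=lambda node: (node["phase"], node["position"]))
    return [node["id"] for node in ordered]


nodes = [
    {"id": "n4", "type": "response", "phase": 3, "position": 0},
    {"id": "n2", "type": "score", "phase": 2, "position": 0},
    {"id": "n1", "type": "inventory", "phase": 1, "position": 0},
    {"id": "n3", "type": "rank", "phase": 2, "position": 1},
]
print(execution_order(nodes))
```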

Node Types

Phase 1 — Narrow

Inventory (inventory)
Loads candidate offers from the database. Every pipeline must start with exactly one Inventory node.
Config: scope — "all" loads every active offer, or "category" restricts to a specific category.
Filter (filter)
Removes candidates that fail condition-based rules. Supports 13 operators across four namespaces.
Operators: eq, neq, gt, gte, lt, lte, in, not_in, contains, starts_with, regex, is_null, is_not_null
Namespaces: offer.*, customer.*, request.*, channel.*
Config:
  • conditions — Array of { field, operator, value } rules
  • combinator — "AND" (all must pass) or "OR" (any can pass)
Filter nodes are only allowed in Phase 1.
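To make the condition model concrete, here is a minimal sketch of how a filter config could be evaluated against one candidate. The operator semantics are assumptions (the operator names come from the list above, but their exact behavior is not documented here), and `resolve` and `passes` are hypothetical helpers, not KaireonAI functions.

```python
import re
from typing import Any, Callable

# Assumed semantics: only the 13 operator names are documented.
OPERATORS: dict[str, Callable[[Any, Any], bool]] = {
    "eq": lambda actual, expected: actual == expected,
    "neq": lambda actual, expected: actual != expected,
    "gt": lambda actual, expected: actual is not None and actual > expected,
    "gte": lambda actual, expected: actual is not None and actual >= expected,
    "lt": lambda actual, expected: actual is not None and actual < expected,
    "lte": lambda actual, expected: actual is not None and actual <= expected,
    "in": lambda actual, expected: actual in expected,
    "not_in": lambda actual, expected: actual not in expected,
    "contains": lambda actual, expected: actual is not None and expected in actual,
    "starts_with": lambda actual, expected: isinstance(actual, str)
    and actual.startswith(expected),
    "regex": lambda actual, expected: isinstance(actual, str)
    and re.search(expected, actual) is not None,
    "is_null": lambda actual, _expected: actual is None,
    "is_not_null": lambda actual, _expected: actual is not None,
}


def resolve(field: str, context: dict[str, dict[str, Any]]) -> Any:
    """Look up "offer.priority" as context["offer"]["priority"]."""
    namespace, _, key = field.partition(".")
    return context.get(namespace, {}).get(key)


def passes(config: dict[str, Any], context: dict[str, dict[str, Any]]) -> bool:
    """Apply every condition, then combine the outcomes with AND or OR."""
    outcomes = [
        OPERATORS[rule["operator"]](resolve(rule["field"], context), rule.get("value"))
        for rule in config["conditions"]
    ]
    return all(outcomes) if config["combinator"] == "AND" else any(outcomes)


config = {
    "conditions": [{"field": "offer.priority", "operator": "gte", "value": 30}],
    "combinator": "AND",
}
print(passes(config, {"offer": {"priority": 40}}))
```

A candidate whose `offer.priority` is missing resolves to `None` and fails the comparison operators in this sketch; the real engine may treat missing fields differently.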
Match Creatives (match_creatives)
Attaches eligible creative assets to each candidate offer based on channel targeting and audience rules.
Config:
  • requireCreative — Whether to remove candidates without a creative (default true)
  • placementMatchMode — "exact", "any", or "none" (default "any")
Enrich (enrich)
Loads customer data from schema tables. The loaded fields become available as customer.* variables in all downstream nodes.
Stub node (v2.0): The Enrich node currently logs a warning and does not load data from schema tables. Customer data (customer.* variables) will be empty. This will be fully implemented in a future release.
Config: sources — Array of enrichment sources, each with:
  • schemaId — Which schema table to query
  • lookupKey — The lookup field (default "customer_id")
  • fields — Which columns to load
  • prefix — Namespace prefix for loaded fields (default "customer")
  • cacheTtlSeconds — Redis cache duration (default 60)
  • optional — Whether to continue if the lookup fails (default true)
Qualify (qualify)
Applies qualification rules with AND/OR logic trees. Rules can be combined into nested groups for complex eligibility logic.
Partial implementation (v2.0): When using the AND/OR logic tree, individual rule evaluation currently hardcodes ruleType to "segment_match" with an empty config. Full rule loading from the database is not yet wired up.
Config:
  • mode — "all" (run every rule), "selected" (pick specific rules), or "none" (skip)
  • qualificationRuleIds — Array of rule IDs when mode is "selected"
  • logic — Optional AND/OR logic group for combining rule results
AND/OR Logic: Rules and groups can be nested recursively:
{
  "operator": "AND",
  "ruleIds": ["rule_age", "rule_region"],
  "groups": [
    {
      "operator": "OR",
      "ruleIds": ["rule_premium", "rule_loyalty"],
      "groups": []
    }
  ]
}
This means: customer must pass age AND region AND (premium OR loyalty).
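The nested structure lends itself to a short recursive evaluation. The sketch below assumes each rule has already been evaluated to a boolean; `evaluate_logic` and the `rule_results` mapping are hypothetical names used only for illustration.

```python
from typing import Any


def evaluate_logic(group: dict[str, Any], rule_results: dict[str, bool]) -> bool:
    """Combine rule outcomes and nested group outcomes with the group's operator."""
    outcomes = [rule_results[rule_id] for rule_id in group.get("ruleIds", [])]
    outcomes += [evaluate_logic(sub, rule_results) for sub in group.get("groups", [])]
    if group["operator"] == "AND":
        return all(outcomes)
    if group["operator"] == "OR":
        return any(outcomes)
    raise ValueError(f"unknown operator: {group['operator']}")


logic = {
    "operator": "AND",
    "ruleIds": ["rule_age", "rule_region"],
    "groups": [
        {"operator": "OR", "ruleIds": ["rule_premium", "rule_loyalty"], "groups": []}
    ],
}
results = {
    "rule_age": True,
    "rule_region": True,
    "rule_premium": False,
    "rule_loyalty": True,
}
print(evaluate_logic(logic, results))
```

Here the customer passes age and region and one of the two OR branches, so the tree evaluates to true. Flipping `rule_loyalty` to false makes the OR group fail and, with it, the whole tree.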
Contact Policy (contact_policy)
Enforces frequency and timing guardrails to prevent over-contacting customers.
Partial implementation (v2.0): The Contact Policy node currently passes empty rules and empty contact history summaries to the policy engine. It will filter correctly once rule and history loading is wired up in a future release.
Config:
  • mode — "all", "selected", or "none"
  • contactPolicyIds — Array of policy IDs when mode is "selected"
Call Flow (call_flow)
Delegates to another Decision Flow as a sub-pipeline. Useful for reusable filtering or qualification logic.
Stub node (v2.0): The Call Flow node currently logs a warning and skips sub-flow invocation. The target flow is not executed. This will be fully implemented in a future release.
Config:
  • flowId — The target Decision Flow to call
  • mergeMode — "append" (add sub-flow results to candidates) or "replace" (use sub-flow results only)
Call flow nodes are allowed in Phase 1 and Phase 2 only. Maximum nesting depth is 2 levels. Circular references are detected and rejected at save time.

Phase 2 — Score & Rank

Score (score)
Runs scoring models against each candidate. Every pipeline must have exactly one Score node.
Config:
  • method — "priority_weighted", "propensity", or "formula"
  • defaultModel — The model key to use when no override matches
  • overrides — Array of model overrides scoped to offer, category, or channel
  • overridePriority — Resolution order, e.g. ["offer", "category", "channel", "default"]
  • formula — Weight-based blending when method is "priority_weighted":
    {
      "propensityWeight": 0.4,
      "contextWeight": 0.3,
      "valueWeight": 0.2,
      "leverWeight": 0.1
    }
    
    Weights must sum to 1.0.
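The sum-to-1.0 rule is easiest to check with a small tolerance, because the example weights do not add up to exactly 1.0 in floating point. The sketch below also blends four component signals as a plain weighted sum; that blending rule, the `signals` input, and the `blend` helper are assumptions for illustration, since only the weight names and the sum constraint are documented.

```python
import math

COMPONENTS = ("propensity", "context", "value", "lever")


def blend(weights: dict[str, float], signals: dict[str, float]) -> float:
    """Validate the weights, then return an assumed weighted-sum score."""
    total = sum(weights[f"{name}Weight"] for name in COMPONENTS)
    # 0.4 + 0.3 + 0.2 + 0.1 is 0.9999999999999999 in binary floating point,
    # so compare with a tolerance instead of ==.
    if not math.isclose(total, 1.0, abs_tol=1e-9):
        raise ValueError(f"weights sum to {total}, expected 1.0")
    return sum(weights[f"{name}Weight"] * signals[name] for name in COMPONENTS)


weights = {
    "propensityWeight": 0.4,
    "contextWeight": 0.3,
    "valueWeight": 0.2,
    "leverWeight": 0.1,
}
# Hypothetical per-candidate component scores in the range 0 to 1.
signals = {"propensity": 0.5, "context": 1.0, "value": 0.0, "lever": 1.0}
print(round(blend(weights, signals), 4))
```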
Champion/Challenger experiments:
{
  "championChallenger": {
    "enabled": true,
    "champion": { "modelKey": "model_v2", "weight": 80 },
    "challengers": [
      { "modelKey": "model_v3", "weight": 20 }
    ]
  }
}
Traffic is split deterministically using a hash of the customer ID, so each customer always sees the same model variant.
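One common way to get this kind of sticky assignment is to hash the customer ID into a bucket and walk the cumulative weights. The sketch below uses SHA-256 as a stand-in; the hash function KaireonAI actually uses is not documented here, and `pick_variant` is a hypothetical helper.

```python
import hashlib


def pick_variant(customer_id: str, variants: list[tuple[str, int]]) -> str:
    """Map a customer to a model key; the same ID always lands in the same bucket."""
    total = sum(weight for _, weight in variants)
    if total <= 0:
        raise ValueError("variant weights must sum to a positive number")
    digest = hashlib.sha256(customer_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % total
    cumulative = 0
    for model_key, weight in variants:
        cumulative += weight
        if bucket < cumulative:
            return model_key
    raise AssertionError("unreachable: bucket is always below the total weight")


variants = [("model_v2", 80), ("model_v3", 20)]
first = pick_variant("cust_12345", variants)
second = pick_variant("cust_12345", variants)
print(first == second)
```

Because the bucket depends only on the ID and the weights, repeated requests for one customer are stable, while a large population splits roughly 80/20 across the two models.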
Rank (rank)
Produces the final ordered list using multi-objective arbitration. At most one Rank node is allowed per pipeline.
Config:
  • method — "topN", "diversity", "round_robin", or "explore_exploit"
  • maxCandidates — How many offers to return (1–50, default 5)
  • maxPerCategory — Optional cap on offers per category
  • maxPerChannel — Optional cap on offers per channel
  • explorationRate — Optional 0–1 value for explore/exploit strategies
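For intuition, the "topN" method with a per-category cap can be sketched as a greedy pass over candidates sorted by score. This is an assumed reading of maxCandidates and maxPerCategory rather than the engine's actual arbitration logic; `top_n` and the `category` field on each candidate are hypothetical.

```python
from collections import Counter
from typing import Any, Optional


def top_n(
    candidates: list[dict[str, Any]],
    max_candidates: int = 5,
    max_per_category: Optional[int] = None,
) -> list[dict[str, Any]]:
    """Pick the highest-scoring candidates, skipping categories that hit their cap."""
    if not 1 <= max_candidates <= 50:
        raise ValueError("maxCandidates must be between 1 and 50")
    picked: list[dict[str, Any]] = []
    per_category: Counter = Counter()
    for candidate in sorted(candidates, key=lambda c: c["score"], reverse=True):
        category = candidate["category"]
        if max_per_category is not None and per_category[category] >= max_per_category:
            continue  # category cap reached, try the next candidate
        picked.append(candidate)
        per_category[category] += 1
        if len(picked) == max_candidates:
            break
    return picked


candidates = [
    {"offerId": "a", "category": "cards", "score": 0.9},
    {"offerId": "b", "category": "cards", "score": 0.8},
    {"offerId": "c", "category": "cards", "score": 0.7},
    {"offerId": "d", "category": "loans", "score": 0.6},
]
print([c["offerId"] for c in top_n(candidates, max_candidates=3, max_per_category=2)])
```

With a cap of two per category, the third "cards" offer is skipped and the "loans" offer takes the last slot.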
Group (group)
Allocates ranked candidates across multiple named placements (e.g., hero banner, sidebar, email). Must appear after the Rank node.
Config:
  • placements — Array of { placementId, count } definitions
  • allocationStrategy — "priority_fill" assigns best-scoring offers to each placement in order, preventing duplicates across placements
When a Group node is present, the Recommend API response includes a placements object instead of a flat array:
{
  "placements": {
    "hero": [{ "offerId": "...", "score": 0.95 }],
    "sidebar": [{ "offerId": "...", "score": 0.82 }]
  }
}
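A minimal sketch of "priority_fill", assuming placements are filled in the order they appear in the config and that each offer is consumed once. The `priority_fill` function below is illustrative, not the engine's implementation, and the offer IDs are made up.

```python
from typing import Any


def priority_fill(
    ranked: list[dict[str, Any]], placements: list[dict[str, Any]]
) -> dict[str, list[dict[str, Any]]]:
    """Hand out ranked offers placement by placement, never reusing an offer."""
    remaining = list(ranked)  # expected to be in rank order already
    result: dict[str, list[dict[str, Any]]] = {}
    for placement in placements:
        count = placement["count"]
        result[placement["placementId"]] = remaining[:count]
        remaining = remaining[count:]
    return result


ranked = [
    {"offerId": "offer_a", "score": 0.95},
    {"offerId": "offer_b", "score": 0.82},
    {"offerId": "offer_c", "score": 0.70},
]
placements = [
    {"placementId": "hero", "count": 1},
    {"placementId": "sidebar", "count": 3},
]
print(priority_fill(ranked, placements))
```

When fewer offers remain than a placement requests, this sketch returns a shorter list for that placement instead of padding it.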

Phase 3 — Output

Compute (compute)
Evaluates formula-based computed fields for each candidate offer. At most one Compute node is allowed.
Config:
  • overrides — Flow-level formula overrides (array of { name, formula, outputType })
  • extras — Additional computed fields beyond what categories define (same shape)
See the Computed Values Guide and Formula Reference for formula syntax.
Set Properties (set_properties)
Attaches static or derived key-value pairs to each candidate before the response is assembled.
Config: properties — Array of { key, value } pairs. Each property can also include a formula field for dynamically computed values.
Response (response)
Formats the final output. Every pipeline must end with exactly one Response node.
Config:
  • includeDebugTrace — Whether to include debug trace data (default false)
  • responseFormat — "standard" (flat array) or "grouped" (placements object, requires Group node)

Stub Node Summary

The following nodes have limited functionality in the initial v2 release. They are structurally valid and can be included in pipeline configs, but their runtime behavior is incomplete:
| Node | Current Behavior | Impact |
| --- | --- | --- |
| enrich | Logs warning, does not query schema tables | customer.* variables are empty in downstream nodes |
| call_flow | Logs warning, skips sub-flow execution | Sub-flows are not invoked; candidates pass through unchanged |
| qualify (with logic tree) | Evaluates logic tree but hardcodes segment_match rule type | Rule evaluation may not match expected behavior for non-segment rules |
| contact_policy | Passes empty rules and empty history | No candidates are filtered by contact policies |
All other nodes (inventory, filter, match_creatives, score, rank, group, compute, set_properties, response) are fully functional.

Worked Example

This example walks through 8 offers being processed by a v2 pipeline with grouping and computed fields.

Pipeline Config

{
  "version": 2,
  "nodes": [
    { "id": "n1", "type": "inventory", "phase": 1, "position": 0,
      "config": { "scope": "all", "includeStatuses": ["active"] } },
    { "id": "n2", "type": "filter", "phase": 1, "position": 1,
      "config": {
        "conditions": [
          { "field": "offer.priority", "operator": "gte", "value": 30 }
        ],
        "combinator": "AND"
      } },
    { "id": "n3", "type": "score", "phase": 2, "position": 0,
      "config": { "method": "priority_weighted" } },
    { "id": "n4", "type": "rank", "phase": 2, "position": 1,
      "config": { "method": "topN", "maxCandidates": 4 } },
    { "id": "n5", "type": "group", "phase": 2, "position": 2,
      "config": {
        "placements": [
          { "placementId": "hero", "count": 1 },
          { "placementId": "sidebar", "count": 3 }
        ],
        "allocationStrategy": "priority_fill"
      } },
    { "id": "n6", "type": "compute", "phase": 3, "position": 0,
      "config": {
        "extras": [
          { "name": "display_rate", "formula": "round(base_rate * 0.9, 2)", "outputType": "number" }
        ]
      } },
    { "id": "n7", "type": "response", "phase": 3, "position": 1,
      "config": { "responseFormat": "grouped" } }
  ]
}

Step-by-Step Execution

Step 1 — Inventory: Loads 8 active offers from the database.
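| Offer | Priority | Weight | base_rate |
| --- | --- | --- | --- |
| Premium Card | 90 | 100 | 14.99 |
| Travel Rewards | 80 | 80 | 17.99 |
| Cash Back | 70 | 90 | 15.49 |
| Student Card | 25 | 100 | 22.99 |
| Balance Transfer | 60 | 70 | 12.99 |
| Secured Card | 20 | 100 | 24.99 |
| Business Platinum | 85 | 60 | 16.99 |
| Everyday Card | 40 | 50 | 19.99 |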
Step 2 — Filter (offer.priority >= 30): Removes Student Card (25) and Secured Card (20). 6 remain.
Step 3 — Score (priority_weighted): Each candidate gets score = (priority/100) * (weight/100).
| Offer | Score |
| --- | --- |
| Premium Card | 0.90 |
| Travel Rewards | 0.64 |
| Cash Back | 0.63 |
| Business Platinum | 0.51 |
| Balance Transfer | 0.42 |
| Everyday Card | 0.20 |
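Steps 2 and 3 can be reproduced with a few lines of Python using the priorities and weights from Step 1 and the score formula stated above. The `priority_weighted` helper name is ours; rounding to two decimals is only for display.

```python
# (name, priority, weight) taken from the Step 1 inventory.
offers = [
    ("Premium Card", 90, 100),
    ("Travel Rewards", 80, 80),
    ("Cash Back", 70, 90),
    ("Student Card", 25, 100),
    ("Balance Transfer", 60, 70),
    ("Secured Card", 20, 100),
    ("Business Platinum", 85, 60),
    ("Everyday Card", 40, 50),
]


def priority_weighted(priority: int, weight: int) -> float:
    """score = (priority/100) * (weight/100), rounded for display."""
    return round((priority / 100) * (weight / 100), 2)


# Step 2: keep offers with priority >= 30.
survivors = [offer for offer in offers if offer[1] >= 30]

# Step 3: score each survivor and order by score, highest first.
scores = sorted(
    ((name, priority_weighted(priority, weight)) for name, priority, weight in survivors),
    key=lambda pair: pair[1],
    reverse=True,
)
for name, score in scores:
    print(f"{name}: {score:.2f}")
```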
Step 4 — Rank (topN, maxCandidates=4): Takes the top 4 by score. Everyday Card and Balance Transfer are cut. 4 remain.
Step 5 — Group (priority_fill): Allocates to placements in score order, no duplicates across placements.
| Placement | Offers |
| --- | --- |
| hero (1 slot) | Premium Card (0.90) |
| sidebar (3 slots) | Travel Rewards (0.64), Cash Back (0.63), Business Platinum (0.51) |
Step 6 — Compute: Evaluates display_rate = round(base_rate * 0.9, 2) for each candidate.
| Offer | display_rate |
| --- | --- |
| Premium Card | 13.49 |
| Travel Rewards | 16.19 |
| Cash Back | 13.94 |
| Business Platinum | 15.29 |
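The display_rate values can be checked by hand or with the short snippet below. It assumes the formula's round behaves like Python's built-in `round` for these inputs, which holds here because none of the products fall on a rounding boundary.

```python
# base_rate values for the four offers that survive ranking.
base_rates = {
    "Premium Card": 14.99,
    "Travel Rewards": 17.99,
    "Cash Back": 15.49,
    "Business Platinum": 16.99,
}

# display_rate = round(base_rate * 0.9, 2)
display_rates = {name: round(rate * 0.9, 2) for name, rate in base_rates.items()}
for name, rate in display_rates.items():
    print(f"{name}: {rate:.2f}")
```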
Step 7 — Response (grouped format): Returns the final response:
{
  "customerId": "cust_12345",
  "placements": {
    "hero": [
      {
        "offerId": "offer_premium_card",
        "offerName": "Premium Card",
        "score": 0.90,
        "rank": 1,
        "personalization": { "display_rate": 13.49 }
      }
    ],
    "sidebar": [
      {
        "offerId": "offer_travel_rewards",
        "offerName": "Travel Rewards",
        "score": 0.64,
        "rank": 2,
        "personalization": { "display_rate": 16.19 }
      },
      {
        "offerId": "offer_cash_back",
        "offerName": "Cash Back",
        "score": 0.63,
        "rank": 3,
        "personalization": { "display_rate": 13.94 }
      },
      {
        "offerId": "offer_biz_platinum",
        "offerName": "Business Platinum",
        "score": 0.51,
        "rank": 4,
        "personalization": { "display_rate": 15.29 }
      }
    ]
  },
  "traceSummary": {
    "totalCandidates": 8,
    "afterQualification": 0,
    "afterContactPolicy": 0,
    "topScores": [
      { "offerId": "offer_premium_card", "score": 0.90 },
      { "offerId": "offer_travel_rewards", "score": 0.64 },
      { "offerId": "offer_cash_back", "score": 0.63 },
      { "offerId": "offer_biz_platinum", "score": 0.51 }
    ]
  }
}

Pipeline Validation

KaireonAI validates the pipeline structure when you save a flow. Invalid pipelines are rejected with specific error codes:
| Code | Rule |
| --- | --- |
| EMPTY_PIPELINE | Pipeline must contain at least one node |
| MISSING_INVENTORY | Must start with an Inventory node |
| MISSING_RESPONSE | Must end with a Response node |
| MISSING_SCORE | Must contain a Score node |
| DUPLICATE_SINGLETON | Only one of each: inventory, score, rank, group, compute, response |
| PHASE_ORDER_VIOLATION | Phases must be non-decreasing (1 -> 2 -> 3) |
| FILTER_WRONG_PHASE | Filter nodes must be in Phase 1 |
| GROUP_BEFORE_RANK | Group node must appear after the Rank node |
| CALL_FLOW_WRONG_PHASE | Call Flow nodes must be in Phase 1 or 2 |
| CALL_FLOW_MAX_DEPTH | Sub-flow nesting cannot exceed 2 levels |
| CALL_FLOW_CIRCULAR | Circular call_flow references are not allowed |
| INVALID_NODE_CONFIG | A node’s config doesn’t match its type-specific schema |
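To pre-check a config before saving, a client-side validator for a subset of these rules might look like the sketch below. It is an approximation written from the rule descriptions, not KaireonAI's validator: it sorts nodes by phase and position first (so PHASE_ORDER_VIOLATION is not checked), and it skips the call_flow depth and circularity rules and per-node schema validation.

```python
from collections import Counter
from typing import Any

SINGLETONS = {"inventory", "score", "rank", "group", "compute", "response"}


def validate(nodes: list[dict[str, Any]]) -> list[str]:
    """Return the error codes a pipeline would trigger, for a subset of the rules."""
    if not nodes:
        return ["EMPTY_PIPELINE"]
    errors: list[str] = []
    ordered = sorted(nodes, key=lambda node: (node["phase"], node["position"]))
    types = [node["type"] for node in ordered]
    if types[0] != "inventory":
        errors.append("MISSING_INVENTORY")
    if types[-1] != "response":
        errors.append("MISSING_RESPONSE")
    if "score" not in types:
        errors.append("MISSING_SCORE")
    counts = Counter(types)
    if any(counts[node_type] > 1 for node_type in SINGLETONS):
        errors.append("DUPLICATE_SINGLETON")
    if any(node["type"] == "filter" and node["phase"] != 1 for node in nodes):
        errors.append("FILTER_WRONG_PHASE")
    # Only checked when both nodes are present; the rank-less case is left open.
    if "group" in types and "rank" in types and types.index("group") < types.index("rank"):
        errors.append("GROUP_BEFORE_RANK")
    if any(node["type"] == "call_flow" and node["phase"] not in (1, 2) for node in nodes):
        errors.append("CALL_FLOW_WRONG_PHASE")
    return errors


pipeline = [
    {"id": "n1", "type": "inventory", "phase": 1, "position": 0},
    {"id": "n2", "type": "score", "phase": 2, "position": 0},
    {"id": "n3", "type": "filter", "phase": 2, "position": 1},
    {"id": "n4", "type": "response", "phase": 3, "position": 0},
]
print(validate(pipeline))
```

The sample pipeline places a filter in Phase 2, so the only code reported is FILTER_WRONG_PHASE.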

V2 Config Format

A v2 flow config uses this JSON structure:
{
  "version": 2,
  "nodes": [
    { "id": "n1", "type": "inventory", "phase": 1, "position": 0, "config": { "scope": "all" } },
    { "id": "n2", "type": "filter", "phase": 1, "position": 1, "config": { "conditions": [{ "field": "offer.status", "operator": "eq", "value": "active" }], "combinator": "AND" } },
    { "id": "n3", "type": "score", "phase": 2, "position": 0, "config": { "method": "priority_weighted" } },
    { "id": "n4", "type": "rank", "phase": 2, "position": 1, "config": { "method": "topN", "maxCandidates": 5 } },
    { "id": "n5", "type": "response", "phase": 3, "position": 0, "config": {} }
  ],
  "flowConfig": {
    "experiment": { "enabled": false },
    "timeout": { "maxMs": 5000 },
    "caching": { "enabled": false }
  }
}
The version: 2 field is what tells KaireonAI to use the composable pipeline engine. Without it, the flow runs through the v1 fixed-stage engine.

Migrating from V1

Existing v1 flows work without changes. When you’re ready to migrate, use the built-in migration utility, which converts a v1 config to v2 format using these mappings:
  • The v1 inventory stage maps to an Inventory node
  • filter.qualificationMode maps to a Qualify node
  • filter.contactPolicyMode maps to a Contact Policy node
  • scoring.method and scoring.modelKey map to a Score node
  • ranking.method and ranking.maxCandidates map to a Rank node
  • Enrichment and compute configs (if present) map to Enrich and Compute nodes
  • A Response node is always added at the end
The migration preserves all existing behavior — the same offers will be recommended in the same order.
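The mappings above can be pictured with a small conversion sketch. This is not the built-in migration utility: the v1 config shape used here (top-level `inventory`, `filter`, `scoring`, and `ranking` objects), the choice to carry the inventory settings over unchanged, and the mapping of `modelKey` to `defaultModel` are all assumptions, and enrichment and compute configs are left out.

```python
from typing import Any


def migrate_v1_to_v2(v1: dict[str, Any]) -> dict[str, Any]:
    """Build a v2 node list from an assumed v1 config shape."""
    planned: list[tuple[str, int, dict[str, Any]]] = [
        ("inventory", 1, dict(v1.get("inventory", {}))),
    ]
    filters = v1.get("filter", {})
    if "qualificationMode" in filters:
        planned.append(("qualify", 1, {"mode": filters["qualificationMode"]}))
    if "contactPolicyMode" in filters:
        planned.append(("contact_policy", 1, {"mode": filters["contactPolicyMode"]}))

    scoring = v1["scoring"]
    score_config: dict[str, Any] = {"method": scoring["method"]}
    if "modelKey" in scoring:
        score_config["defaultModel"] = scoring["modelKey"]  # assumed target field
    planned.append(("score", 2, score_config))

    ranking = v1["ranking"]
    planned.append(
        ("rank", 2, {"method": ranking["method"], "maxCandidates": ranking["maxCandidates"]})
    )
    planned.append(("response", 3, {}))  # always added at the end

    nodes = []
    next_position = {1: 0, 2: 0, 3: 0}
    for index, (node_type, phase, config) in enumerate(planned, start=1):
        nodes.append(
            {
                "id": f"n{index}",
                "type": node_type,
                "phase": phase,
                "position": next_position[phase],
                "config": config,
            }
        )
        next_position[phase] += 1
    return {"version": 2, "nodes": nodes}


# Hypothetical v1 config, for illustration only.
v1_config = {
    "inventory": {"scope": "all"},
    "filter": {"qualificationMode": "all", "contactPolicyMode": "none"},
    "scoring": {"method": "propensity", "modelKey": "model_v2"},
    "ranking": {"method": "topN", "maxCandidates": 5},
}
v2_config = migrate_v1_to_v2(v1_config)
print([node["type"] for node in v2_config["nodes"]])
```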

API

Saving a V2 flow

Use the standard Decision Flows API. The draftConfig field accepts both v1 and v2 formats:
PUT /api/v1/decision-flows
{
  "id": "df_12345",
  "draftConfig": {
    "version": 2,
    "nodes": [
      { "id": "n1", "type": "inventory", "phase": 1, "position": 0, "config": { "scope": "all" } },
      { "id": "n2", "type": "score", "phase": 2, "position": 0, "config": { "method": "priority_weighted" } },
      { "id": "n3", "type": "rank", "phase": 2, "position": 1, "config": { "method": "topN", "maxCandidates": 5 } },
      { "id": "n4", "type": "response", "phase": 3, "position": 0, "config": {} }
    ],
    "flowConfig": {}
  }
}
KaireonAI validates the v2 config with both schema validation and pipeline structural rules before saving.

Executing a V2 flow

V2 flows are executed through the same Recommend API as v1 flows — no changes needed on the caller side:
POST /api/v1/recommend
{
  "customerId": "cust_12345",
  "decisionFlowKey": "credit_cards",
  "attributes": { "channel": "web" },
  "maxOffers": 5
}
If the flow config has version: 2, the engine uses the composable pipeline. Otherwise it uses the v1 engine.
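From a client's point of view the call is an ordinary JSON POST. The sketch below only builds the request with Python's standard library; the host name is a placeholder, `build_recommend_request` is a hypothetical helper, and authentication is omitted because it is not covered here.

```python
import json
import urllib.request

BASE_URL = "https://kaireon.example.com"  # placeholder host, replace with your own


def build_recommend_request(
    customer_id: str, flow_key: str, channel: str, max_offers: int = 5
) -> urllib.request.Request:
    """Prepare a POST to the Recommend API with the documented body fields."""
    payload = {
        "customerId": customer_id,
        "decisionFlowKey": flow_key,
        "attributes": {"channel": channel},
        "maxOffers": max_offers,
    }
    return urllib.request.Request(
        url=f"{BASE_URL}/api/v1/recommend",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_recommend_request("cust_12345", "credit_cards", "web")
print(request.get_method(), request.full_url)
# Sending it would be: urllib.request.urlopen(request)
```

The same request works for v1 and v2 flows, since the engine is selected from the flow's stored config and not from anything in the call.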

Next Steps