Phase 5 ships the server-side foundation for the three-tier connector model.

Three tiers

| Tier | Mechanism | When |
| --- | --- | --- |
| YAML spec | Declarative HTTP/REST connector | 90% of SaaS APIs (Salesforce, Stripe, HubSpot, Klaviyo, …) |
| TS plugin | defineConnector({...}) via the SDK | Streaming + complex protocols (Kafka, Snowflake COPY, JDBC) |
| AI-generated YAML | Coming in Phase 5b | Net-new HTTP APIs from a docs URL |

YAML spec format

kind: connector
version: "1.0"
id: salesforce_rest
displayName: Salesforce REST
category: crm
auth:
  - type: oauth2
    authorizeUrl: https://login.salesforce.com/services/oauth2/authorize
    tokenUrl: https://login.salesforce.com/services/oauth2/token
    scopes: [api, refresh_token]
    clientIdRef: "{{secrets.clientId}}"
    clientSecretRef: "{{secrets.clientSecret}}"
endpoints:
  - id: query
    method: GET
    url: "{{instance_url}}/services/data/v59.0/query"
    params:
      q: "{{soql}}"
    pagination:
      type: cursor
      cursorField: nextRecordsUrl
      pageSize: 200
    rateLimit:
      requestsPerSecond: 10
read:
  primary: query
  resourceTypes: [Account, Contact, Lead, Opportunity]
Validated by parseConnectorYaml(text) — js-yaml load + Zod safeParse, same two-phase pattern as parsePipelineIR. Invalid specs are rejected before they enter the registry.
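The two-phase pattern can be sketched as follows. This is an illustrative reduction, not the real parseConnectorYaml: the loader and validator are injected here (in the actual code they are js-yaml's load and a Zod schema's safeParse), and the result-union shape is an assumption.

```typescript
// Sketch of the two-phase validate pattern: phase 1 catches syntax errors,
// phase 2 catches schema violations. Names and shapes are illustrative.
type ParseResult<T> = { ok: true; spec: T } | { ok: false; errors: string[] };

function twoPhaseParse<T>(
  text: string,
  load: (t: string) => unknown, // stand-in for js-yaml load
  validate: (doc: unknown) => { success: boolean; data?: T; errors?: string[] }, // stand-in for Zod safeParse
): ParseResult<T> {
  let doc: unknown;
  try {
    doc = load(text); // phase 1: syntax
  } catch (e) {
    return { ok: false, errors: [`parse: ${(e as Error).message}`] };
  }
  const v = validate(doc); // phase 2: schema
  return v.success
    ? { ok: true, spec: v.data as T }
    : { ok: false, errors: v.errors ?? [] };
}
```

Either phase failing yields `ok: false`, so an invalid spec never reaches the registry.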

Auth types

none, api_key (header or query), basic, and oauth2. For oauth2, the operator passes a pre-fetched access token in secrets.accessToken; Phase 5 does not auto-refresh tokens (that is a follow-up).
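A sketch of how these four auth types might resolve into request headers and query params. The type and function names (AuthConfig, resolveAuth) and the secret key names besides accessToken are assumptions, not the real module's API.

```typescript
// Illustrative auth resolution: spec auth block + secrets -> headers/query.
type AuthConfig =
  | { type: "none" }
  | { type: "api_key"; in: "header" | "query"; name: string }
  | { type: "basic" }
  | { type: "oauth2" };

function resolveAuth(
  auth: AuthConfig,
  secrets: Record<string, string>,
): { headers: Record<string, string>; query: Record<string, string> } {
  switch (auth.type) {
    case "none":
      return { headers: {}, query: {} };
    case "api_key":
      // key lands in a header or a query param depending on the spec
      return auth.in === "header"
        ? { headers: { [auth.name]: secrets.apiKey }, query: {} }
        : { headers: {}, query: { [auth.name]: secrets.apiKey } };
    case "basic": {
      const token = Buffer.from(`${secrets.username}:${secrets.password}`).toString("base64");
      return { headers: { Authorization: `Basic ${token}` }, query: {} };
    }
    case "oauth2":
      // Phase 5: operator supplies a pre-fetched token; no refresh
      return { headers: { Authorization: `Bearer ${secrets.accessToken}` }, query: {} };
  }
}
```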

Pagination types

none, cursor (follows a response field until it is absent), offset (numeric offset + limit), and page (page number). Each strategy stops on a short page or at the maxPages cap (default 100).
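The cursor strategy's stopping rules can be sketched as a loop; the fetchPage signature and the Page shape here are assumptions, not the runtime's real API.

```typescript
// Illustrative cursor pagination walk: stop on a short page, a missing
// cursor field, or the maxPages cap (default 100).
type Page = { rows: unknown[]; nextCursor?: string };

async function walkCursorPages(
  fetchPage: (cursor?: string) => Promise<Page>,
  pageSize: number,
  maxPages = 100,
): Promise<unknown[]> {
  const rows: unknown[] = [];
  let cursor: string | undefined;
  for (let i = 0; i < maxPages; i++) {
    const page = await fetchPage(cursor);
    rows.push(...page.rows);
    // short page or absent cursor field => done
    if (page.rows.length < pageSize || !page.nextCursor) break;
    cursor = page.nextCursor;
  }
  return rows;
}
```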

Rate limit

requestsPerSecond paces inter-page sleeps. burstSize is reserved for a future token-bucket implementation.
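A minimal pacing sketch: requestsPerSecond becomes a fixed inter-request delay. The function names are illustrative; the real pacing lives inside the HTTP runtime, and burstSize / token-bucket behavior is explicitly deferred.

```typescript
// requestsPerSecond -> fixed delay between paginated requests.
function paceIntervalMs(requestsPerSecond: number): number {
  return Math.ceil(1000 / requestsPerSecond);
}

// Run one request, then sleep the pacing interval before the next page.
async function paced<T>(fn: () => Promise<T>, requestsPerSecond: number): Promise<T> {
  const result = await fn();
  await new Promise((resolve) => setTimeout(resolve, paceIntervalMs(requestsPerSecond)));
  return result;
}
```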

HTTP runtime

executeYamlEndpoint({ spec, endpointId, params, secrets, signal }) runs the call:
  1. Resolve auth → headers + query params
  2. Substitute {{var}} templates against (params + secrets)
  3. SSRF-validate the URL via lib/security/url-validator.validateAndResolve
  4. Fetch with rate-limit pacing
  5. Walk pagination
  6. Extract rows via responseRowsPath (defaults to root)
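Step 2 above, the {{var}} substitution, can be sketched like this. The helper name and the throw-on-missing behavior are assumptions; the real runtime may resolve or report unresolved variables differently.

```typescript
// Illustrative {{var}} substitution against a merged params + secrets map.
function substituteTemplates(
  template: string,
  vars: Record<string, string>,
): string {
  return template.replace(/\{\{(\w+)\}\}/g, (_match, name: string) => {
    if (!(name in vars)) {
      throw new Error(`unresolved template variable: ${name}`);
    }
    return vars[name];
  });
}
```

For example, the query endpoint's URL template resolves against `{ instance_url: "https://na1.salesforce.com" }` before the SSRF check and fetch.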

Plugin SDK

import { z } from "zod";
import { defineConnector } from "@/lib/flow/connectors/plugin-sdk";

export default defineConnector({
  id: "kafka",
  displayName: "Apache Kafka",
  category: "streaming",
  configSchema: z.object({
    brokers: z.array(z.string()),
    topic: z.string(),
    consumerGroup: z.string(),
  }),
  read: async function* (ctx) {
    // implementation; yields { key, value, partition, offset, ... }
  },
  testConnection: async (config) => ({ ok: true }),
});
Plugins register themselves in the in-memory connectorRegistry via connectorRegistry.registerPlugin(plugin). Plugin imports must be explicit (no filesystem auto-discovery in Phase 5).
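The register/lookup contract can be sketched as a minimal in-memory registry. This is a reduced illustration of connectorRegistry, not its real implementation; only the id/displayName fields and the registerPlugin name come from the doc.

```typescript
// Minimal in-memory connector registry sketch keyed by plugin id.
interface ConnectorPlugin {
  id: string;
  displayName: string;
}

class ConnectorRegistry {
  private plugins = new Map<string, ConnectorPlugin>();

  registerPlugin(plugin: ConnectorPlugin): void {
    // explicit registration only: duplicate ids are a caller error
    if (this.plugins.has(plugin.id)) {
      throw new Error(`duplicate connector id: ${plugin.id}`);
    }
    this.plugins.set(plugin.id, plugin);
  }

  get(id: string): ConnectorPlugin | undefined {
    return this.plugins.get(id);
  }
}
```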

MCP createYamlConnector

Promoted from the Phase 2b stub. Accepts a YAML text body, validates it via parseConnectorYaml, and registers the connector on success:
{
  "ok": true,
  "id": "salesforce_rest",
  "displayName": "Salesforce REST"
}
Or:
{
  "ok": false,
  "errors": ["auth.0.tokenUrl: must be https://"]
}

Out of scope (deferred to Phase 5b)

  • AI generator endpoint (POST /api/v1/ai/generate-yaml-connector) — paste a docs URL, get a draft YAML
  • Marketplace UI — separate UX layer
  • Bulk migration of the 25+ existing HTTP-shaped connectors
  • OAuth2 auto-refresh on 401
  • Filesystem auto-discovery of plugins
  • Streaming connector plugins (Kafka, etc.) — SDK supports them, none ship in Phase 5