The Flow source executor reads files into a pipeline run with five production-grade behaviors:
  1. Pattern matching — glob, regex, or date-templated globs.
  2. Ordering — latest-by-mtime, FIFO, lexicographic, or process-all.
  3. Wait policy — retry up to N times with configurable interval; act on deadline miss with alert / skip / fail.
  4. Atomic staging — every matched file is renamed into a .processing/ folder before any byte is read; on success it moves to a date-templated .archive/ folder, on failure to .failed/.
  5. Format parsers — CSV parses via papaparse; JSON and JSONL via native JSON.parse; Parquet, Avro, TSV, and XML parse via hyparquet, avsc, papaparse, and fast-xml-parser respectively. ORC throws an explicit, actionable error pointing to the convert-to-parquet upstream workaround (the optional @kaireonai/orc-native add-on package is planned but not yet published).
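
Taken together, a source node that exercises all five behaviors might look like the sketch below. The pattern, waitPolicy, and atomicity blocks mirror the examples later on this page; the connector, path, format, and ordering keys are illustrative field names, not a verbatim schema reference.

{
  "connector": "local_fs",
  "path": "/data/inbound/",
  "format": "csv",
  "pattern": { "type": "glob", "value": "orders_*.csv" },
  "ordering": "latest_by_mtime",
  "waitPolicy": { "maxRetries": 6, "intervalMinutes": 10, "onMissAction": "alert" },
  "atomicity": {
    "stagingFolder": ".processing/",
    "successFolder": ".archive/{YYYY-MM-DD}/",
    "failureFolder": ".failed/"
  }
}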

Connector scope

As of Phase 6.2, the source executor runs against six connector kinds in production: local_fs, s3, gcs, azure_blob, sftp, and http_pull. All six share a single cloud-object-store abstraction — list, get, and archive — so pattern matching, ordering, atomic staging, and wait-policy semantics behave identically across backends. The IR enum still includes ftp for forward compatibility, but plaintext FTP has no runtime executor; users on plaintext FTP should switch to sftp.
Calling the source executor with connector: "ftp" throws an explicit “connector ftp has no runtime executor (deprecated plaintext; use sftp)” error so the boundary is visible to operators.

Cloud source credentials

Cloud connectors load credentials from the Connector.authConfig field, which is encrypted at rest (AES-256-GCM). The runtime decrypts on-demand; plaintext credentials never persist outside the per-run process.

S3 (connector: "s3")

Two auth modes — pick one. Both rely on region (e.g. us-east-1). Access keys:
| Key | Required | Notes |
| --- | --- | --- |
| accessKeyId | yes | IAM user access key |
| secretAccessKey | yes | IAM user secret |
| sessionToken | no | for STS-vended credentials |
| region | yes | bucket region |
IAM Role assumption (preferred for App Runner / EKS workloads):
| Key | Required | Notes |
| --- | --- | --- |
| roleArn | yes | role to assume; the alternate name iamRoleArn is also accepted |
| externalId | no | trust-policy ExternalId condition |
| region | yes | bucket region |
The runtime calls AWS STS to assume the role and prefixes the resulting session name with kaireon-flow-runtime-, so these sessions are distinguishable from connector-test sessions in CloudTrail.
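
For illustration, the two authConfig shapes might look like this (key names come from the tables above; the values are placeholders and the exact payload shape is an assumption, not a schema reference):

Access keys:
{
  "accessKeyId": "<access-key-id>",
  "secretAccessKey": "<secret-access-key>",
  "region": "us-east-1"
}
IAM role assumption:
{
  "roleArn": "arn:aws:iam::<account-id>:role/<role-name>",
  "externalId": "<trust-policy-external-id>",
  "region": "us-east-1"
}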

GCS (connector: "gcs")

| Key | Required | Notes |
| --- | --- | --- |
| projectId | yes | GCP project owning the bucket |
| serviceAccountKey | one-of | full service-account JSON as a string |
| accessToken | one-of | short-lived OAuth token (alternative to serviceAccountKey) |
If neither serviceAccountKey nor accessToken is set, the runtime falls back to Application Default Credentials (which requires the GOOGLE_APPLICATION_CREDENTIALS env var on the runner).
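
A hypothetical authConfig using the service-account-key mode (accessToken would take its place in the token mode; values are placeholders and the payload shape is an assumption):

{
  "projectId": "<gcp-project-id>",
  "serviceAccountKey": "{\"type\":\"service_account\", \"project_id\":\"<gcp-project-id>\", ...}"
}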

Azure Blob (connector: "azure_blob")

Two auth modes — pick one. Connection string:
| Key | Required | Notes |
| --- | --- | --- |
| connectionString | yes | full Azure storage connection string |
Account + key:
| Key | Required | Notes |
| --- | --- | --- |
| storageAccount | yes | account name (3-24 lowercase alphanumeric characters) |
| accountKey | yes | base64-encoded storage account key |
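
Two hypothetical authConfig payloads, one per mode (values are placeholders and the payload shape is an assumption):

Connection string:
{ "connectionString": "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<base64-key>;EndpointSuffix=core.windows.net" }
Account + key:
{ "storageAccount": "<account>", "accountKey": "<base64-key>" }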

SFTP (connector: "sftp")

| Key | Required | Notes |
| --- | --- | --- |
| host | yes | server hostname |
| port | no | defaults to 22 |
| username | yes | login |
| password | one-of | password authentication |
| privateKey | one-of | raw PEM contents (not a path) |
The connection opens lazily on first call and is reused across list / get / archive within a single run; the source executor closes it in the run-level finally block. Connect uses readyTimeout: 10000 (10 s) to fail fast on unreachable hosts.
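
A hypothetical authConfig using key-based auth (note that privateKey carries the PEM text itself, not a file path; values are placeholders and the payload shape is an assumption):

{
  "host": "sftp.example.com",
  "port": 22,
  "username": "flow",
  "privateKey": "-----BEGIN OPENSSH PRIVATE KEY-----\n...\n-----END OPENSSH PRIVATE KEY-----"
}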

HTTP-pull (connector: "http_pull")

The source URL goes in node.config.path (not authConfig). Auth headers come from:
| Key | Required | Notes |
| --- | --- | --- |
| apiKey | one-of | added as Authorization: Bearer <apiKey> (or as the header named in apiKeyHeader if set) |
| apiKeyHeader | no | custom header name when apiKey is set (e.g. X-API-Key) |
| bearerToken | one-of | added as Authorization: Bearer <bearerToken> |
Every URL is gated by the platform’s URL validator before fetch. Loopback addresses (localhost, 127.0.0.1), link-local addresses (169.254.x.x, including the AWS IMDS endpoint), and RFC 1918 private ranges are rejected up front. Both list and get operations validate independently, so a maliciously crafted file descriptor cannot bypass the SSRF check.
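
An illustrative split between the two config locations (the URL and values are placeholders, and the format key inside node.config is an assumption):

node.config:
{ "path": "https://api.example.com/exports/orders.csv", "format": "csv" }
authConfig:
{ "apiKey": "<token>", "apiKeyHeader": "X-API-Key" }

With apiKeyHeader set as above, the request carries X-API-Key: <token> instead of an Authorization: Bearer header.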

Cloud archive semantics

The source executor’s successFolder / failureFolder config is honored across backends with backend-appropriate semantics:
| Backend | Archive operation |
| --- | --- |
| local_fs | fs.rename (atomic on POSIX within the same filesystem) |
| s3 | server-side copy + delete |
| gcs | bucket-file copy + delete |
| azure_blob | blob copy-from-URL (poll until done) + delete |
| sftp | native rename over SFTP |
| http_pull | no-op (read-only external; archive folders are ignored) |
Duplicate-key handling on the destination: every backend appends a .dup-${ts} suffix when the target already exists, mirroring the local-filesystem atomic-staging behavior.
For http_pull, the IR atomicity.successFolder/failureFolder config is still required for schema validation parity but its values are ignored at runtime. A debug log emits “http_pull archive: no-op (read-only external source)” so operators can confirm the no-op was reached.

Pattern types

{ "pattern": { "type": "glob",          "value": "orders_*.csv" } }
{ "pattern": { "type": "regex",         "value": "^events_[0-9]{4}\\.json$" } }
{ "pattern": { "type": "date_template", "value": "log_{YYYY-MM-DD}.jsonl",
                "timezone": "America/Los_Angeles" } }

Date-template tokens

| Token | Meaning |
| --- | --- |
| {YYYY} | 4-digit year |
| {MM} | 2-digit month |
| {DD} | 2-digit day |
| {HH} | 2-digit hour (24h) |
| {mm} | 2-digit minute |
| {ss} | 2-digit second |
| {YYYY-MM-DD} | shorthand combo |
| {YYYYMMDD} | compact combo |
Tokens are evaluated against the source’s configured timezone (IANA), defaulting to UTC. After expansion, the value is compiled as a glob (literal text + * and ? wildcards).
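
As a worked example (the run date and templates are illustrative): for a run executing on 2024-03-07 at 14:05 in America/Los_Angeles, the templates expand before glob matching as follows, and any remaining * or ? characters still act as wildcards.

log_{YYYY-MM-DD}.jsonl            ->  log_2024-03-07.jsonl
export_{YYYYMMDD}_{HH}{mm}_*.csv  ->  export_20240307_1405_*.csv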

Ordering

| Mode | Behavior |
| --- | --- |
| latest_by_mtime | mtime descending; newest first |
| fifo_by_name | lexicographic ascending |
| lexicographic | alias of fifo_by_name |
| process_all | preserve readdir order |
The source executor processes ALL matched files in the chosen order. Single-file mode is a future option.
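
A worked example of the modes, assuming three matched files with the modification times shown (names and times are illustrative):

Matched: b.csv (mtime 10:00), a.csv (mtime 12:00), c.csv (mtime 11:00)
latest_by_mtime              -> a.csv, c.csv, b.csv
fifo_by_name / lexicographic -> a.csv, b.csv, c.csv
process_all                  -> whatever order readdir returned them in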

Wait policy

"waitPolicy": {
  "maxRetries": 6,
  "intervalMinutes": 10,
  "onMissAction": "alert"
}
The discover loop runs up to maxRetries times, sleeping intervalMinutes between attempts; with the example above, that is up to six attempts spread over roughly an hour before the miss action fires. On exhaustion:

| onMissAction | Behavior |
| --- | --- |
| alert | Log a warning, emit a warning into the System Health feed (Source had no files), and return rowsOut: 0; downstream sees the empty result |
| skip | Log info only; return rowsOut: 0 |
| fail | Throw file arrival deadline missed; the run fails |

Atomic staging

"atomicity": {
  "stagingFolder": ".processing/",
  "successFolder": ".archive/{YYYY-MM-DD}/",
  "failureFolder": ".failed/"
}
Every matched file is renamed into stagingFolder BEFORE any byte is read. On a clean parse, files move to successFolder (date templates are expanded at archive time, in UTC by default). On parse failure, they move to failureFolder and the underlying error is rethrown. stagingFolder, successFolder, and failureFolder may be relative to path or absolute. If a file with the same basename already exists at the destination, a .dup-<unix-ms> suffix is appended.
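
A sketch of the resulting file lifecycle, assuming the block above with path set to /data/inbound/ (paths, dates, and the timestamp are illustrative):

/data/inbound/orders_1.csv
  -> /data/inbound/.processing/orders_1.csv                            (staged before any read)
  -> /data/inbound/.archive/2024-03-07/orders_1.csv                    (clean parse)
  or /data/inbound/.failed/orders_1.csv                                (parse failure)
  or /data/inbound/.archive/2024-03-07/orders_1.csv.dup-1709769600000  (destination name already taken)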

Crash recovery

If a run crashes mid-parse, files remain in stagingFolder. A follow-up improvement will rescan that folder first on the next run; today the operator should manually move files back if needed.

Formats

| Format | Library | Notes |
| --- | --- | --- |
| csv | papaparse 5 | RFC 4180 quoting & escaping; default delimiter ,; header row → object keys; blank lines skipped |
| json | native JSON.parse | array → rows; object → single row |
| jsonl | native JSON.parse per line | line-delimited; parse errors report the line number |
| parquet | hyparquet | pure-JS reader; up to ~50 MB per file; binary read mode (Buffer, not utf8) |
| avro | avsc | schema-embedded container files only; nested records become nested JS objects |
| orc | (not implemented) | throws "ORC parsing requires the optional @kaireonai/orc-native add-on package, which is not yet published. Configure your pipeline to convert ORC → parquet upstream until then." |
| tsv | papaparse 5 (delimiter: "\t") | RFC 4180-style quoting; same engine as CSV |
| xml | fast-xml-parser 5 | configurable recordPath (default: every direct child of root); attribute keys prefixed with @_; text nodes keyed #text |
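
A minimal sketch of the CSV header-row mapping (the input and values are illustrative; whether values stay strings or receive numeric coercion depends on parser options not covered on this page):

orders_1.csv:
order_id,amount
1001,19.99
1002,5.00

Parsed rows:
[ { "order_id": "1001", "amount": "19.99" },
  { "order_id": "1002", "amount": "5.00" } ]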

XML recordPath

Specify which XML elements become rows via the recordPath config field on the source node:
# Atom-style feed: parse <feed>/<entry> elements
recordPath: "/feed/entry"
When recordPath is omitted (or set to *), every direct child of the root element becomes a row.
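
A small sketch of the resulting row shape, assuming recordPath: "/feed/entry" and the input below (the exact key layout depends on parser options; this is illustrative, not a verbatim dump):

<feed>
  <entry lang="en"><title>First post</title></entry>
  <entry lang="en"><title>Second post</title></entry>
</feed>

Rows (one per <entry>, attribute keys prefixed with @_):
[ { "@_lang": "en", "title": "First post" },
  { "@_lang": "en", "title": "Second post" } ]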

Error model

| Where | Trigger | Effect |
| --- | --- | --- |
| resolvePattern | malformed regex | run aborts with an error that includes the original pattern |
| discoverFiles | directory not found | ENOENT propagates (operator-visible) |
| waitForFiles | retries exhausted + onMissAction=fail | run fails with file arrival deadline missed |
| stageFiles | rename error | rethrows; downstream archive routes the file to failureFolder |
| parseByFormat | malformed input | run fails with <format> parse error: ...; the original error is preserved on error.cause; files move to failureFolder |
| parseByFormat (orc) | always | throws the explicit "ORC parsing requires…" message; not silent; files move to failureFolder |