The Flow source executor reads files into a pipeline run with five production-grade behaviors:
- Pattern matching — glob, regex, or date-templated globs.
- Ordering — latest-by-mtime, FIFO, lexicographic, or process-all.
- Wait policy — retry up to N times with a configurable interval; act on a deadline miss with alert / skip / fail.
- Atomic staging — every matched file is renamed into a .processing/ folder before any byte is read; on success it moves to a date-templated .archive/ folder, on failure to .failed/.
- Format parsers — CSV, JSON, and JSONL parse via native + papaparse helpers; Parquet, Avro, TSV, and XML parse via hyparquet, avsc, papaparse, and fast-xml-parser respectively. ORC throws an explicit, actionable error pointing to the convert-to-parquet upstream workaround (the optional @kaireonai/orc-native add-on package is planned but not yet published).
Connector scope
As of Phase 6.2, the source executor runs against six connector kinds in
production: local_fs, s3, gcs, azure_blob, sftp, and
http_pull. All six share a single cloud-object-store abstraction —
list, get, and archive — so pattern matching, ordering, atomic
staging, and wait-policy semantics behave identically across backends.
The IR enum still includes ftp for forward compatibility, but
plaintext FTP has no runtime executor; users on plaintext FTP should
switch to sftp.
Calling the source executor with connector: "ftp" throws an explicit
“connector ftp has no runtime executor (deprecated plaintext; use
sftp)” error so the boundary is visible to operators.
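The dispatch guard described above can be sketched as follows. This is an illustrative sketch, not the runtime's actual code; the function and constant names are hypothetical, while the connector kinds and the error message come from this page.

```typescript
// Connector kinds with a runtime executor as of Phase 6.2.
const RUNTIME_CONNECTORS = new Set([
  "local_fs", "s3", "gcs", "azure_blob", "sftp", "http_pull",
]);

// Hypothetical guard: reject ftp explicitly so the boundary is
// operator-visible, and reject unknown kinds with a distinct error.
function assertExecutable(connector: string): void {
  if (connector === "ftp") {
    throw new Error(
      "connector ftp has no runtime executor (deprecated plaintext; use sftp)",
    );
  }
  if (!RUNTIME_CONNECTORS.has(connector)) {
    throw new Error(`unknown connector kind: ${connector}`);
  }
}
```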
Cloud source credentials
Cloud connectors load credentials from the Connector.authConfig
field, which is encrypted at rest (AES-256-GCM). The runtime decrypts
on-demand; plaintext credentials never persist outside the per-run
process.
S3 (connector: "s3")
Two auth modes — pick one. Both rely on region (e.g. us-east-1).
Access keys:
| Key | Required | Notes |
|---|---|---|
| accessKeyId | yes | IAM user access key |
| secretAccessKey | yes | IAM user secret |
| sessionToken | no | for STS-vended creds |
| region | yes | bucket region |
IAM Role assumption (preferred for App Runner / EKS workloads):
| Key | Required | Notes |
|---|---|---|
| roleArn | yes | role to assume; alternate name iamRoleArn accepted |
| externalId | no | trust-policy ExternalId condition |
| region | yes | bucket region |
The runtime calls AWS STS to assume the role and names the resulting session with the kaireon-flow-runtime- prefix, so it is distinguishable from connector-test sessions in CloudTrail.
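An illustrative authConfig for role assumption; the ARN, ExternalId, and region are placeholder values:

```
{
  "roleArn": "arn:aws:iam::123456789012:role/flow-source-reader",
  "externalId": "kaireon-flow",
  "region": "us-east-1"
}
```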
GCS (connector: "gcs")
| Key | Required | Notes |
|---|---|---|
| projectId | yes | GCP project owning the bucket |
| serviceAccountKey | one-of | full service-account JSON as a string |
| accessToken | one-of | short-lived OAuth token (alternative to serviceAccountKey) |
If neither serviceAccountKey nor accessToken is set, the runtime
falls through to Application Default Credentials (requires the
GOOGLE_APPLICATION_CREDENTIALS env var on the runner).
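An illustrative authConfig using a service-account key; values are placeholders:

```
{
  "projectId": "my-gcp-project",
  "serviceAccountKey": "<full service-account JSON, passed as a string>"
}
```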
Azure Blob (connector: "azure_blob")
Two auth modes — pick one.
Connection string:
| Key | Required | Notes |
|---|---|---|
| connectionString | yes | full Azure storage connection string |
Account + key:
| Key | Required | Notes |
|---|---|---|
| storageAccount | yes | account name (3-24 lowercase alphanumeric) |
| accountKey | yes | base64 storage account key |
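An illustrative authConfig in account + key mode; both values are placeholders:

```
{
  "storageAccount": "myflowdata",
  "accountKey": "<base64 storage account key>"
}
```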
SFTP (connector: "sftp")
| Key | Required | Notes |
|---|---|---|
| host | yes | server hostname |
| port | no | defaults to 22 |
| username | yes | login |
| password | one-of | password authentication |
| privateKey | one-of | raw PEM contents (not a path) |
The connection opens lazily on first call and is reused across
list / get / archive within a single run; the source executor closes
it in the run-level finally block. Connect uses readyTimeout: 10000
(10 s) to fail fast on unreachable hosts.
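An illustrative authConfig using key authentication; the host, username, and key are placeholders. Note that privateKey takes the PEM text itself, not a filesystem path:

```
{
  "host": "sftp.example.com",
  "port": 22,
  "username": "flow",
  "privateKey": "-----BEGIN OPENSSH PRIVATE KEY-----\n...\n-----END OPENSSH PRIVATE KEY-----"
}
```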
HTTP-pull (connector: "http_pull")
The source URL goes in node.config.path (not authConfig). Auth
headers come from:
| Key | Required | Notes |
|---|---|---|
| apiKey | one-of | added as Authorization: Bearer <apiKey> (or as the header named in apiKeyHeader if set) |
| apiKeyHeader | no | custom header name when apiKey is set (e.g. X-API-Key) |
| bearerToken | one-of | added as Authorization: Bearer <bearerToken> |
Every URL is gated by the platform’s URL validator before fetch.
Loopback (localhost, 127.0.0.1), link-local (169.254.x.x —
including the AWS IMDS endpoint), and RFC1918 private ranges are
rejected up-front. Both list and get operations validate
independently so a maliciously-crafted file descriptor cannot bypass
the SSRF check.
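A minimal sketch of the kind of up-front check described above. The function name is hypothetical and the sketch is deliberately incomplete: the platform's actual validator would also need DNS resolution and IPv6 handling, which a literal-IP check like this cannot cover.

```typescript
// Hypothetical SSRF guard: reject loopback, link-local (incl. AWS IMDS),
// and RFC1918 private ranges before any fetch is attempted.
function isBlockedUrl(raw: string): boolean {
  const { hostname } = new URL(raw);
  if (hostname === "localhost") return true;
  const m = hostname.match(/^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/);
  if (!m) return false; // DNS names need resolution first (not shown here)
  const a = Number(m[1]);
  const b = Number(m[2]);
  if (a === 127) return true;              // loopback
  if (a === 169 && b === 254) return true; // link-local, incl. 169.254.169.254
  if (a === 10) return true;               // RFC1918 10.0.0.0/8
  if (a === 172 && b >= 16 && b <= 31) return true; // RFC1918 172.16.0.0/12
  if (a === 192 && b === 168) return true; // RFC1918 192.168.0.0/16
  return false;
}
```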
Cloud archive semantics
The source executor’s successFolder / failureFolder config is
honored across backends with backend-appropriate semantics:
| Backend | Archive operation |
|---|---|
| local_fs | fs.rename (atomic on POSIX within the same filesystem) |
| s3 | server-side copy + delete |
| gcs | bucket-file copy + delete |
| azure_blob | blob copy-from-URL (poll until done) + delete |
| sftp | native rename over SFTP |
| http_pull | no-op (read-only external; archive folders are ignored) |
Duplicate-key handling on the destination: every backend appends a .dup-<unix-ms> suffix when the target already exists, mirroring the local-filesystem atomic-staging behavior.
For http_pull, the IR atomicity.successFolder/failureFolder config
is still required for schema validation parity but its values are
ignored at runtime. A debug log emits “http_pull archive: no-op
(read-only external source)” so operators can confirm the no-op was
reached.
Pattern types
{ "pattern": { "type": "glob", "value": "orders_*.csv" } }
{ "pattern": { "type": "regex", "value": "^events_[0-9]{4}\\.json$" } }
{ "pattern": { "type": "date_template", "value": "log_{YYYY-MM-DD}.jsonl", "timezone": "America/Los_Angeles" } }
Date-template tokens
| Token | Meaning |
|---|---|
| {YYYY} | 4-digit year |
| {MM} | 2-digit month |
| {DD} | 2-digit day |
| {HH} | 2-digit hour (24h) |
| {mm} | 2-digit minute |
| {ss} | 2-digit second |
| {YYYY-MM-DD} | shorthand combo |
| {YYYYMMDD} | compact combo |
Tokens are evaluated against the source’s configured timezone
(IANA), defaulting to UTC. After expansion, the value is compiled as a
glob (literal text + * and ? wildcards).
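Token expansion can be sketched as follows. This is a hypothetical sketch, not the runtime's implementation; it expands against UTC (the documented default) and omits the IANA-timezone handling and the final glob compilation step.

```typescript
// Expand the documented date-template tokens against a Date, in UTC.
function expandDateTemplate(template: string, now: Date): string {
  const p = (n: number, w = 2) => String(n).padStart(w, "0");
  const t: Record<string, string> = {
    "{YYYY}": p(now.getUTCFullYear(), 4),
    "{MM}": p(now.getUTCMonth() + 1),
    "{DD}": p(now.getUTCDate()),
    "{HH}": p(now.getUTCHours()),
    "{mm}": p(now.getUTCMinutes()),
    "{ss}": p(now.getUTCSeconds()),
  };
  // Shorthand combos expand first so "{YYYY-MM-DD}" is not split apart.
  return template
    .replace(/\{YYYY-MM-DD\}/g, `${t["{YYYY}"]}-${t["{MM}"]}-${t["{DD}"]}`)
    .replace(/\{YYYYMMDD\}/g, `${t["{YYYY}"]}${t["{MM}"]}${t["{DD}"]}`)
    .replace(/\{(YYYY|MM|DD|HH|mm|ss)\}/g, (token) => t[token]);
}
```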
Ordering
| Mode | Behavior |
|---|---|
| latest_by_mtime | mtime descending; newest first |
| fifo_by_name | lexicographic ascending |
| lexicographic | alias of fifo_by_name |
| process_all | preserve readdir order |
The source executor processes ALL matched files in the chosen order.
Single-file mode is a future option.
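The four modes above amount to a comparator choice, which can be sketched as follows (names are illustrative, not the runtime's actual code):

```typescript
interface FileEntry { name: string; mtimeMs: number }

// Hypothetical sketch: apply the documented ordering mode to a file list.
function orderFiles(files: FileEntry[], mode: string): FileEntry[] {
  const copy = [...files];
  switch (mode) {
    case "latest_by_mtime":
      return copy.sort((a, b) => b.mtimeMs - a.mtimeMs); // newest first
    case "fifo_by_name":
    case "lexicographic": // documented alias of fifo_by_name
      return copy.sort((a, b) => a.name.localeCompare(b.name));
    case "process_all":
      return copy; // preserve readdir order
    default:
      throw new Error(`unknown ordering mode: ${mode}`);
  }
}
```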
Wait policy
"waitPolicy": {
"maxRetries": 6,
"intervalMinutes": 10,
"onMissAction": "alert"
}
The discover loop runs up to maxRetries times, sleeping
intervalMinutes between attempts. On exhaustion:
| onMissAction | Behavior |
|---|---|
| alert | Log a warn, emit a warning into the System Health feed (Source had no files), return rowsOut: 0; downstream sees the empty result |
| skip | Log info only; return rowsOut: 0 |
| fail | Throw file arrival deadline missed; the run fails |
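The discover/retry loop can be sketched as follows. The sketch is synchronous with an injectable sleep for brevity; names are hypothetical, and the real runtime sleeps asynchronously and emits the documented logs.

```typescript
type OnMissAction = "alert" | "skip" | "fail";

interface WaitPolicy {
  maxRetries: number;
  intervalMinutes: number;
  onMissAction: OnMissAction;
}

// Try discover() up to maxRetries times, sleeping between attempts;
// on exhaustion, apply the configured onMissAction.
function waitForFiles(
  discover: () => string[],
  policy: WaitPolicy,
  sleep: (ms: number) => void,
): string[] {
  for (let attempt = 1; attempt <= policy.maxRetries; attempt++) {
    const files = discover();
    if (files.length > 0) return files;
    if (attempt < policy.maxRetries) sleep(policy.intervalMinutes * 60_000);
  }
  if (policy.onMissAction === "fail") {
    throw new Error("file arrival deadline missed");
  }
  // "alert" and "skip" both surface an empty result (rowsOut: 0);
  // alert additionally warns operators in the real runtime.
  return [];
}
```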
Atomic staging
"atomicity": {
"stagingFolder": ".processing/",
"successFolder": ".archive/{YYYY-MM-DD}/",
"failureFolder": ".failed/"
}
Every matched file is renamed into stagingFolder BEFORE any byte is read. On a clean parse, files move to successFolder (date templates are expanded at archive time, in UTC by default). On parse failure, they move to failureFolder and the underlying error is rethrown.
stagingFolder, successFolder, and failureFolder may be relative
to path or absolute. If a file with the same basename already exists
at the destination, a .dup-<unix-ms> suffix is appended.
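The duplicate-basename rule can be sketched as follows (the function name is hypothetical):

```typescript
// If the destination path is already taken, append a .dup-<unix-ms>
// suffix so the incoming file never overwrites an archived one.
function resolveArchiveName(
  target: string,
  exists: (path: string) => boolean,
  nowMs: number = Date.now(),
): string {
  return exists(target) ? `${target}.dup-${nowMs}` : target;
}
```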
Crash recovery
If a run crashes mid-parse, files remain in stagingFolder. A
follow-up improvement will rescan that folder first on the next run;
today the operator should manually move files back if needed.
Format parsers
| Format | Library | Notes |
|---|---|---|
| csv | papaparse 5 | RFC-4180 quoting & escaping; default delimiter ,; header row → object keys; blank lines skipped |
| json | native JSON.parse | array → rows; object → single row |
| jsonl | native JSON.parse per line | line-delimited; parse errors report the line number |
| parquet | hyparquet | pure-JS reader; up to ~50 MB per file; binary read mode (Buffer, not utf8) |
| avro | avsc | schema-embedded container files only; nested records become nested JS objects |
| orc | (not implemented) | throws ORC parsing requires the optional @kaireonai/orc-native add-on package, which is not yet published. Configure your pipeline to convert ORC → parquet upstream until then. |
| tsv | papaparse 5 (delimiter: "\t") | RFC-4180-style quoting; same engine as CSV |
| xml | fast-xml-parser 5 | configurable recordPath (default: every direct child of root); attribute keys prefixed with @_; text nodes keyed #text |
XML recordPath
Specify which XML elements become rows via the recordPath config field on the source node:
# Atom-style feed: parse <feed>/<entry> elements
recordPath: "/feed/entry"
When recordPath is omitted (or set to *), every direct child of the root element becomes a row.
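An illustrative source-node config fragment selecting Atom entries as rows; the recordPath field name comes from this page, while the surrounding shape (the format key) is assumed for illustration:

```
{
  "format": "xml",
  "recordPath": "/feed/entry"
}
```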
Error model
| Where | Trigger | Effect |
|---|---|---|
| resolvePattern | malformed regex | run aborts with the original pattern |
| discoverFiles | dir not found | ENOENT propagates (operator-visible) |
| waitForFiles | exhausted + onMissAction=fail | run fails with file arrival deadline missed |
| stageFiles | rename error | rethrows; downstream archive routes to failureFolder |
| parseByFormat | malformed input | run fails with <format> parse error: ...; original error preserved on error.cause; files moved to failureFolder |
| parseByFormat (orc) | always | throws the explicit “ORC parsing requires…” message; not silent; files moved to failureFolder |