The Flow source executor reads files into a pipeline run with five production-grade behaviors:
  1. Pattern matching: glob, regex, or date-templated globs.
  2. Ordering: latest-by-mtime, FIFO, lexicographic, or process-all.
  3. Wait policy: retry up to N times at a configurable interval; on a missed deadline, alert, skip, or fail.
  4. Atomic staging: every matched file is renamed into a .processing/ folder before any byte is read; on success it moves to a date-templated .archive/ folder, on failure to .failed/.
  5. Format parsers: CSV, JSON, and JSONL ship in Phase 3; Parquet, Avro, ORC, TSV, and XML throw an explicit scope-defer error rather than being silently mis-parsed.

Connector scope

Phase 3 implements the file-handling logic for the local_fs connector. The IR schema validates seven connector kinds (s3, gcs, azure_blob, sftp, ftp, local_fs, http_pull), but only local_fs has a runtime executor today; the others throw a clear scope-defer error. Cloud-connector executors land in a dedicated follow-up phase focused on cloud-SDK integration, credentials, and retry semantics, all of which are orthogonal to the file-handling logic shipped here.

Pattern types

{ "pattern": { "type": "glob",          "value": "orders_*.csv" } }
{ "pattern": { "type": "regex",         "value": "^events_[0-9]{4}\\.json$" } }
{ "pattern": { "type": "date_template", "value": "log_{YYYY-MM-DD}.jsonl",
                "timezone": "America/Los_Angeles" } }

Date-template tokens

| Token | Meaning |
| --- | --- |
| {YYYY} | 4-digit year |
| {MM} | 2-digit month |
| {DD} | 2-digit day |
| {HH} | 2-digit hour (24h) |
| {mm} | 2-digit minute |
| {ss} | 2-digit second |
| {YYYY-MM-DD} | shorthand for {YYYY}-{MM}-{DD} |
| {YYYYMMDD} | compact form, {YYYY}{MM}{DD} |

Tokens are evaluated against the source's configured timezone (an IANA name), defaulting to UTC. After expansion, the value is compiled as a glob: literal text plus the * and ? wildcards.
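The three pattern types can be sketched in Python as a single resolver that returns a filename predicate. This is an illustration under stated assumptions, not the executor's actual code: the names `expand_date_template` and `resolve_pattern` are hypothetical, regex patterns are assumed to use search semantics (the example pattern carries its own ^ and $ anchors), and `fnmatch` also accepts [seq] character classes, which goes beyond the * and ? wildcards described above.

```python
import fnmatch
import re
from datetime import datetime, timezone
from typing import Callable
from zoneinfo import ZoneInfo

# Token -> strftime directive, following the token table above.
TOKENS = {
    "{YYYY-MM-DD}": "%Y-%m-%d",
    "{YYYYMMDD}": "%Y%m%d",
    "{YYYY}": "%Y",
    "{MM}": "%m",
    "{DD}": "%d",
    "{HH}": "%H",
    "{mm}": "%M",
    "{ss}": "%S",
}


def expand_date_template(value: str, now: datetime, tz_name: str = "UTC") -> str:
    """Replace date tokens using `now` converted to the given IANA timezone.

    `now` must be timezone-aware; a naive datetime would be read as local time.
    """
    tz = timezone.utc if tz_name == "UTC" else ZoneInfo(tz_name)
    local = now.astimezone(tz)
    for token, directive in TOKENS.items():
        value = value.replace(token, local.strftime(directive))
    return value


def resolve_pattern(pattern: dict, now: datetime) -> Callable[[str], bool]:
    """Turn a pattern config into a predicate over file names."""
    kind, value = pattern["type"], pattern["value"]
    if kind == "regex":
        try:
            compiled = re.compile(value)
        except re.error as exc:
            # Assumption: the abort message carries the original pattern.
            raise ValueError(f"malformed regex {value!r}: {exc}") from exc
        return lambda name: compiled.search(name) is not None
    if kind == "date_template":
        # Expand tokens first, then treat the result as a glob.
        value = expand_date_template(value, now, pattern.get("timezone", "UTC"))
    elif kind != "glob":
        raise ValueError(f"unknown pattern type: {kind!r}")
    return lambda name: fnmatch.fnmatchcase(name, value)
```

The timezone matters near midnight: at 2024-03-01 02:30 UTC, a source configured with America/Los_Angeles is still on 2024-02-29 local time, so log_{YYYY-MM-DD}.jsonl expands to log_2024-02-29.jsonl rather than log_2024-03-01.jsonl.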

Ordering

| Mode | Behavior |
| --- | --- |
| latest_by_mtime | mtime descending; newest first |
| fifo_by_name | lexicographic ascending |
| lexicographic | alias of fifo_by_name |
| process_all | preserve readdir order |

The source executor processes every matched file in the chosen order. Single-file mode is a future option.
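A minimal sketch of the four ordering modes, assuming each discovered file is represented by its name and mtime. `FileEntry` and `order_files` are hypothetical names used only for illustration; the input list stands in for whatever order readdir returned.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class FileEntry:
    name: str
    mtime: float  # seconds since the epoch, e.g. os.stat(path).st_mtime


def order_files(entries: List[FileEntry], mode: str) -> List[FileEntry]:
    """Return all entries, sorted according to the ordering mode."""
    if mode == "latest_by_mtime":
        # Newest first.
        return sorted(entries, key=lambda e: e.mtime, reverse=True)
    if mode in ("fifo_by_name", "lexicographic"):
        # lexicographic is an alias of fifo_by_name.
        return sorted(entries, key=lambda e: e.name)
    if mode == "process_all":
        # Keep the order in which the directory listing returned them.
        return list(entries)
    raise ValueError(f"unknown ordering mode: {mode!r}")
```

Every mode returns the full list; only the sequence changes, which matches the all-files behavior described above.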

Wait policy

"waitPolicy": {
  "maxRetries": 6,
  "intervalMinutes": 10,
  "onMissAction": "alert"
}
The discover loop runs up to maxRetries times, sleeping intervalMinutes between attempts. On exhaustion:

| onMissAction | Behavior |
| --- | --- |
| alert | Log a warning; return rowsOut: 0; downstream sees the empty result |
| skip | Log at info level only; return rowsOut: 0 |
| fail | Throw "file arrival deadline missed"; the run fails |
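The retry loop and the three miss actions can be sketched as follows. This assumes `discover` is a callable returning the currently matched files, and that no sleep happens after the final attempt (the text says sleeps occur between attempts). `wait_for_files`, `FileArrivalDeadlineMissed`, and the injectable `sleep` parameter are hypothetical names introduced for the example.

```python
import logging
import time
from typing import Callable, List

log = logging.getLogger("flow.source")


class FileArrivalDeadlineMissed(Exception):
    """Raised when onMissAction is "fail" and no file ever arrived."""


def wait_for_files(
    discover: Callable[[], List[str]],
    max_retries: int,
    interval_minutes: float,
    on_miss_action: str,
    sleep: Callable[[float], None] = time.sleep,
) -> List[str]:
    if on_miss_action not in ("alert", "skip", "fail"):
        raise ValueError(f"unknown onMissAction: {on_miss_action!r}")
    for attempt in range(1, max_retries + 1):
        files = discover()
        if files:
            return files
        if attempt < max_retries:
            # Sleep only between attempts, not after the last one.
            sleep(interval_minutes * 60)
    if on_miss_action == "fail":
        raise FileArrivalDeadlineMissed("file arrival deadline missed")
    if on_miss_action == "alert":
        log.warning("no files after %d attempts", max_retries)
    else:
        log.info("no files after %d attempts; skipping", max_retries)
    return []  # the caller reports rowsOut: 0
```

With the configuration shown above (maxRetries 6, intervalMinutes 10), this sketch would wait at most 50 minutes in total: five sleeps between six attempts.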

Atomic staging

"atomicity": {
  "stagingFolder": ".processing/",
  "successFolder": ".archive/{YYYY-MM-DD}/",
  "failureFolder": ".failed/"
}
Every matched file is renamed into stagingFolder before any byte is read. On a clean parse, files move to successFolder (date templates are expanded at archive time, in UTC by default). On a parse failure, they move to failureFolder and the original error is rethrown. stagingFolder, successFolder, and failureFolder may each be relative to path or absolute. If a file with the same basename already exists at the destination, a .dup-<unix-ms> suffix is appended.
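The stage, parse, then archive-or-fail flow can be sketched with plain renames. This is an assumption-laden illustration: `move_unique` and `process_file` are hypothetical names, the .dup suffix is assumed to be appended after the full file name, and the success folder is passed in already expanded rather than as a date template. Note that `os.rename` is only atomic when source and destination are on the same filesystem.

```python
import os
import time
from pathlib import Path
from typing import Callable, List


def move_unique(src: Path, dest_dir: Path) -> Path:
    """Rename src into dest_dir, adding a .dup-<unix-ms> suffix on collision."""
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest = dest_dir / src.name
    if dest.exists():
        dest = dest_dir / f"{src.name}.dup-{int(time.time() * 1000)}"
    os.rename(src, dest)
    return dest


def process_file(
    src: Path,
    base: Path,
    parse: Callable[[Path], List],
    staging: str = ".processing/",
    success: str = ".archive/",
    failure: str = ".failed/",
) -> List:
    # Joining with an absolute folder yields that absolute path, so
    # relative and absolute folder settings both work here.
    staged = move_unique(src, base / staging)  # staged before any byte is read
    try:
        rows = parse(staged)
    except Exception:
        move_unique(staged, base / failure)
        raise  # the original error propagates to the caller
    move_unique(staged, base / success)
    return rows
```

Staging first means a second run that scans the same directory will not see a file that is already being processed, because the file has left the watched folder before parsing starts.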

Crash recovery

If a run crashes mid-parse, files remain in stagingFolder. A follow-up improvement will rescan that folder first on the next run; today the operator should manually move files back if needed.
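Until the automatic rescan exists, the manual step could look like the sketch below. It assumes the staged files originally lived directly in the source directory and that stagingFolder is relative to it; `restore_staged` is a hypothetical helper, not part of Flow. It deliberately leaves a staged file in place when a file with the same name has since arrived, so nothing is overwritten.

```python
from pathlib import Path
from typing import List


def restore_staged(source_dir: Path, staging_folder: str = ".processing/") -> List[Path]:
    """Move files left in the staging folder back into the source directory."""
    staging = source_dir / staging_folder
    restored: List[Path] = []
    if not staging.is_dir():
        return restored
    for staged in sorted(staging.iterdir()):
        target = source_dir / staged.name
        # Skip subdirectories and any name that would clobber a newer file.
        if staged.is_file() and not target.exists():
            staged.rename(target)
            restored.append(target)
    return restored
```

Files that are skipped remain in the staging folder for the operator to inspect by hand.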

Formats

| Format | Phase 3 status |
| --- | --- |
| csv | implemented (header + comma-split; blank trailing lines ignored) |
| json | implemented (array → rows; object → single row) |
| jsonl | implemented (line-delimited; parse error reports the line number) |
| parquet / avro / orc / tsv / xml | scope-defer error at runtime |
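The format dispatch can be sketched as below, following the behaviors listed in the table. The function names and the `ScopeDeferError` class are hypothetical. Two details are assumptions not stated above: blank lines inside a JSONL file are skipped, and a JSON document that is neither an array nor an object is rejected.

```python
import json
from typing import Any, Dict, List

DEFERRED_FORMATS = {"parquet", "avro", "orc", "tsv", "xml"}


class ScopeDeferError(NotImplementedError):
    """The format is recognized but has no parser in this phase."""


def parse_csv(text: str) -> List[Dict[str, str]]:
    lines = text.splitlines()
    while lines and not lines[-1].strip():
        lines.pop()  # ignore blank trailing lines
    if not lines:
        return []
    header = lines[0].split(",")
    # Plain comma-split: quoted fields containing commas are not handled.
    return [dict(zip(header, line.split(","))) for line in lines[1:]]


def parse_json(text: str) -> List[Any]:
    data = json.loads(text)
    if isinstance(data, list):
        return data  # array -> rows
    if isinstance(data, dict):
        return [data]  # object -> single row
    raise ValueError("json source must be an array or an object")


def parse_jsonl(text: str) -> List[Any]:
    rows = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # assumption: blank lines are skipped
        try:
            rows.append(json.loads(line))
        except json.JSONDecodeError as exc:
            raise ValueError(f"jsonl parse error on line {lineno}: {exc.msg}") from exc
    return rows


PARSERS = {"csv": parse_csv, "json": parse_json, "jsonl": parse_jsonl}


def parse_by_format(fmt: str, text: str) -> List[Any]:
    if fmt in DEFERRED_FORMATS:
        raise ScopeDeferError(f"format {fmt!r} is deferred to a later phase")
    if fmt not in PARSERS:
        raise ValueError(f"unknown format: {fmt!r}")
    return PARSERS[fmt](text)
```

Failing loudly on deferred formats keeps a Parquet or Avro file from being fed through a text parser and producing garbage rows.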

Error model

| Where | Trigger | Effect |
| --- | --- | --- |
| resolvePattern | malformed regex | run aborts, reporting the original pattern |
| discoverFiles | dir not found | ENOENT propagates (operator-visible) |
| waitForFiles | exhausted + onMissAction=fail | run fails with "file arrival deadline missed" |
| stageFiles | rename error | error is rethrown; the downstream archive step routes to failureFolder |
| parseByFormat | malformed input | run fails; files moved to failureFolder |
| Unsupported format | parquet etc. | scope-defer error |