Ingest with Auto Loader
This how-to wires Databricks Auto Loader (cloudfiles) into a Lakehouse Plumber
(LHP) load action so a FlowGroup streams files from S3, ADLS, Google Cloud Storage,
or a Unity Catalog volume into a temporary view. The page focuses on the LHP YAML;
for option semantics, follow the link at the bottom to the Databricks Auto Loader
documentation.
Minimum viable cloudfiles load
The smallest useful cloudfiles load action streams CSVs from a landing volume
into a temporary view named v_customer_cloudfiles.
actions:
  - name: load_csv_file_from_cloudfiles
    type: load
    readMode: stream
    operational_metadata: ["_source_file_path", "_source_file_size", "_source_file_modification_time"]
    source:
      type: cloudfiles
      path: "${landing_volume}/{{ landing_folder }}/*.csv"
      format: csv
      options:
        cloudFiles.format: csv
        header: True
        delimiter: "|"
        cloudFiles.maxFilesPerTrigger: 11
        cloudFiles.inferColumnTypes: False
        cloudFiles.schemaEvolutionMode: "addNewColumns"
        cloudFiles.rescuedDataColumn: "_rescued_data"
        cloudFiles.schemaHints: "schemas/{{ table_name }}_schema.yaml"
    target: v_customer_cloudfiles
    description: "Load customer CSV files from landing volume"
Required keys:
readMode: stream is required because cloudfiles only supports streaming. LHP raises a validation error if you set batch.
source.type: cloudfiles selects the Auto Loader generator.
source.path is a file glob. ${token} substitutions and {{ param }} template parameters both work.
source.format is one of csv, json, parquet, avro, orc, text, or binaryFile. Default: json.
source.options.cloudFiles.format is the mandatory Auto Loader option. If you omit it, LHP copies source.format into it during generation.
Optional but commonly set:
operational_metadata is a list of audit column names to add to the output. _source_file_path, _source_file_size, and _source_file_modification_time are the conventional file-lineage triple. See Operational Metadata for the full column catalog.
Format reader options (header, delimiter, multiline, etc.) live alongside cloudFiles.* options in the same options block.
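As a rough sketch of how the required and optional keys combine for a non-CSV source, the following load action reads multi-line JSON and mixes a format reader option with a cloudFiles.* option in one options block. The action name, folder, and target view are hypothetical, and the snippet assumes the fallback described above (LHP copying source.format into cloudFiles.format) applies when the option is left out.

```yaml
actions:
  - name: load_orders_json                     # hypothetical action name
    type: load
    readMode: stream                           # cloudfiles only supports streaming
    source:
      type: cloudfiles
      path: "${landing_volume}/orders/*.json"  # hypothetical folder
      format: json
      options:
        # cloudFiles.format is omitted on purpose; per the text above,
        # LHP copies source.format into it during generation.
        multiLine: true                        # format reader option
        cloudFiles.inferColumnTypes: true      # Auto Loader option, same block
    target: v_orders_raw                       # hypothetical view name
```

Setting cloudFiles.format explicitly, as in the CSV example, is the more self-documenting choice when the file is read by people who do not know the fallback.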
Guide schema inference with schemaHints
Use cloudFiles.schemaHints when you want Auto Loader to infer most columns but
need to pin specific types (for example, force a string ID column to BIGINT).
LHP auto-detects three formats:
Inline DDL string — best for short schemas:
cloudFiles.schemaHints: "customer_id BIGINT, name STRING, email STRING"
External YAML file — recommended when you also want column metadata or
NOT NULL constraints:
cloudFiles.schemaHints: "schemas/customer_schema.yaml"
YAML schemas honour the nullable: false field per column and emit NOT NULL
in the generated DDL.
External DDL or SQL file — for pre-written CREATE TABLE-style DDL:
cloudFiles.schemaHints: "schemas/customer_schema.ddl"
# or .sql
LHP picks the format by file extension. Anything that is not .yaml, .yml,
.json, .ddl, or .sql is treated as an inline string.
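For orientation, an external YAML hints file might look roughly like the sketch below. This page does not show the file layout, so the top-level name and columns keys and the per-column name and type keys are assumptions; only the nullable: false field is confirmed by the text above. Check the LHP schema reference before relying on the exact key names.

```yaml
# schemas/customer_schema.yaml
# Hypothetical layout: key names other than "nullable" are assumptions.
name: customer
columns:
  - name: customer_id
    type: BIGINT
    nullable: false   # per the text above, emitted as NOT NULL in the generated DDL
  - name: name
    type: STRING
  - name: email
    type: STRING
```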
Enforce a full schema instead of inferring
Set source.schema instead of cloudFiles.schemaHints when you want Auto
Loader to accept exactly the columns you declare and reject everything else.
actions:
  - name: load_customer_with_schema
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "/data/customers/*.csv"
      format: csv
      schema: schemas/customer_schema.yaml
      options:
        cloudFiles.format: csv
    target: v_customer_raw
    description: "Load customer CSV with explicit schema enforcement"
Warning
source.schema and cloudFiles.schemaHints are mutually exclusive. LHP
raises a configuration-conflict error if you set both, and also blocks the
legacy schema_file field when either is present. With source.schema
set, leave cloudFiles.schemaEvolutionMode unset — inference is disabled,
so evolution does not apply.
Filter which files Auto Loader picks up
Two LHP-shaped knobs control which files enter the stream:
Prefix filter: narrow the source.path glob (for example, s3://bucket/data/{sales,marketing}/*.parquet). Auto Loader only lists the directories that match.
Suffix or name filter: set pathGlobFilter inside source.options to match basenames only (for example, "*.parquet").
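The two filters can be combined in one load action. The sketch below is illustrative only: the action name and target view are hypothetical, and it assumes the path may end at the directory level when pathGlobFilter handles the basename match.

```yaml
actions:
  - name: load_sales_and_marketing               # hypothetical action name
    type: load
    readMode: stream
    source:
      type: cloudfiles
      # Prefix filter: only the sales and marketing directories are listed.
      path: "s3://bucket/data/{sales,marketing}/"
      format: parquet
      options:
        cloudFiles.format: parquet
        # Name filter: match basenames ending in .parquet.
        pathGlobFilter: "*.parquet"
    target: v_sales_marketing_raw                # hypothetical view name
```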
For multi-source fan-in, brace-expansion globs, and a comparison of approaches, see Pipeline Patterns.
Reprocess files that already streamed
Auto Loader tracks ingested files by checkpoint; replays do not re-emit rows by default. Three options change that behaviour without re-creating the pipeline:
cloudFiles.includeExistingFiles: true ingests files already present at the source path on a fresh pipeline. It has no effect once the stream has initialised; flip it at first run.
cloudFiles.allowOverwrites: true re-emits a file when its content changes at the same path (overwrite-style sources).
cloudFiles.cleanSource, together with cloudFiles.cleanSource.retentionDuration and cloudFiles.cleanSource.moveDestination, lets Auto Loader archive or delete ingested files so the next run sees only new arrivals.
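A source block using these options might look like the sketch below. The ${archive_volume} token is hypothetical, the "MOVE" value and the "30 days" interval follow the Databricks option documentation as best understood here, and in practice you would usually choose either the overwrite option or the clean-up options for a given source rather than all of them at once.

```yaml
source:
  type: cloudfiles
  path: "${landing_volume}/{{ landing_folder }}/*.csv"
  format: csv
  options:
    cloudFiles.format: csv
    # Only evaluated the first time the stream starts.
    cloudFiles.includeExistingFiles: true
    # Overwrite-style sources: re-emit a file when it changes in place.
    cloudFiles.allowOverwrites: true
    # Archive-style clean-up (assumed values; verify against the
    # Databricks documentation for your runtime version).
    cloudFiles.cleanSource: "MOVE"
    cloudFiles.cleanSource.retentionDuration: "30 days"
    cloudFiles.cleanSource.moveDestination: "${archive_volume}/{{ landing_folder }}/"  # hypothetical token
```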
To force a full re-ingest from scratch, drop the streaming target table and rerun; LHP regenerates the pipeline Python and Auto Loader rebuilds its checkpoint.
Tune throughput and listing
These pass straight through source.options:
cloudFiles.maxFilesPerTrigger caps files per micro-batch (integer).
cloudFiles.maxBytesPerTrigger caps bytes per micro-batch (string with unit, for example "10g").
cloudFiles.useIncrementalListing: true enables incremental directory listing on S3, ADLS Gen2, and GCS. Defaults to "auto".
cloudFiles.useNotifications: true switches from directory listing to cloud-event notifications. Requires the corresponding cloud-side setup (SNS/SQS, Event Grid, or Pub/Sub).
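As an illustration, an options block that caps each micro-batch and switches to notifications could be sketched as follows. The numeric limits are arbitrary placeholders rather than recommendations, and the notification setting assumes the cloud-side queue or subscription already exists.

```yaml
options:
  cloudFiles.format: parquet
  # Cap each micro-batch by file count and by size.
  cloudFiles.maxFilesPerTrigger: 500        # placeholder value
  cloudFiles.maxBytesPerTrigger: "10g"      # string with unit
  # Use cloud-event notifications instead of directory listing.
  cloudFiles.useNotifications: true
```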
Recover malformed rows
Set cloudFiles.rescuedDataColumn (default behaviour: disabled) to capture
values that do not match the schema, such as fields that fail type coercion,
as a JSON document in a sidecar column. The example at the top of this page
uses the conventional column name _rescued_data.
Combine with a Data Quality Expectations (DQE) transform action and the
quarantine pattern to redirect rescued or failed rows to a dead-letter table;
see Quarantine Records for the DLT-friendly recipe.
See also
Architecture: how LHP turns this YAML into Python and where the generator and Jinja2 template live in src/lhp/.
Load Actions: full reference for every load sub-type (cloudfiles, delta, sql, jdbc, python, kafka, custom_datasource) and every key on the action model.
Pipeline Patterns: multi-source fan-in, brace-expansion globs, and pathGlobFilter for finer file selection.
Operational Metadata: audit column catalog and FlowGroup-level inheritance rules.
Databricks Auto Loader documentation for the underlying cloudFiles.* option semantics.