Pipeline Patterns

Reusable patterns for common data engineering scenarios encountered in production Lakehouse Plumber projects. Each pattern includes the YAML configuration, generated Python output, and relevant caveats.

See also

Test Result Reporting (Publishing) for the data quality test result reporting pattern (publish to Azure DevOps, Delta tables, or a custom provider).

Pattern decision table

Pick the pattern that matches your task, then jump to its section for the YAML, the generated Python, and the caveats.

| Pattern | Use this when… | Jump to |
| --- | --- | --- |
| Multi-Source Ingestion (Fan-In) | You consolidate the same schema from multiple sources (regions, buckets, accounts) into one streaming table and want each source on its own checkpoint. | Multi-Source Ingestion (Fan-In) |
| CloudFiles Path Filtering | You exclude specific paths, directories, or file names from a CloudFiles Auto Loader load. Covers glob patterns, post-load SQL filters, and pathGlobFilter. | CloudFiles Path Filtering |
| ACMI Retail Demo | You want a complete reference project showing Bronze, Silver, and Gold layers, Change Data Capture (CDC), expectations, templates, presets, and environment substitutions end-to-end. | ACMI Retail Demo |
| Multi-Flowgroup Files | You repeat similar flowgroups (one per table, one per source system) and want to collapse many YAML files into a single template-driven file. | Multi-Flowgroup Files |
| Local Variables | You repeat the same value (entity name, table name, file path) across actions inside one flowgroup and want a single source of truth. | Local Variables |
| Sink Examples | You push data out of the lakehouse: Delta to an external catalog, Kafka topics, Azure Event Hubs, or a custom REST endpoint. | Sink Examples |
| More Examples (Coming Soon) | You are looking for patterns not yet documented (JDBC on-prem ingestion, incremental snapshots, Pandas-UDF transforms). Track upcoming additions or contribute one. | More Examples (Coming Soon) |

Multi-Source Ingestion (Fan-In)

Consolidate data from multiple sources into a single streaming table, for example the same log schema arriving from S3 buckets in different regions or accounts.

LHP natively supports multiple write actions targeting the same streaming table. It generates a single dp.create_streaming_table() call with multiple @dp.append_flow() functions — one per source. Each append flow gets its own independent checkpoint, so if one source has issues the others continue processing normally.

The key is the create_table flag: the first write action creates the table (create_table: true, the default), and subsequent writes append to it (create_table: false).

Configuration

pipelines/bronze/logs_multi_region.yaml
pipeline: raw_ingestions
flowgroup: logs_multi_region

actions:
  # Load from US bucket
  - name: load_logs_us
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket-us-east-1/logs/*.parquet"
      format: parquet
      options:
        cloudFiles.maxFilesPerTrigger: 100
    target: v_logs_us

  # Load from EU bucket
  - name: load_logs_eu
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket-eu-west-1/logs/*.parquet"
      format: parquet
      options:
        cloudFiles.maxFilesPerTrigger: 100
    target: v_logs_eu

  # First write: creates the table
  - name: write_logs_us
    type: write
    source: v_logs_us
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"
      table: unified_logs
      create_table: true
    description: "Write US region logs to unified table"

  # Second write: appends to the same table
  - name: write_logs_eu
    type: write
    source: v_logs_eu
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"
      table: unified_logs
      create_table: false
    description: "Append EU region logs to unified table"

Generated Output

Generated logs_multi_region.py
from pyspark import pipelines as dp

@dp.temporary_view()
def v_logs_us():
    """Write US region logs to unified table"""
    df = spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "parquet") \
        .option("cloudFiles.maxFilesPerTrigger", 100) \
        .load("s3://my-bucket-us-east-1/logs/*.parquet")
    return df

@dp.temporary_view()
def v_logs_eu():
    """Append EU region logs to unified table"""
    df = spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "parquet") \
        .option("cloudFiles.maxFilesPerTrigger", 100) \
        .load("s3://my-bucket-eu-west-1/logs/*.parquet")
    return df

# Single table creation
dp.create_streaming_table(
    name="catalog.bronze.unified_logs",
    comment="Write US region logs to unified table"
)

# One append flow per source
@dp.append_flow(target="catalog.bronze.unified_logs", name="f_unified_logs_us")
def f_unified_logs_us():
    """Write US region logs to unified table"""
    df = spark.readStream.table("v_logs_us")
    return df

@dp.append_flow(target="catalog.bronze.unified_logs", name="f_unified_logs_eu")
def f_unified_logs_eu():
    """Append EU region logs to unified table"""
    df = spark.readStream.table("v_logs_eu")
    return df

Important

Each streaming table must have exactly one action with create_table: true across the entire pipeline. Additional actions targeting the same table must use create_table: false.
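The rule above can be checked mechanically before generation. The following is a minimal sketch in plain Python, not part of LHP: check_create_table_flags is a hypothetical helper, and the flattened action dicts are a simplification of the YAML shown earlier. Whether lhp validate already performs an equivalent check is not stated here.

```python
def check_create_table_flags(write_actions):
    """Return a list of problems; an empty list means the flags are consistent.

    Each action is a dict with "name", "table", and an optional "create_table"
    key, treated as True when absent (the default described above).
    """
    creators = {}
    for action in write_actions:
        names = creators.setdefault(action["table"], [])
        if action.get("create_table", True):
            names.append(action["name"])

    problems = []
    for table, names in creators.items():
        if len(names) != 1:
            problems.append(
                f"{table}: expected exactly 1 creating action, found {len(names)} {names}"
            )
    return problems


# Hypothetical, simplified view of the two write actions from the example.
actions = [
    {"name": "write_logs_us", "table": "unified_logs", "create_table": True},
    {"name": "write_logs_eu", "table": "unified_logs", "create_table": False},
]
print(check_create_table_flags(actions))  # []
```

Dropping create_table: false from the second action makes both actions default to creating the table, which the helper reports as a problem.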

Templatising Multi-Source Ingestion

When you have many sources following the same pattern, combine this with templates to eliminate boilerplate. Define a template for the load+write pair and invoke it per source, each targeting the same table.
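To make the shape of the repeated load+write pair concrete, here is a small Python sketch that builds the same action structure for any number of sources. It does not use LHP's template syntax (documented separately); fan_in_actions and its parameters are hypothetical names used only to illustrate that the first write creates the table and every later write appends.

```python
def fan_in_actions(table, prefix, sources):
    """Build load+write action dicts for each (suffix, path) pair.

    Only the first write gets create_table=True; the rest append.
    """
    actions = []
    for index, (suffix, path) in enumerate(sources):
        view = f"v_{prefix}_{suffix}"
        actions.append({
            "name": f"load_{prefix}_{suffix}",
            "type": "load",
            "readMode": "stream",
            "source": {"type": "cloudfiles", "path": path, "format": "parquet"},
            "target": view,
        })
        actions.append({
            "name": f"write_{prefix}_{suffix}",
            "type": "write",
            "source": view,
            "write_target": {
                "type": "streaming_table",
                "database": "${catalog}.${bronze_schema}",
                "table": table,
                "create_table": index == 0,
            },
        })
    return actions


generated = fan_in_actions(
    table="unified_logs",
    prefix="logs",
    sources=[
        ("us", "s3://my-bucket-us-east-1/logs/*.parquet"),
        ("eu", "s3://my-bucket-eu-west-1/logs/*.parquet"),
    ],
)
for action in generated:
    print(action["name"], action.get("write_target", {}).get("create_table"))
```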


CloudFiles Path Filtering

Three approaches to excluding specific paths, directories, or files from a CloudFiles Auto Loader pipeline, each suited to a different scenario, followed by an explicit include list for the inverse case.

Glob Patterns in the Load Path

LHP passes the source.path string directly to the generated .load("...") call with no escaping or transformation. Any glob syntax that Spark and CloudFiles support works as-is in your YAML.

Supported glob syntax (Databricks Auto Loader):

| Syntax | Meaning | Example |
| --- | --- | --- |
| * | Match any sequence of characters | /data/*.parquet |
| ? | Match any single character | /data/file_?.csv |
| [abc] | Match any character in set | /data/[abc]*.csv |
| [a-z] | Match any character in range | /data/part_[0-9].csv |
| [^x] | Match any character NOT in set | /data/[^_]*.csv |
| {a,b,c} | Match any of the alternatives | /data/{sales,events}/ |

Exclude a specific day using brace expansion
actions:
  - name: load_logs_excluding_day16
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket/data/2026/02/{0[1-9],1[0-5],1[7-9],2[0-9]}/logs/*.parquet"
      format: parquet
      options:
        cloudFiles.useStrictGlobber: "true"
    target: v_logs_raw

You can also put the glob pattern in an environment substitution token if it varies by environment:

substitutions/dev.yaml
dev:
  log_path_pattern: "s3://dev-bucket/data/2026/02/{0[1-9],1[0-5],1[7-9],2[0-9]}/logs/*.parquet"

flowgroup YAML
source:
  path: "${log_path_pattern}"

Warning

Caveats for complex glob patterns:

  • Character classes inside brace alternatives (like {0[1-9],1[0-5]}) should work at the Hadoop GlobFilter level, but this specific combination is not shown in Databricks documentation. Test in a dev environment before relying on it in production.

  • Use [^x] not [!x] for negation — the Unix shell [!x] syntax is not supported by Spark’s globber.

  • ** (globstar) is not documented for Auto Loader — avoid it.

  • Notification mode (cloudFiles.useNotifications: "true") with complex globs is undocumented territory. Directory listing mode (the default) is the safe choice for glob-heavy paths.

  • Consider adding cloudFiles.useStrictGlobber: "true" (DBR 12.2+) for predictable, Spark-standard globbing behaviour. The default globber is more permissive — for example, * can cross directory boundaries.
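Before testing a complex pattern against real storage, it can help to sanity-check locally which path segments it is meant to cover. The sketch below emulates only the day segment of the pattern above with Python's re module. It is an approximation of glob semantics and says nothing about how Auto Loader itself evaluates the pattern; day_glob_to_regex is a hypothetical helper that handles only a single brace group of literal characters and character classes.

```python
import re


def day_glob_to_regex(pattern: str) -> re.Pattern:
    """Translate one {alt1,alt2,...} brace group into a compiled regex.

    Supports only alternatives made of literal characters and [..] classes,
    which is the subset used by the day segment in the example above.
    """
    if not (pattern.startswith("{") and pattern.endswith("}")):
        raise ValueError("expected a single {a,b,c} brace group")
    alternatives = pattern[1:-1].split(",")
    # Glob classes such as [1-9] are already valid regex character classes.
    return re.compile("(?:" + "|".join(alternatives) + ")")


DAY_GLOB = "{0[1-9],1[0-5],1[7-9],2[0-9]}"
day_regex = day_glob_to_regex(DAY_GLOB)

# February 2026 has 28 days.
days_in_feb_2026 = [f"{day:02d}" for day in range(1, 29)]
excluded = [d for d in days_in_feb_2026 if not day_regex.fullmatch(d)]
print(excluded)  # ['16']
```

If the list contains anything other than the day you intended to skip, the brace group needs another alternative.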

Post-Load Filter with SQL Transform

CloudFiles natively exposes _metadata.file_path in the loaded DataFrame. Add a SQL transform action between the load and the write to filter out unwanted paths with a regular expression (RLIKE).

Filter excluded paths via SQL transform
actions:
  - name: load_all_files
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket/data/"
      format: parquet
    target: v_raw_data

  - name: filter_excluded_paths
    type: transform
    transform_type: sql
    source: v_raw_data
    target: v_filtered_data
    sql: |
      SELECT * FROM stream(v_raw_data)
      WHERE NOT _metadata.file_path RLIKE '.*/exclude_[^/]+/.*'

  - name: write_bronze
    type: write
    source: v_filtered_data
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"
      table: my_table
    description: "Write filtered data to bronze layer"

Auto Loader still reads all files before the filter is applied. This approach works when the excluded files are readable but contain unwanted data. If excluded files cannot be read at all (e.g. AccessDenied), use glob patterns or pathGlobFilter instead.
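The exclusion regex is easy to get subtly wrong, so it is worth trying it on a few sample paths first. This sketch uses Python's re module as a stand-in for Spark SQL's RLIKE (which uses Java regular expressions); the two agree for this simple pattern, but that is an assumption to re-check for anything more elaborate. The sample paths are invented for illustration.

```python
import re

# Same pattern as in the SQL transform above.
EXCLUDE_PATTERN = re.compile(r".*/exclude_[^/]+/.*")


def is_kept(file_path: str) -> bool:
    """Mirror `WHERE NOT file_path RLIKE pattern`.

    RLIKE matches anywhere in the string, so re.search is the closest analogue.
    """
    return EXCLUDE_PATTERN.search(file_path) is None


sample_paths = [
    "s3://my-bucket/data/sales/part-0001.parquet",        # kept
    "s3://my-bucket/data/exclude_tmp/part-0001.parquet",  # dropped: excluded directory
    "s3://my-bucket/data/sales/exclude_me.parquet",       # kept: file name, not a directory
]
kept = [path for path in sample_paths if is_kept(path)]
print(kept)
```

The third path shows that the pattern targets directories named exclude_*, not files whose names start with exclude_.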

pathGlobFilter Option

To filter by file name (not full path), use the pathGlobFilter reader option. This is a Spark-native option that filters on the basename of each file after directory listing.

Filter by filename pattern
actions:
  - name: load_parquet_only
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket/data/"
      format: parquet
      options:
        pathGlobFilter: "*.parquet"
    target: v_raw_data

pathGlobFilter filters on the filename only (basename), not the full path. The .load() path is a prefix filter; pathGlobFilter is for suffix or name filtering.
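The basename-only behaviour can be illustrated with Python's fnmatch module. This is a local emulation under the assumption that the filter is applied to the last path segment, as described above; it is not Spark's own globber, and the sample paths are invented.

```python
import fnmatch
import posixpath


def passes_path_glob_filter(file_path: str, pattern: str) -> bool:
    """Apply a glob to the file name only, ignoring the directory part."""
    return fnmatch.fnmatchcase(posixpath.basename(file_path), pattern)


samples = [
    "s3://my-bucket/data/2026/02/part-0001.parquet",  # file name matches
    "s3://my-bucket/data/parquet/readme.txt",         # directory name is ignored
    "s3://my-bucket/data/_SUCCESS",                   # marker file, no extension
]
for path in samples:
    print(path, passes_path_glob_filter(path, "*.parquet"))
```

Only the first path passes: a directory called parquet does not make readme.txt match.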

Explicit Include List

Use brace expansion to explicitly list only the directories to include:

Include only specific directories
actions:
  - name: load_selected_sources
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "s3://my-bucket/data/{sales,marketing,events}/*.parquet"
      format: parquet
    target: v_selected_data

This generates .load("s3://my-bucket/data/{sales,marketing,events}/*.parquet") and Auto Loader will only scan those three directories.
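To see which concrete prefixes a brace list stands for, the pattern can be expanded locally. The helper below is a hypothetical, simplified expander written for illustration: it handles comma-separated alternatives (including several brace groups in one path) but not character classes or escaping, and it does not reproduce Auto Loader's own matching.

```python
import re


def expand_braces(pattern: str) -> list[str]:
    """Expand every {a,b,c} group in a path into the explicit alternatives."""
    match = re.search(r"\{([^{}]*)\}", pattern)
    if match is None:
        return [pattern]
    head, tail = pattern[:match.start()], pattern[match.end():]
    expanded = []
    for alternative in match.group(1).split(","):
        expanded.extend(expand_braces(head + alternative + tail))
    return expanded


for path in expand_braces("s3://my-bucket/data/{sales,marketing,events}/*.parquet"):
    print(path)
```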

For truly separate paths (different buckets or unrelated prefixes), use the multi-source ingestion pattern above — multiple load+write actions targeting the same table with create_table: false on the additional writes.

Choosing the Right Approach

| Scenario | Approach | Notes |
| --- | --- | --- |
| Exclude specific date partitions | Glob patterns | Use brace expansion to enumerate included segments |
| Exclude paths matching a regex | SQL transform | Full regex power via _metadata.file_path |
| Filter by file extension or name | pathGlobFilter | Set in source.options; filters basename only |
| Include specific named directories | Brace expansion | Simple and explicit; documented by Databricks |
| Multiple separate buckets or prefixes | Multiple append flows | Independent checkpoints per source |


ACMI Retail Demo

Folder: Example_Projects/acmi

Highlights

  • Multi-format ingestion – CSV, JSON, Parquet using cloudfiles.

  • Bronze → Silver → Gold layers encoded as separate pipelines.

  • Change Data Feed (CDF) enabled on streaming tables.

  • Data-Quality Expectations (expectations/*.json).

  • Templates & Presets to avoid repetition.

  • Environment substitutions for dev, tst, prod.

Walk-through

# 1. Install prereqs & enter project root
pip install lakehouse-plumber
cd Example_Projects/acmi

# 2. Validate all pipelines for dev environment
lhp validate --env dev

# 3. Generate Bronze layer code (raw ingestion)
lhp generate --env dev --pipeline 01_raw_ingestion

# Check ./generated/ for Python DLT scripts

# 4. Generate Silver layer
lhp generate --env dev --pipeline 03_silver

# 5. Generate Gold analytics views
lhp generate --env dev --pipeline 04_gold

Customising the Example

  1. Edit substitutions/dev.yaml to match your catalog and storage paths.

  2. Tweak presets under presets/ (e.g., change table properties).

  3. Adjust schema hints or expectations JSON to enforce your data contract.

Multi-Flowgroup Files

For projects with many similar flowgroups, you can combine multiple flowgroups into a single YAML file to reduce file proliferation and improve organization.

Example: SAP Master Data (3 files → 1 file)

Instead of:

  • brand_ingestion.yaml

  • category_ingestion.yaml

  • carrier_ingestion.yaml

Use one file sap_master_data.yaml:

pipeline: raw_ingestions_sap
use_template: TMPL003_parquet_ingestion_template

flowgroups:
  - flowgroup: sap_brand_ingestion
    template_parameters:
      table_name: raw_sap_brand
      landing_folder: brand

  - flowgroup: sap_cat_ingestion
    template_parameters:
      table_name: raw_sap_cat
      landing_folder: category

  - flowgroup: sap_carrier_ingestion
    template_parameters:
      table_name: raw_sap_carrier
      landing_folder: carrier

Result: 67% file reduction with identical functionality.

See Multi-Flowgroup YAML Files for complete documentation with inheritance rules, syntax options, migration guides, and real-world examples.
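Conceptually, the single file above stands for three flowgroups that share the document-level keys. The Python sketch below illustrates that idea with plain dicts. It assumes that document-level keys are copied into each entry and that entry-level keys win on conflict; the authoritative inheritance rules are in the Multi-Flowgroup YAML Files documentation, and split_flowgroups is a hypothetical helper, not an LHP function.

```python
def split_flowgroups(document: dict) -> list[dict]:
    """Return one standalone flowgroup dict per entry under "flowgroups"."""
    shared = {key: value for key, value in document.items() if key != "flowgroups"}
    # Entry keys are merged last, so they override shared keys (an assumption).
    return [{**shared, **entry} for entry in document["flowgroups"]]


sap_master_data = {
    "pipeline": "raw_ingestions_sap",
    "use_template": "TMPL003_parquet_ingestion_template",
    "flowgroups": [
        {"flowgroup": "sap_brand_ingestion",
         "template_parameters": {"table_name": "raw_sap_brand", "landing_folder": "brand"}},
        {"flowgroup": "sap_cat_ingestion",
         "template_parameters": {"table_name": "raw_sap_cat", "landing_folder": "category"}},
        {"flowgroup": "sap_carrier_ingestion",
         "template_parameters": {"table_name": "raw_sap_carrier", "landing_folder": "carrier"}},
    ],
}

for flowgroup in split_flowgroups(sap_master_data):
    print(flowgroup["pipeline"], flowgroup["flowgroup"], flowgroup["use_template"])
```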

Local Variables

Local variables reduce repetition within a single flowgroup by defining reusable values. They use %{variable} syntax and are resolved before templates and environment substitutions.

Simple Example

Instead of repeating “customer” throughout your flowgroup:

Without local variables (repetitive)
pipeline: acmi_edw_bronze
flowgroup: customer_pipeline

actions:
  - name: "load_customer_raw"
    source:
      table: "customer_raw"
    target: "v_customer_raw"

  - name: "customer_cleanse"
    source: "v_customer_raw"
    target: "v_customer_cleaned"

  - name: "write_customer_bronze"
    source: "v_customer_cleaned"
    write_target:
      table: "customer"

Use local variables to define it once:

With local variables (DRY principle)
pipeline: acmi_edw_bronze
flowgroup: customer_pipeline

variables:
  entity: customer
  source_table: customer_raw
  target_table: customer

actions:
  - name: "load_%{entity}_raw"
    source:
      table: "%{source_table}"
    target: "v_%{entity}_raw"

  - name: "%{entity}_cleanse"
    source: "v_%{entity}_raw"
    target: "v_%{entity}_cleaned"

  - name: "write_%{entity}_bronze"
    source: "v_%{entity}_cleaned"
    write_target:
      table: "%{target_table}"

Benefits:

  • Single source of truth: Change “customer” to “order” in one place

  • Reduced errors: No risk of inconsistent naming across actions

  • Better readability: Intent is clear from the variables section

  • Works everywhere: Inline patterns like prefix_%{var}_suffix supported
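The substitution itself is simple to picture. The following Python sketch resolves %{variable} references across a nested structure like the flowgroup above. It is an illustration only, not LHP's resolver: resolve_locals is a hypothetical name, and raising KeyError for an undefined variable is an assumption about reasonable behaviour rather than a description of what LHP does.

```python
import re

VARIABLE = re.compile(r"%\{(\w+)\}")


def resolve_locals(value, variables):
    """Replace %{name} in strings, recursing through dicts and lists."""
    if isinstance(value, str):
        def lookup(match):
            name = match.group(1)
            if name not in variables:
                raise KeyError(f"undefined local variable: {name}")
            return str(variables[name])
        return VARIABLE.sub(lookup, value)
    if isinstance(value, dict):
        return {key: resolve_locals(item, variables) for key, item in value.items()}
    if isinstance(value, list):
        return [resolve_locals(item, variables) for item in value]
    return value


variables = {"entity": "customer", "source_table": "customer_raw", "target_table": "customer"}
action = {
    "name": "load_%{entity}_raw",
    "source": {"table": "%{source_table}"},
    "target": "v_%{entity}_raw",
}
print(resolve_locals(action, variables))
```

Changing entity to order in the variables dict renames every derived action and view in one step.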

Real-World Example

Here’s a production pattern combining local variables with environment substitutions:

pipelines/bronze/product_ingestion.yaml
pipeline: acmi_edw_bronze
flowgroup: product_pipeline

variables:
  entity: product
  source_table: product_raw
  target_table: product
  schema_file: product_schema.yaml

actions:
  - name: "load_%{entity}_raw"
    type: load
    operational_metadata: ["_processing_timestamp"]
    readMode: stream
    source:
      type: delta
      database: "${catalog}.${raw_schema}"  # Environment token
      table: "%{source_table}"              # Local variable
    target: "v_%{entity}_raw"
    description: "Load %{entity} table from raw schema"

  - name: "%{entity}_quality_check"
    type: transform
    transform_type: sql
    source: "v_%{entity}_raw"
    target: "v_%{entity}_validated"
    sql_path: "sql/quality_checks/%{entity}_check.sql"
    expectations_path: "expectations/%{entity}_expectations.json"

  - name: "write_%{entity}_bronze"
    type: write
    source: "v_%{entity}_validated"
    write_target:
      type: streaming_table
      database: "${catalog}.${bronze_schema}"  # Environment token
      table: "%{target_table}"                 # Local variable
      schema_hints_path: "schemas/%{schema_file}"

Notice: Local variables (%{entity}) and environment tokens (${catalog}) work together seamlessly.
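Because the two token styles use different prefixes, they can be resolved in separate passes without interfering. The sketch below shows that ordering in Python: local variables first, environment tokens second, as stated earlier in this section. The render function and the environment values (dev_catalog, bronze) are hypothetical; string.Template happens to use the same ${name} form, which keeps the second pass short.

```python
import re
from string import Template


def render(text: str, local_vars: dict, env_tokens: dict) -> str:
    """Resolve %{local} references first, then ${environment} tokens."""
    # Pass 1: local variables. The ${...} tokens are left untouched.
    after_locals = re.sub(r"%\{(\w+)\}", lambda m: local_vars[m.group(1)], text)
    # Pass 2: environment tokens.
    return Template(after_locals).substitute(env_tokens)


qualified_name = render(
    "${catalog}.${bronze_schema}.%{target_table}",
    local_vars={"target_table": "product"},
    env_tokens={"catalog": "dev_catalog", "bronze_schema": "bronze"},  # assumed values
)
print(qualified_name)  # dev_catalog.bronze.product
```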

See Templates Reference for complete documentation on local variables, including:

  • Recursive variable definitions

  • Error handling for undefined variables

  • Interaction with templates and presets

  • Processing order details

Sink Examples

The ACME Supermarkets project includes comprehensive sink examples demonstrating data export to external systems. These examples showcase Delta, Kafka, and custom API sinks for streaming data to destinations beyond traditional DLT-managed tables.

Location: Example_Projects/acme_supermarkets_lhp/pipelines/06_sink_examples/

Delta Sink Example

Export aggregated sales metrics to a table in an external Unity Catalog catalog for cross-workspace analytics.

File: 01_delta_sink_external_catalog.yaml

cd Example_Projects/acme_supermarkets_lhp
lhp generate --env dev --pipeline acme_supermarkets_sinks_pipeline
cat generated/acme_supermarkets_sinks_pipeline/delta_sink_example.py

Key features:

  • Aggregates silver layer data

  • Writes to external catalog table

  • Schema evolution enabled

  • Optimized writes for performance

Kafka Sink Example

Stream order fulfillment events to Kafka for real-time processing by downstream systems.

File: 02_kafka_sink_order_events.yaml

Key features:

  • Transforms data to Kafka key/value format using to_json()

  • JSON serialization of order events

  • Kafka headers for event metadata

  • Security and performance tuning configuration

Important

Kafka sinks require explicit key and value columns created in a transform action before writing.
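The shape of the record a Kafka sink expects can be pictured without Spark. The plain-Python sketch below builds a key and a JSON value from one event. In the pipeline this step is done in a transform action with to_json(), as noted above; the field names order_id and status and the helper to_kafka_record are invented for illustration.

```python
import json


def to_kafka_record(event: dict, key_field: str) -> dict:
    """Return the two columns a Kafka write needs: a string key and a JSON value."""
    return {
        "key": str(event[key_field]),
        "value": json.dumps(event, sort_keys=True),
    }


event = {"order_id": 1001, "status": "SHIPPED"}  # hypothetical order event
record = to_kafka_record(event, key_field="order_id")
print(record["key"])    # 1001
print(record["value"])  # {"order_id": 1001, "status": "SHIPPED"}
```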

Azure Event Hubs Example

Stream inventory alerts to Azure Event Hubs using OAuth authentication.

File: 03_event_hubs_sink_inventory_alerts.yaml

Key features:

  • OAuth authentication with Azure Event Hubs

  • Kafka-compatible interface (sink_type: kafka)

  • Priority-based alert routing

  • Unity Catalog service credentials

Custom API Sink Example

Push customer profile updates to an external CRM via a REST API.

File: 04_custom_api_sink_customer_updates.yaml

Custom implementation: sinks/customer_api_sink.py

Key features:

  • HTTP POST with bearer token authentication

  • Batch processing with configurable batch size

  • Retry logic with exponential backoff

  • Dead letter queue for failed records

  • Comprehensive error logging
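The batching, retry, and dead-letter behaviour listed above can be sketched in a few lines of Python. This is not the code in sinks/customer_api_sink.py: send_in_batches, its parameters, and the simulated sender are hypothetical, and the HTTP call is replaced by an injected send callable so the example runs without a network.

```python
import time


def send_in_batches(records, send, batch_size=2, max_attempts=3,
                    base_delay=0.5, sleep=time.sleep):
    """Send records in batches, retrying failures with exponential backoff.

    Returns the records of every batch that still failed after max_attempts;
    a real sink would route these to a dead letter queue.
    """
    dead_letters = []
    for start in range(0, len(records), batch_size):
        batch = records[start:start + batch_size]
        for attempt in range(max_attempts):
            try:
                send(batch)
                break
            except Exception:
                if attempt == max_attempts - 1:
                    dead_letters.extend(batch)
                else:
                    # Delay doubles each time: base_delay, 2x, 4x, ...
                    sleep(base_delay * 2 ** attempt)
    return dead_letters


def flaky_send(batch):
    """Stand-in for an HTTP POST that always fails for one record."""
    if "bad" in batch:
        raise RuntimeError("simulated HTTP 500")


delays = []  # capture requested sleeps instead of actually waiting
failed = send_in_batches(["a", "b", "bad", "c"], flaky_send, sleep=delays.append)
print(failed)  # ['bad', 'c']
print(delays)  # [0.5, 1.0]
```

Note that a failed batch is dead-lettered as a whole, so a single bad record takes its batch neighbours with it; smaller batches reduce that blast radius at the cost of more requests.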

Walk-through

cd Example_Projects/acme_supermarkets_lhp

# Validate sink configurations
lhp validate --env dev

# Generate all sink examples
lhp generate --env dev --pipeline acme_supermarkets_sinks_pipeline

# Inspect generated Python code
cat generated/acme_supermarkets_sinks_pipeline/delta_sink_example.py
cat generated/acme_supermarkets_sinks_pipeline/kafka_sink_example.py
cat generated/acme_supermarkets_sinks_pipeline/custom_api_sink_example.py

# Deploy with Databricks bundles
databricks bundle deploy -t dev

For more details on sink configuration and options, see Write Actions.

More Examples (Coming Soon)

  • JDBC ingestion with on-prem Oracle.

  • Incremental snapshot tables using delta load and materialized_view write.

  • Python transform with Pandas-UDF cleaning.

Contributions welcome – open a PR adding a folder under Example_Projects!