Pipeline Patterns¶
Reusable patterns for common data engineering scenarios encountered in production Lakehouse Plumber projects. Each pattern includes the YAML configuration, generated Python output, and relevant caveats.
See also
Test Result Reporting (Publishing) for the data quality test result reporting pattern (publish to Azure DevOps, Delta tables, or a custom provider).
Pattern decision table¶
Pick the pattern that matches your task, then jump to its section for the YAML, the generated Python, and the caveats.
Pattern |
Use this when… |
Jump to |
|---|---|---|
Multi-Source Ingestion (Fan-In) |
You consolidate the same schema from multiple sources (regions, buckets, accounts) into one streaming table and want each source on its own checkpoint. |
|
CloudFiles Path Filtering |
You exclude specific paths, directories, or file names from a CloudFiles
Auto Loader load. Covers glob patterns, post-load SQL filters, and
|
|
ACMI Retail Demo |
You want a complete reference project showing Bronze, Silver, and Gold layers, Change Data Capture (CDC), expectations, templates, presets, and environment substitutions end-to-end. |
|
Multi-Flowgroup Files |
You repeat similar flowgroups (one per table, one per source system) and want to collapse many YAML files into a single template-driven file. |
|
Local Variables |
You repeat the same value (entity name, table name, file path) across actions inside one flowgroup and want a single source of truth. |
|
Sink Examples |
You push data out of the lakehouse — Delta to an external catalog, Kafka topics, Azure Event Hubs, or a custom REST endpoint. |
|
More Examples (Coming Soon) |
You are looking for patterns not yet documented (JDBC on-prem ingestion, incremental snapshots, Pandas-UDF transforms). Track upcoming additions or contribute one. |
Multi-Source Ingestion (Fan-In)¶
Consolidate data from multiple sources into a single :term:`streaming table <Streaming table>` — for example, the same log schema arriving from S3 buckets in different regions or accounts.
LHP natively supports multiple write actions targeting the same streaming table. It
generates a single dp.create_streaming_table() call with multiple
@dp.append_flow() functions — one per source. Each append flow gets its own
independent checkpoint, so if one source has issues the others continue processing
normally.
The key is the create_table flag: the first write action creates the table
(create_table: true, the default), and subsequent writes append to it
(create_table: false).
Configuration¶
1pipeline: raw_ingestions
2flowgroup: logs_multi_region
3
4actions:
5 # Load from US bucket
6 - name: load_logs_us
7 type: load
8 readMode: stream
9 source:
10 type: cloudfiles
11 path: "s3://my-bucket-us-east-1/logs/*.parquet"
12 format: parquet
13 options:
14 cloudFiles.maxFilesPerTrigger: 100
15 target: v_logs_us
16
17 # Load from EU bucket
18 - name: load_logs_eu
19 type: load
20 readMode: stream
21 source:
22 type: cloudfiles
23 path: "s3://my-bucket-eu-west-1/logs/*.parquet"
24 format: parquet
25 options:
26 cloudFiles.maxFilesPerTrigger: 100
27 target: v_logs_eu
28
29 # First write — creates the table
30 - name: write_logs_us
31 type: write
32 source: v_logs_us
33 write_target:
34 type: streaming_table
35 database: "${catalog}.${bronze_schema}"
36 table: unified_logs
37 create_table: true
38 description: "Write US region logs to unified table"
39
40 # Second write — appends to the same table
41 - name: write_logs_eu
42 type: write
43 source: v_logs_eu
44 write_target:
45 type: streaming_table
46 database: "${catalog}.${bronze_schema}"
47 table: unified_logs
48 create_table: false
49 description: "Append EU region logs to unified table"
Generated Output¶
1from pyspark import pipelines as dp
2
3@dp.temporary_view()
4def v_logs_us():
5 """Write US region logs to unified table"""
6 df = spark.readStream \
7 .format("cloudFiles") \
8 .option("cloudFiles.format", "parquet") \
9 .option("cloudFiles.maxFilesPerTrigger", 100) \
10 .load("s3://my-bucket-us-east-1/logs/*.parquet")
11 return df
12
13@dp.temporary_view()
14def v_logs_eu():
15 """Append EU region logs to unified table"""
16 df = spark.readStream \
17 .format("cloudFiles") \
18 .option("cloudFiles.format", "parquet") \
19 .option("cloudFiles.maxFilesPerTrigger", 100) \
20 .load("s3://my-bucket-eu-west-1/logs/*.parquet")
21 return df
22
23# Single table creation
24dp.create_streaming_table(
25 name="catalog.bronze.unified_logs",
26 comment="Write US region logs to unified table"
27)
28
29# One append flow per source
30@dp.append_flow(target="catalog.bronze.unified_logs", name="f_unified_logs_us")
31def f_unified_logs_us():
32 """Write US region logs to unified table"""
33 df = spark.readStream.table("v_logs_us")
34 return df
35
36@dp.append_flow(target="catalog.bronze.unified_logs", name="f_unified_logs_eu")
37def f_unified_logs_eu():
38 """Append EU region logs to unified table"""
39 df = spark.readStream.table("v_logs_eu")
40 return df
Important
Each streaming table must have exactly one action with create_table: true across
the entire pipeline. Additional actions targeting the same table must use
create_table: false.
Templatising Multi-Source Ingestion¶
When you have many sources following the same pattern, combine this with templates to eliminate boilerplate. Define a template for the load+write pair and invoke it per source, each targeting the same table.
See also
Best Practices — related best-practice patterns
Write Actions for full
streaming_tablereference
CloudFiles Path Filtering¶
Three approaches to exclude specific paths, directories, or files from a CloudFiles Auto Loader pipeline — each useful in different scenarios.
Glob Patterns in the Load Path¶
LHP passes the source.path string directly to the generated .load("...") call
with no escaping or transformation. Any glob syntax that Spark and CloudFiles support
works as-is in your YAML.
Supported glob syntax (Databricks Auto Loader):
Syntax |
Meaning |
Example |
|---|---|---|
|
Match any sequence of characters |
|
|
Match any single character |
|
|
Match any character in set |
|
|
Match any character in range |
|
|
Match any character NOT in set |
|
|
Match any of the alternatives |
|
1actions:
2 - name: load_logs_excluding_day16
3 type: load
4 readMode: stream
5 source:
6 type: cloudfiles
7 path: "s3://my-bucket/data/2026/02/{0[1-9],1[0-5],1[7-9],2[0-9]}/logs/*.parquet"
8 format: parquet
9 options:
10 cloudFiles.useStrictGlobber: "true"
11 target: v_logs_raw
You can also put the glob pattern in an environment substitution token if it varies by environment:
dev:
log_path_pattern: "s3://dev-bucket/data/2026/02/{0[1-9],1[0-5],1[7-9],2[0-9]}/logs/*.parquet"
source:
path: "${log_path_pattern}"
Warning
Caveats for complex glob patterns:
Character classes inside brace alternatives (like
{0[1-9],1[0-5]}) should work at the Hadoop GlobFilter level, but this specific combination is not shown in Databricks documentation. Test in a dev environment before relying on it in production.Use
[^x]not[!x]for negation — the Unix shell[!x]syntax is not supported by Spark’s globber.**(globstar) is not documented for Auto Loader — avoid it.Notification mode (
cloudFiles.useNotifications: "true") with complex globs is undocumented territory. Directory listing mode (the default) is the safe choice for glob-heavy paths.Consider adding
cloudFiles.useStrictGlobber: "true"(DBR 12.2+) for predictable, Spark-standard globbing behaviour. The default globber is more permissive — for example,*can cross directory boundaries.
Post-Load Filter with SQL Transform¶
CloudFiles natively exposes _metadata.file_path in the loaded DataFrame. Add a
SQL transform action between the load and write to filter out unwanted paths using
full SQL regex power.
1actions:
2 - name: load_all_files
3 type: load
4 readMode: stream
5 source:
6 type: cloudfiles
7 path: "s3://my-bucket/data/"
8 format: parquet
9 target: v_raw_data
10
11 - name: filter_excluded_paths
12 type: transform
13 transform_type: sql
14 source: v_raw_data
15 target: v_filtered_data
16 sql: |
17 SELECT * FROM stream(v_raw_data)
18 WHERE NOT _metadata.file_path RLIKE '.*/exclude_[^/]+/.*'
19
20 - name: write_bronze
21 type: write
22 source: v_filtered_data
23 write_target:
24 type: streaming_table
25 database: "${catalog}.${bronze_schema}"
26 table: my_table
27 description: "Write filtered data to bronze layer"
Auto Loader still reads all files before the filter is applied. This approach works
when the excluded files are readable but contain unwanted data. If excluded files cannot
be read at all (e.g. AccessDenied), use glob patterns or pathGlobFilter instead.
pathGlobFilter Option¶
To filter by file name (not full path), use the pathGlobFilter reader option.
This is a Spark-native option that filters on the basename of each file after directory
listing.
1actions:
2 - name: load_parquet_only
3 type: load
4 readMode: stream
5 source:
6 type: cloudfiles
7 path: "s3://my-bucket/data/"
8 format: parquet
9 options:
10 pathGlobFilter: "*.parquet"
11 target: v_raw_data
pathGlobFilter filters on the filename only (basename), not the full path. The
.load() path is a prefix filter; pathGlobFilter is for suffix or name filtering.
Explicit Include List¶
Use brace expansion to explicitly list only the directories to include:
1actions:
2 - name: load_selected_sources
3 type: load
4 readMode: stream
5 source:
6 type: cloudfiles
7 path: "s3://my-bucket/data/{sales,marketing,events}/*.parquet"
8 format: parquet
9 target: v_selected_data
This generates .load("s3://my-bucket/data/{sales,marketing,events}/*.parquet") and
Auto Loader will only scan those three directories.
For truly separate paths (different buckets or unrelated prefixes), use the
multi-source ingestion pattern above — multiple
load+write actions targeting the same table with create_table: false on the
additional writes.
Choosing the Right Approach¶
Scenario |
Approach |
Notes |
|---|---|---|
Exclude specific date partitions |
Glob patterns |
Use brace expansion to enumerate included segments |
Exclude paths matching a regex |
SQL transform |
Full regex power via |
Filter by file extension or name |
|
Set in |
Include specific named directories |
Brace expansion |
Simple and explicit; documented by Databricks |
Multiple separate buckets or prefixes |
Multiple append flows |
Independent checkpoints per source |
See also
Load Actions for full CloudFiles load reference
Write Actions for
streaming_tableand append flow detailsBest Practices for related ingestion patterns
ACMI Retail Demo¶
Folder: Example_Projects/acmi
Highlights¶
Multi-format ingestion – CSV, JSON, Parquet using cloudfiles.
Bronze → Silver → Gold layers encoded as separate pipelines.
Change-Data-Feed (CDC) enabled on streaming tables.
Data-Quality Expectations (expectations/*.json).
Templates & Presets to avoid repetition.
Environment substitutions for dev, tst, prod.
Walk-through¶
# 1. Install prereqs & enter project root
pip install lakehouse-plumber
cd Example_Projects/acmi
# 2. Validate all pipelines for dev environment
lhp validate --env dev
# 3. Generate Bronze layer code (raw ingestion)
lhp generate --env dev --pipeline 01_raw_ingestion
# Check ./generated/ for Python DLT scripts
# 4. Generate Silver layer
lhp generate --env dev --pipeline 03_silver
# 5. Generate Gold analytics views
lhp generate --env dev --pipeline 04_gold
Customising the Example¶
Edit
substitutions/dev.yamlto match your catalog and storage paths.Tweak presets under
presets/(e.g., change table properties).Adjust schema hints or expectations JSON to enforce your data contract.
Multi-Flowgroup Files¶
For projects with many similar flowgroups, you can combine multiple flowgroups into a single YAML file to reduce file proliferation and improve organization.
Example: SAP Master Data (3 files → 1 file)¶
Instead of:
brand_ingestion.yamlcategory_ingestion.yamlcarrier_ingestion.yaml
Use one file sap_master_data.yaml:
pipeline: raw_ingestions_sap
use_template: TMPL003_parquet_ingestion_template
flowgroups:
- flowgroup: sap_brand_ingestion
template_parameters:
table_name: raw_sap_brand
landing_folder: brand
- flowgroup: sap_cat_ingestion
template_parameters:
table_name: raw_sap_cat
landing_folder: category
- flowgroup: sap_carrier_ingestion
template_parameters:
table_name: raw_sap_carrier
landing_folder: carrier
Result: 67% file reduction with identical functionality.
See Multi-Flowgroup YAML Files for complete documentation with inheritance rules, syntax options, migration guides, and real-world examples.
Local Variables¶
Local variables reduce repetition within a single flowgroup by defining reusable values. They use %{variable} syntax and are resolved before templates and environment substitutions.
Simple Example¶
Instead of repeating “customer” throughout your flowgroup:
pipeline: acmi_edw_bronze
flowgroup: customer_pipeline
actions:
- name: "load_customer_raw"
source:
table: "customer_raw"
target: "v_customer_raw"
- name: "customer_cleanse"
source: "v_customer_raw"
target: "v_customer_cleaned"
- name: "write_customer_bronze"
source: "v_customer_cleaned"
write_target:
table: "customer"
Use local variables to define it once:
pipeline: acmi_edw_bronze
flowgroup: customer_pipeline
variables:
entity: customer
source_table: customer_raw
target_table: customer
actions:
- name: "load_%{entity}_raw"
source:
table: "%{source_table}"
target: "v_%{entity}_raw"
- name: "%{entity}_cleanse"
source: "v_%{entity}_raw"
target: "v_%{entity}_cleaned"
- name: "write_%{entity}_bronze"
source: "v_%{entity}_cleaned"
write_target:
table: "%{target_table}"
Benefits:
Single source of truth: Change “customer” to “order” in one place
Reduced errors: No risk of inconsistent naming across actions
Better readability: Intent is clear from the variables section
Works everywhere: Inline patterns like
prefix_%{var}_suffixsupported
Real-World Example¶
Here’s a production pattern combining local variables with environment substitutions:
1pipeline: acmi_edw_bronze
2flowgroup: product_pipeline
3
4variables:
5 entity: product
6 source_table: product_raw
7 target_table: product
8 schema_file: product_schema.yaml
9
10actions:
11 - name: "load_%{entity}_raw"
12 type: load
13 operational_metadata: ["_processing_timestamp"]
14 readMode: stream
15 source:
16 type: delta
17 database: "${catalog}.${raw_schema}" # Environment token
18 table: "%{source_table}" # Local variable
19 target: "v_%{entity}_raw"
20 description: "Load %{entity} table from raw schema"
21
22 - name: "%{entity}_quality_check"
23 type: transform
24 transform_type: sql
25 source: "v_%{entity}_raw"
26 target: "v_%{entity}_validated"
27 sql_path: "sql/quality_checks/%{entity}_check.sql"
28 expectations_path: "expectations/%{entity}_expectations.json"
29
30 - name: "write_%{entity}_bronze"
31 type: write
32 source: "v_%{entity}_validated"
33 write_target:
34 type: streaming_table
35 database: "${catalog}.${bronze_schema}" # Environment token
36 table: "%{target_table}" # Local variable
37 schema_hints_path: "schemas/%{schema_file}"
Notice: Local variables (%{entity}) and environment tokens (${catalog}) work together seamlessly.
See Templates Reference for complete documentation on local variables, including:
Recursive variable definitions
Error handling for undefined variables
Interaction with templates and presets
Processing order details
Sink Examples¶
The ACME Supermarkets project includes comprehensive sink examples demonstrating data export to external systems. These examples showcase Delta, Kafka, and custom API sinks for streaming data to destinations beyond traditional DLT-managed tables.
Location: Example_Projects/acme_supermarkets_lhp/pipelines/06_sink_examples/
Delta Sink Example¶
Export aggregated sales metrics to external Unity Catalog for cross-workspace analytics.
File: 01_delta_sink_external_catalog.yaml
cd Example_Projects/acme_supermarkets_lhp
lhp generate --env dev --pipeline acme_supermarkets_sinks_pipeline
cat generated/acme_supermarkets_sinks_pipeline/delta_sink_example.py
Key features:
Aggregates silver layer data
Writes to external catalog table
Schema evolution enabled
Optimized writes for performance
Kafka Sink Example¶
Stream order fulfillment events to Kafka for real-time processing by downstream systems.
File: 02_kafka_sink_order_events.yaml
Key features:
Transforms data to Kafka key/value format using
to_json()JSON serialization of order events
Kafka headers for event metadata
Security and performance tuning configuration
Important
Kafka sinks require explicit key and value columns created in a
transform action before writing.
Azure Event Hubs Example¶
Stream inventory alerts to Azure Event Hubs using OAuth authentication.
File: 03_event_hubs_sink_inventory_alerts.yaml
Key features:
OAuth authentication with Azure Event Hubs
Kafka-compatible interface (
sink_type: kafka)Priority-based alert routing
Unity Catalog service credentials
Custom API Sink Example¶
Push customer profile updates to external CRM via REST API.
File: 04_custom_api_sink_customer_updates.yaml
Custom implementation: sinks/customer_api_sink.py
Key features:
HTTP POST with bearer token authentication
Batch processing with configurable batch size
Retry logic with exponential backoff
Dead letter queue for failed records
Comprehensive error logging
Walk-through¶
cd Example_Projects/acme_supermarkets_lhp
# Validate sink configurations
lhp validate --env dev
# Generate all sink examples
lhp generate --env dev --pipeline acme_supermarkets_sinks_pipeline
# Inspect generated Python code
cat generated/acme_supermarkets_sinks_pipeline/delta_sink_example.py
cat generated/acme_supermarkets_sinks_pipeline/kafka_sink_example.py
cat generated/acme_supermarkets_sinks_pipeline/custom_api_sink_example.py
# Deploy with Databricks bundles
databricks bundle deploy -t dev
For more details on sink configuration and options, see Write Actions.
More Examples (Coming Soon)¶
JDBC ingestion with on-prem Oracle.
Incremental snapshot tables using delta load and materialized_view write.
Python transform with Pandas-UDF cleaning.
Contributions welcome – open a PR adding a folder under Example_Projects!