Operational Metadata¶
Concept¶
What. Operational metadata columns are auto-injected provenance and lineage
columns that Lakehouse Plumber (LHP) adds to your generated tables. You define
each column once in lhp.yaml — a name, a Spark expression, and the target
table types it applies to — then opt actions in by listing the column name.
LHP appends a withColumn(...) call for every selected column to the
generated Python.
When. Enable operational metadata when you need a row-level audit trail (processing timestamp, pipeline name, run ID) or file-level lineage (source file path, modification time, size) on Bronze loads. Disable it on Silver and Gold tables when those columns no longer carry meaning downstream.
Minimum example. One preset turns on a single processing-timestamp column:
name: bronze_layer
version: "1.0"
defaults:
  operational_metadata: ["_processing_timestamp"]
The column definition itself (the Spark expression behind
_processing_timestamp) lives at project level — see the reference body below.
Reference¶
Column catalogue¶
Define operational metadata columns in the project-level configuration file
under the operational_metadata key. Each column declares its Spark
expression, a description, the target types it applies to, and any extra
imports the expression needs.
# LakehousePlumber Project Configuration
name: my_lakehouse_project
version: "1.0"

operational_metadata:
  columns:
    _processing_timestamp:
      expression: "F.current_timestamp()"
      description: "When the record was processed by the pipeline"
      applies_to: ["streaming_table", "materialized_view", "view"]

    _source_file_path:
      expression: "F.col('_metadata.file_path')"
      description: "Source file path for lineage tracking"
      applies_to: ["view"]

    _record_hash:
      expression: "F.xxhash64(*[F.col(c) for c in df.columns])"
      description: "Hash of all record fields for change detection"
      applies_to: ["streaming_table", "materialized_view", "view"]
      additional_imports:
        - "from pyspark.sql.functions import xxhash64"

    _pipeline_name:
      expression: "F.lit('${pipeline_name}')"
      description: "Name of the processing pipeline"
      applies_to: ["streaming_table", "materialized_view", "view"]
Target type compatibility¶
The applies_to field controls which DLT table types can use each operational metadata column.
LHP automatically filters columns based on the target type to prevent runtime errors.
Purpose of target type restrictions:
Because columns are defined once at the project level, the applies_to field acts as a safeguard: it keeps pipeline developers from accidentally selecting a column that is incompatible with their target type.
Best practice for project administrators:
- Set restrictive applies_to values for source-specific columns (e.g., CloudFiles metadata).
- Use broader applies_to values for universal columns (e.g., timestamps, pipeline names).
- This protects pipeline developers from runtime failures and provides clear usage guidance.
Target types:
- ``view`` - Source views created by load actions (@dp.temporary_view())
- ``streaming_table`` - Live tables with streaming updates (@dp.table())
- ``materialized_view`` - Batch-computed views for analytics (@dp.materialized_view())
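The filtering by target type described above can be pictured with a short Python sketch. It is not LHP's implementation: `COLUMNS` and `columns_for_target` are hypothetical names, and the catalogue simply mirrors two of the columns defined on this page.

```python
# Hypothetical in-memory form of the project-level column catalogue.
COLUMNS = {
    "_processing_timestamp": {
        "expression": "F.current_timestamp()",
        "applies_to": ["streaming_table", "materialized_view", "view"],
    },
    "_source_file_path": {
        "expression": "F.col('_metadata.file_path')",
        "applies_to": ["view"],
    },
}


def columns_for_target(selected, target_type, catalogue=COLUMNS):
    """Keep only the selected columns whose applies_to lists target_type."""
    return [
        name
        for name in selected
        if target_type in catalogue[name]["applies_to"]
    ]


selected = ["_processing_timestamp", "_source_file_path"]
print(columns_for_target(selected, "view"))
print(columns_for_target(selected, "streaming_table"))
```

In this sketch the view keeps both columns, while the streaming table keeps only `_processing_timestamp`, because `_source_file_path` is restricted to `view`. How LHP reports an unknown column name is not covered here; the sketch would raise a `KeyError`.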
Source-specific metadata limitations:
Warning
Metadata columns that depend on CloudFiles features (like _metadata.file_path) are only available in views that load data from CloudFiles sources. These columns will cause runtime errors if used with JDBC, SQL, Delta, or custom_datasource sources.
Custom data sources may provide their own metadata columns depending on their implementation, but CloudFiles-specific metadata will not be available.
See also
For complete details on file metadata columns available in Databricks CloudFiles, refer to the Databricks documentation: File Metadata Columns
Examples of source-restricted columns:
operational_metadata:
  columns:
    _source_file_name:
      expression: "F.col('_metadata.file_name')"
      description: "Original file name with extension"
      applies_to: ["view"]  # Only views, and only CloudFiles sources

    _file_modification_time:
      expression: "F.col('_metadata.file_modification_time')"
      description: "When the source file was last modified"
      applies_to: ["view"]  # Only views, and only CloudFiles sources

    _processing_timestamp:
      expression: "F.current_timestamp()"
      description: "When record was processed (works everywhere)"
      applies_to: ["streaming_table", "materialized_view", "view"]
Safe usage patterns:
# CloudFiles load action - can use file metadata
- name: load_files
  type: load
  source:
    type: cloudfiles
    path: "/mnt/data/*.json"
  operational_metadata:
    - "_source_file_name"        # ✓ Available in CloudFiles
    - "_file_modification_time"  # ✓ Available in CloudFiles
    - "_processing_timestamp"    # ✓ Available everywhere
  target: v_file_data

# JDBC load action - file metadata not available
- name: load_database
  type: load
  source:
    type: jdbc
    table: "customers"
  operational_metadata:
    - "_processing_timestamp"    # ✓ Available everywhere
    # DO NOT USE: "_source_file_name" would cause runtime error
  target: v_database_data

# Custom data source - metadata depends on implementation
- name: load_api_data
  type: load
  module_path: "data_sources/api_source.py"
  custom_datasource_class: "APIDataSource"
  options:
    api_endpoint: "https://api.example.com/data"
  operational_metadata:
    - "_processing_timestamp"    # ✓ Available everywhere
    # Custom metadata depends on DataSource implementation
  target: v_api_data
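The safe-usage rule above could also be checked before a pipeline runs. The following is a minimal sketch of such a check, assuming a hypothetical helper `risky_columns`; nothing on this page says LHP ships an equivalent, and the substring test on `_metadata.` is only a heuristic.

```python
def risky_columns(source_type, selected, catalogue):
    """Return selected columns that read _metadata.* on a non-CloudFiles source.

    Conservative by design: every source other than "cloudfiles" is treated
    as lacking file metadata, including custom data sources.
    """
    if source_type == "cloudfiles":
        return []
    return [
        name
        for name in selected
        if "_metadata." in catalogue[name]["expression"]
    ]


# Hypothetical catalogue mirroring two columns from the examples above.
catalogue = {
    "_source_file_name": {"expression": "F.col('_metadata.file_name')"},
    "_processing_timestamp": {"expression": "F.current_timestamp()"},
}

print(risky_columns("jdbc", ["_source_file_name", "_processing_timestamp"], catalogue))
print(risky_columns("cloudfiles", ["_source_file_name"], catalogue))
```

For the JDBC action the sketch flags `_source_file_name`; for the CloudFiles action it flags nothing.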
Multi-level configuration¶
Operational metadata can be configured at the preset, flowgroup, and action levels.
Additive behavior: Operational metadata columns are never overridden between levels. Instead, the columns from the preset, flowgroup, and action levels are combined. The only exception is operational_metadata: false at action level, which disables all metadata for that action.
Preset level
name: bronze_layer
version: "1.0"

defaults:
  operational_metadata: ["_processing_timestamp", "_source_file_path"]
FlowGroup level
pipeline: customer_ingestion
flowgroup: load_customers
presets: ["bronze_layer"]
operational_metadata: ["_record_hash"]  # Adds to preset columns

actions:
  - name: load_customer_files
    type: load
    source:
      type: cloudfiles
      path: "/mnt/landing/customers/*.json"
      format: json
    target: v_customers_raw
Action level
actions:
  - name: load_with_custom_metadata
    type: load
    source:
      type: cloudfiles
      path: "/mnt/data/*.parquet"
      format: parquet
    operational_metadata:  # Adds to flowgroup + preset columns
      - "_pipeline_name"
      - "_custom_business_logic"
    target: v_enriched_data

  - name: load_without_metadata
    type: load
    source:
      type: sql
      sql: "SELECT * FROM source_table"
    operational_metadata: false  # Disables all metadata
    target: v_clean_data
Additive behavior example:
# Preset defines base columns
# presets/bronze_layer.yaml
defaults:
  operational_metadata: ["_processing_timestamp"]

# FlowGroup adds more columns
pipeline: customer_ingestion
flowgroup: load_customers
operational_metadata: ["_source_file_path", "_record_hash"]

actions:
  - name: load_customer_files
    type: load
    source:
      type: cloudfiles
      path: "/mnt/data/*.json"
    # Action adds even more columns
    operational_metadata:
      - "_pipeline_name"
      - "_custom_business_logic"
    target: v_customers_raw

# Final result: ALL columns combined
# ✓ _processing_timestamp (from preset)
# ✓ _source_file_path (from flowgroup)
# ✓ _record_hash (from flowgroup)
# ✓ _pipeline_name (from action)
# ✓ _custom_business_logic (from action)
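The additive rule in the example above can be expressed as a small Python sketch. The function name `merge_operational_metadata` is hypothetical, and two details are assumptions not stated on this page: duplicates are dropped, and columns keep the order preset, flowgroup, action. The sketch covers only list values and the action-level `false`; `true` is not modelled.

```python
def merge_operational_metadata(preset, flowgroup, action):
    """Combine column lists from all three levels, keeping first-seen order."""
    if action is False:
        # Documented exception: false at action level disables all metadata.
        return []
    merged = []
    for level in (preset, flowgroup, action):
        for name in level or []:  # a level that sets nothing contributes nothing
            if name not in merged:  # assumption: duplicates are dropped
                merged.append(name)
    return merged


combined = merge_operational_metadata(
    preset=["_processing_timestamp"],
    flowgroup=["_source_file_path", "_record_hash"],
    action=["_pipeline_name", "_custom_business_logic"],
)
print(combined)
print(merge_operational_metadata(["_processing_timestamp"], None, False))
```

With the inputs from the example, the sketch yields all five columns; with `False` at action level it yields an empty list regardless of what the preset defines.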
Usage patterns¶
Enable all available columns:
operational_metadata: true
Select specific columns:
operational_metadata:
- "_processing_timestamp"
- "_source_file_path"
- "_record_hash"
Disable metadata:
operational_metadata: false
Generated Python code:
@dp.temporary_view()
def v_customers_raw():
    """Load customer files from landing zone"""
    df = spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .load("/mnt/landing/customers/*.json")

    # Add operational metadata columns
    df = df.withColumn('_processing_timestamp', F.current_timestamp())
    df = df.withColumn('_source_file_path', F.col('_metadata.file_path'))
    df = df.withColumn('_record_hash', F.xxhash64(*[F.col(c) for c in df.columns]))

    return df
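To make the link between the column catalogue and the generated statements concrete, here is a minimal sketch of how such withColumn(...) lines could be rendered from name and expression pairs. `render_metadata_lines` is a hypothetical helper written for illustration, not LHP's generator.

```python
def render_metadata_lines(columns):
    """Render one withColumn(...) statement per (name, expression) pair."""
    lines = ["# Add operational metadata columns"]
    for name, expression in columns.items():
        # The expression is pasted verbatim, exactly as written in the catalogue.
        lines.append(f"df = df.withColumn('{name}', {expression})")
    return "\n".join(lines)


columns = {
    "_processing_timestamp": "F.current_timestamp()",
    "_source_file_path": "F.col('_metadata.file_path')",
}
print(render_metadata_lines(columns))
```

Because each generated statement reassigns df, an expression that reads df.columns (as `_record_hash` does in the generated code above) also sees the metadata columns added before it.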
Danger
When you add operational metadata columns to an upstream action and the downstream action is a transformation (for example, a SQL transform), make sure the transformation's query selects those columns. A query that does not select them drops them from its output.
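One way to keep the metadata columns in a downstream query is to build the SELECT list explicitly. The sketch below is illustrative only: `select_with_metadata` is a hypothetical helper, and the business columns `customer_id` and `email` are invented for the example.

```python
def select_with_metadata(source_view, business_columns, metadata_columns):
    """Build a SELECT that names business columns and then metadata columns."""
    columns = list(business_columns)
    # Append each metadata column unless the query already names it.
    columns += [c for c in metadata_columns if c not in columns]
    return f"SELECT {', '.join(columns)} FROM {source_view}"


sql = select_with_metadata(
    "v_customers_raw",
    ["customer_id", "email"],  # hypothetical business columns
    ["_processing_timestamp", "_source_file_path"],
)
print(sql)
```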
Internal Implementation Note¶
The codebase maintains strict semantic separation between single and multi-document YAML files:
- load_yaml_file() - For single-document files (configs, templates, presets)
  - Validates exactly one document exists
  - Raises MultiDocumentError (LHP-IO-003) for empty files or files with multiple documents
  - Used for templates, presets, configs, and other single-document files
- load_yaml_documents_all() - For multi-document files (flowgroup files only)
  - Returns list of all documents
  - Used exclusively for flowgroup YAML files that may contain multiple flowgroups
This strict validation prevents accidental misuse and catches bugs early. If you encounter a
MultiDocumentError, the error message will guide you to the correct loading method.
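The "exactly one document" rule can be illustrated without LHP's internals. The sketch below assumes the YAML documents have already been parsed into a list (for instance with PyYAML's `yaml.safe_load_all`); `SingleDocumentError` and `require_single_document` are hypothetical stand-ins, not the real `MultiDocumentError` or `load_yaml_file()`.

```python
class SingleDocumentError(ValueError):
    """Hypothetical stand-in for LHP's MultiDocumentError."""


def require_single_document(documents, path):
    """Return the only document, or raise if there are zero or several."""
    if len(documents) != 1:
        raise SingleDocumentError(
            f"{path}: expected exactly 1 YAML document, found {len(documents)}"
        )
    return documents[0]


# A preset file with one document passes the check.
preset = require_single_document([{"name": "bronze_layer"}], "presets/bronze_layer.yaml")
print(preset["name"])

# An empty file is rejected, matching the rule described above.
try:
    require_single_document([], "presets/empty.yaml")
except SingleDocumentError as exc:
    print(exc)
```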