Operational Metadata

Concept

What. Operational metadata columns are auto-injected provenance and lineage columns that Lakehouse Plumber (LHP) adds to your generated tables. You define each column once in lhp.yaml — a name, a Spark expression, and the target table types it applies to — then opt actions in by listing the column name. LHP appends a withColumn(...) call for every selected column to the generated Python.

When. Enable operational metadata when you need a row-level audit trail (processing timestamp, pipeline name, run ID) or file-level lineage (source file path, modification time, size) on Bronze loads. Disable it on Silver and Gold tables when those columns no longer carry meaning downstream.

Minimum example. One preset turns on a single processing-timestamp column:

presets/bronze_layer.yaml
name: bronze_layer
version: "1.0"

defaults:
  operational_metadata: ["_processing_timestamp"]

The column definition itself (the Spark expression behind _processing_timestamp) lives at project level — see the reference body below.

Reference

Column catalogue

Define operational metadata columns in the project-level configuration file under the operational_metadata key. Each column declares its Spark expression, a description, the target types it applies to, and any extra imports the expression needs.

lhp.yaml - Project operational metadata configuration
# LakehousePlumber Project Configuration
name: my_lakehouse_project
version: "1.0"

operational_metadata:
  columns:
    _processing_timestamp:
      expression: "F.current_timestamp()"
      description: "When the record was processed by the pipeline"
      applies_to: ["streaming_table", "materialized_view", "view"]

    _source_file_path:
      expression: "F.col('_metadata.file_path')"
      description: "Source file path for lineage tracking"
      applies_to: ["view"]

    _record_hash:
      expression: "F.xxhash64(*[F.col(c) for c in df.columns])"
      description: "Hash of all record fields for change detection"
      applies_to: ["streaming_table", "materialized_view", "view"]
      additional_imports:
        - "from pyspark.sql.functions import xxhash64"

    _pipeline_name:
      expression: "F.lit('${pipeline_name}')"
      description: "Name of the processing pipeline"
      applies_to: ["streaming_table", "materialized_view", "view"]
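To make the catalogue fields concrete, the sketch below models one catalogue entry as a Python dataclass and collects the extra import lines a selection of columns would need. It is an illustration only: MetadataColumn, parse_catalogue, and collect_imports are hypothetical names rather than LHP's API, and the configuration is passed in as an already-parsed dictionary instead of being read from lhp.yaml.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MetadataColumn:
    """One entry under operational_metadata.columns (hypothetical model)."""
    name: str
    expression: str
    description: str = ""
    applies_to: tuple = ()
    additional_imports: tuple = ()


def parse_catalogue(config: dict) -> dict:
    """Turn the parsed project config into MetadataColumn objects keyed by name."""
    raw = config.get("operational_metadata", {}).get("columns", {})
    return {
        name: MetadataColumn(
            name=name,
            expression=spec["expression"],
            description=spec.get("description", ""),
            applies_to=tuple(spec.get("applies_to", ())),
            additional_imports=tuple(spec.get("additional_imports", ())),
        )
        for name, spec in raw.items()
    }


def collect_imports(catalogue: dict, selected: list) -> list:
    """Return the extra import lines the selected columns need, without duplicates."""
    seen = []
    for name in selected:
        for line in catalogue[name].additional_imports:
            if line not in seen:
                seen.append(line)
    return seen


# Two entries from the lhp.yaml example, already parsed into a dict.
config = {
    "operational_metadata": {
        "columns": {
            "_processing_timestamp": {
                "expression": "F.current_timestamp()",
                "applies_to": ["streaming_table", "materialized_view", "view"],
            },
            "_record_hash": {
                "expression": "F.xxhash64(*[F.col(c) for c in df.columns])",
                "applies_to": ["streaming_table", "materialized_view", "view"],
                "additional_imports": ["from pyspark.sql.functions import xxhash64"],
            },
        }
    }
}

catalogue = parse_catalogue(config)
needed_imports = collect_imports(catalogue, ["_processing_timestamp", "_record_hash"])
```

Only _record_hash declares additional_imports, so needed_imports holds that single import line.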

Target type compatibility

The applies_to field controls which DLT table types can use each operational metadata column. LHP automatically filters columns based on the target type to prevent runtime errors.

Purpose of target type restrictions:

The applies_to field is a safeguard. A column defined at project level can only be attached to the target types it lists, so pipeline developers cannot accidentally select a column that is incompatible with their target.

Best practice for project administrators:

  • Set restrictive applies_to values for source-specific columns (e.g., CloudFiles metadata)

  • Use broader applies_to values for universal columns (e.g., timestamps, pipeline names)

  • This protects pipeline developers from runtime failures and provides clear usage guidance

Target types:

  • view - Source views created by load actions (@dp.temporary_view())

  • streaming_table - Live tables with streaming updates (@dp.table())

  • materialized_view - Batch-computed views for analytics (@dp.materialized_view())
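The filtering described above can be pictured with a short sketch. This is not LHP's implementation: filter_for_target is a hypothetical helper, and the assumption is that a selected column is silently skipped when the target type is missing from its applies_to list.

```python
VALID_TARGET_TYPES = {"view", "streaming_table", "materialized_view"}

# Two catalogue entries from the project configuration example.
COLUMNS = {
    "_processing_timestamp": {
        "expression": "F.current_timestamp()",
        "applies_to": ["streaming_table", "materialized_view", "view"],
    },
    "_source_file_path": {
        "expression": "F.col('_metadata.file_path')",
        "applies_to": ["view"],
    },
}


def filter_for_target(columns: dict, selected: list, target_type: str) -> list:
    """Keep only the selected columns whose applies_to includes target_type."""
    if target_type not in VALID_TARGET_TYPES:
        raise ValueError(f"Unknown target type: {target_type!r}")
    return [name for name in selected if target_type in columns[name]["applies_to"]]


selection = ["_processing_timestamp", "_source_file_path"]
for_view = filter_for_target(COLUMNS, selection, "view")
for_streaming_table = filter_for_target(COLUMNS, selection, "streaming_table")
```

Here for_view keeps both columns, while for_streaming_table keeps only _processing_timestamp because _source_file_path is restricted to views.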

Source-specific metadata limitations:

Warning

  • Metadata columns that depend on CloudFiles features (like _metadata.file_path) are only available in views that load data from CloudFiles sources. These columns will cause runtime errors if used with JDBC, SQL, Delta, or custom_datasource sources.

  • Custom data sources may provide their own metadata columns depending on their implementation, but CloudFiles-specific metadata will not be available.

See also

For complete details on file metadata columns available in Databricks CloudFiles, refer to the Databricks documentation: File Metadata Columns

Examples of source-restricted columns:

CloudFiles-only operational metadata
operational_metadata:
  columns:
    _source_file_name:
      expression: "F.col('_metadata.file_name')"
      description: "Original file name with extension"
      applies_to: ["view"]  # Only views, and only CloudFiles sources

    _file_modification_time:
      expression: "F.col('_metadata.file_modification_time')"
      description: "When the source file was last modified"
      applies_to: ["view"]  # Only views, and only CloudFiles sources

    _processing_timestamp:
      expression: "F.current_timestamp()"
      description: "When record was processed (works everywhere)"
      applies_to: ["streaming_table", "materialized_view", "view"]

Safe usage patterns:

Source-aware metadata configuration
# CloudFiles load action - can use file metadata
- name: load_files
  type: load
  source:
    type: cloudfiles
    path: "/mnt/data/*.json"
  operational_metadata:
    - "_source_file_name"        # ✓ Available in CloudFiles
    - "_file_modification_time"  # ✓ Available in CloudFiles
    - "_processing_timestamp"    # ✓ Available everywhere
  target: v_file_data

# JDBC load action - file metadata not available
- name: load_database
  type: load
  source:
    type: jdbc
    table: "customers"
  operational_metadata:
    - "_processing_timestamp"    # ✓ Available everywhere
    # DO NOT USE: "_source_file_name" would cause runtime error
  target: v_database_data

# Custom data source - metadata depends on implementation
- name: load_api_data
  type: load
  module_path: "data_sources/api_source.py"
  custom_datasource_class: "APIDataSource"
  options:
    api_endpoint: "https://api.example.com/data"
  operational_metadata:
    - "_processing_timestamp"    # ✓ Available everywhere
    # Custom metadata depends on DataSource implementation
  target: v_api_data
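Because applies_to restricts target types but not source types, a project might add its own pre-generation check for the patterns above. The sketch below is one possible heuristic, not an LHP feature: unsafe_selections is a hypothetical helper, and it assumes that any expression mentioning _metadata. depends on CloudFiles file metadata.

```python
# Catalogue entries taken from the CloudFiles-only example.
COLUMNS = {
    "_source_file_name": {"expression": "F.col('_metadata.file_name')"},
    "_file_modification_time": {
        "expression": "F.col('_metadata.file_modification_time')"
    },
    "_processing_timestamp": {"expression": "F.current_timestamp()"},
}


def cloudfiles_only_columns(columns: dict) -> set:
    """Heuristic: a column is CloudFiles-only if its expression reads _metadata."""
    return {
        name for name, spec in columns.items() if "_metadata." in spec["expression"]
    }


def unsafe_selections(columns: dict, selected: list, source_type: str) -> list:
    """Return the selected columns that would fail on a non-CloudFiles source."""
    if source_type == "cloudfiles":
        return []
    risky = cloudfiles_only_columns(columns)
    return [name for name in selected if name in risky]


selection = ["_source_file_name", "_processing_timestamp"]
problems_for_jdbc = unsafe_selections(COLUMNS, selection, "jdbc")
problems_for_cloudfiles = unsafe_selections(COLUMNS, selection, "cloudfiles")
```

For the JDBC source the check flags _source_file_name, and for the CloudFiles source it flags nothing. A custom data source that exposes its own metadata struct would need a different rule.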

Multi-level configuration

Operational metadata can be configured at preset, flowgroup, and action level.

Additive behavior: Operational metadata columns are never overridden between levels. Instead, the columns from the preset, flowgroup, and action levels are combined. The only exception is operational_metadata: false at action level, which disables all metadata.

Preset level

presets/bronze_layer.yaml
name: bronze_layer
version: "1.0"

defaults:
  operational_metadata: ["_processing_timestamp", "_source_file_path"]

FlowGroup level

pipelines/customer_ingestion/load_customers.yaml
pipeline: customer_ingestion
flowgroup: load_customers
presets: ["bronze_layer"]
operational_metadata: ["_record_hash"]  # Adds to preset columns

actions:
  - name: load_customer_files
    type: load
    source:
      type: cloudfiles
      path: "/mnt/landing/customers/*.json"
      format: json
    target: v_customers_raw

Action level

Action-specific metadata configuration
actions:
  - name: load_with_custom_metadata
    type: load
    source:
      type: cloudfiles
      path: "/mnt/data/*.parquet"
      format: parquet
    operational_metadata:  # Adds to flowgroup + preset columns
      - "_pipeline_name"
      - "_custom_business_logic"
    target: v_enriched_data

  - name: load_without_metadata
    type: load
    source:
      type: sql
      sql: "SELECT * FROM source_table"
    operational_metadata: false  # Disables all metadata
    target: v_clean_data

Additive behavior example:

Complete example showing additive behavior
# Preset defines base columns
# presets/bronze_layer.yaml
defaults:
  operational_metadata: ["_processing_timestamp"]

# FlowGroup adds more columns
pipeline: customer_ingestion
flowgroup: load_customers
operational_metadata: ["_source_file_path", "_record_hash"]

actions:
  - name: load_customer_files
    type: load
    source:
      type: cloudfiles
      path: "/mnt/data/*.json"
    # Action adds even more columns
    operational_metadata:
      - "_pipeline_name"
      - "_custom_business_logic"
    target: v_customers_raw

# Final result: ALL columns combined
# ✓ _processing_timestamp      (from preset)
# ✓ _source_file_path          (from flowgroup)
# ✓ _record_hash               (from flowgroup)
# ✓ _pipeline_name             (from action)
# ✓ _custom_business_logic     (from action)
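The additive rule can be expressed in a few lines of Python. This is a minimal sketch, not LHP's resolver: resolve_metadata is a hypothetical function, and the assumptions are that levels are combined in preset, flowgroup, action order, that duplicates are kept once, and that only list values and an action-level false need handling (operational_metadata: true is left out).

```python
def resolve_metadata(preset=None, flowgroup=None, action=None) -> list:
    """Combine column names from all three levels; action=False disables everything."""
    if action is False:
        return []
    resolved = []
    for level in (preset, flowgroup, action):
        for name in level or []:
            if name not in resolved:  # keep the first occurrence only
                resolved.append(name)
    return resolved


# Values from the complete example above.
combined = resolve_metadata(
    preset=["_processing_timestamp"],
    flowgroup=["_source_file_path", "_record_hash"],
    action=["_pipeline_name", "_custom_business_logic"],
)

# An action that opts out, as in load_without_metadata.
disabled = resolve_metadata(
    preset=["_processing_timestamp"],
    flowgroup=["_record_hash"],
    action=False,
)
```

With these inputs, combined contains the same five columns listed in the final result, and disabled is empty.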

Usage patterns

Enable all available columns:

operational_metadata: true

Select specific columns:

operational_metadata:
  - "_processing_timestamp"
  - "_source_file_path"
  - "_record_hash"

Disable metadata:

operational_metadata: false

Generated Python code:

Generated DLT code with operational metadata
@dp.temporary_view()
def v_customers_raw():
    """Load customer files from landing zone"""
    df = spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .load("/mnt/landing/customers/*.json")

    # Add operational metadata columns
    df = df.withColumn('_processing_timestamp', F.current_timestamp())
    df = df.withColumn('_source_file_path', F.col('_metadata.file_path'))
    df = df.withColumn('_record_hash', F.xxhash64(*[F.col(c) for c in df.columns]))

    return df
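To show how a list of selected columns could turn into the withColumn lines above, here is a small text-rendering sketch. It is an assumption about the general approach, not LHP's generator: render_with_columns is a hypothetical helper that only builds source text and does not import or run Spark.

```python
# Catalogue expressions from the project configuration example.
COLUMNS = {
    "_processing_timestamp": {"expression": "F.current_timestamp()"},
    "_source_file_path": {"expression": "F.col('_metadata.file_path')"},
    "_record_hash": {"expression": "F.xxhash64(*[F.col(c) for c in df.columns])"},
}


def render_with_columns(columns: dict, selected: list, df_name: str = "df") -> str:
    """Render one withColumn statement per selected column, in selection order."""
    lines = ["# Add operational metadata columns"]
    for name in selected:
        expression = columns[name]["expression"]
        # repr() quotes the column name; the expression is emitted verbatim.
        lines.append(f"{df_name} = {df_name}.withColumn({name!r}, {expression})")
    return "\n".join(lines)


snippet = render_with_columns(
    COLUMNS, ["_processing_timestamp", "_source_file_path", "_record_hash"]
)
print(snippet)
```

Because each statement reassigns df, an expression that reads df.columns sees the metadata columns added by the statements before it, so the order of the selection affects what _record_hash covers.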

Danger

  • If you add operational metadata columns in an upstream action and a downstream action is a transformation (for example, a SQL transform), make sure the transformation's query selects those columns. Columns the query does not select will not appear in its output.
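One way to avoid losing metadata columns in a SQL transform is to build the select list from the business columns plus the metadata columns. The sketch below only assembles a query string. build_select is a hypothetical helper, and the business column names customer_id and name are invented for illustration.

```python
def build_select(source_view: str, business_columns: list, metadata_columns: list) -> str:
    """Build a SELECT that carries the metadata columns through explicitly."""
    select_list = ",\n    ".join([*business_columns, *metadata_columns])
    return f"SELECT\n    {select_list}\nFROM {source_view}"


query = build_select(
    source_view="v_customers_raw",
    business_columns=["customer_id", "name"],  # hypothetical business columns
    metadata_columns=["_processing_timestamp", "_source_file_path"],
)
print(query)
```

The resulting query names _processing_timestamp and _source_file_path alongside the business columns, so both survive the transformation.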

Internal Implementation Note

The codebase maintains strict semantic separation between single and multi-document YAML files:

  • load_yaml_file() - For single-document files (configs, templates, presets)

    • Validates exactly one document exists

    • Raises MultiDocumentError (LHP-IO-003) for empty files or files with multiple documents

    • Used for templates, presets, configs, and other single-document files

  • load_yaml_documents_all() - For multi-document files (flowgroup files only)

    • Returns list of all documents

    • Used exclusively for flowgroup YAML files that may contain multiple flowgroups

This strict validation prevents accidental misuse and catches bugs early. If you encounter a MultiDocumentError, the error message will guide you to the correct loading method.
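The single-document rule can be illustrated without touching LHP's code. In this sketch the MultiDocumentError class is a local stand-in for LHP's exception, require_single_document is a hypothetical helper, and the parsed documents are passed in as a plain list. Treating an empty document (None) as absent is also an assumption.

```python
class MultiDocumentError(ValueError):
    """Local stand-in for LHP's MultiDocumentError (LHP-IO-003)."""


def require_single_document(documents, path: str):
    """Return the only document, or raise if there are zero or several."""
    docs = [doc for doc in documents if doc is not None]  # skip empty documents
    if len(docs) != 1:
        raise MultiDocumentError(
            f"LHP-IO-003: {path} must contain exactly one YAML document, "
            f"found {len(docs)}"
        )
    return docs[0]


# A preset file parsed into one document passes the check.
preset = require_single_document(
    [{"name": "bronze_layer", "version": "1.0"}], "presets/bronze_layer.yaml"
)

# A file that parsed into two documents is rejected.
try:
    require_single_document(
        [{"flowgroup": "a"}, {"flowgroup": "b"}], "presets/bronze_layer.yaml"
    )
    rejected = False
except MultiDocumentError:
    rejected = True
```

With PyYAML, the documents argument could come from yaml.safe_load_all(text), which yields one Python object per YAML document in the input.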