Load Actions

Concept

What

A Load action reads data from a source — files, a Delta table, a SQL query, an RDBMS, a Python function, a Kafka topic, or a custom PySpark DataSource — and exposes the result as a temporary view inside a FlowGroup. Downstream Transform and Write actions reference the view by its target name. Each Load action declares one source via the source.type field; the rest of source carries the type-specific configuration.

When

Every FlowGroup that produces data starts with a Load action. Pick the sub-type by how the source is delivered:

  • cloudfiles: Files arrive in object storage (S3, ADLS, GCS, Unity Catalog volumes); incremental ingestion with checkpoints and schema evolution. Streaming only.

  • delta: Reading an existing Delta table or its Change Data Feed (CDF). Batch or streaming.

  • sql: An arbitrary SQL query materialised as a temporary view.

  • jdbc: Pulling from an external RDBMS; credentials via Databricks secrets.

  • python: The format is not covered by a built-in sub-type, or you need custom pre-processing in Python before the flow sees the data.

  • kafka: Streaming from Apache Kafka, AWS MSK, or Azure Event Hubs.

  • custom_datasource: You have a PySpark DataSource implementation and want LHP to register and invoke it.

Minimum example

The smallest working Load action reads a Delta table and exposes it as a temporary view:

pipelines/bronze/customer.yaml
actions:
  - name: customer_raw_load
    type: load
    readMode: stream
    source:
      type: delta
      catalog: "${catalog}"
      schema: "${raw_schema}"
      table: customer
    target: v_customer_raw

The reference body below documents every sub-type and every option.

Reference

LHP supports seven Load sub-types: delta, cloudfiles, sql, jdbc, python, kafka, and custom_datasource. Additional sources arrive through the plugin mechanism.

delta

Use when reading an existing Delta table or its Change Data Feed (CDF). Batch or streaming.

Deprecated since version 0.7.8: The database field (e.g., database: "${catalog}.${schema}") is deprecated for delta sources. Use explicit catalog and schema fields instead. The old format is auto-converted with a deprecation warning. Removal in v1.0.0.

actions:
  - name: customer_raw_load
    type: load
    operational_metadata: ["_processing_timestamp"]
    readMode: stream
    source:
      type: delta
      catalog: "${catalog}"
      schema: "${raw_schema}"
      table: customer
    target: v_customer_raw
    description: "Load customer table from raw schema"

Anatomy of a delta load action

  • name: Unique name for this action within the FlowGroup

  • type: Action type - brings data into a temporary view

  • operational_metadata: Add custom metadata columns (e.g., processing timestamp)

  • readMode: Either batch or stream - translates to spark.read.table() or spark.readStream.table()

  • source:
    • type: Use Delta table as source

    • catalog: Source catalog, set here with a substitution variable

    • schema: Source schema, set here with a substitution variable

    • table: Name of the Delta table to read from

  • target: Name of the temporary view created

  • description: Optional documentation for the action

Delta load actions read from both regular Delta tables and tables with Change Data Feed (CDF) enabled. Use readMode: stream for incremental processing or readMode: batch for one-time loads.
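The readMode value and the catalog, schema and table fields combine into a single reader call. The plain-Python sketch below renders that call as a string so the mapping is easy to see. delta_reader_expression is a hypothetical helper, not part of LHP, and the catalog and schema values are the ones that appear in the generated code later in this section.

```python
def delta_reader_expression(read_mode: str, catalog: str, schema: str, table: str) -> str:
    """Render the reader call a delta load maps to (hypothetical helper, not LHP code)."""
    if read_mode not in ("batch", "stream"):
        raise ValueError(f"readMode must be 'batch' or 'stream', got {read_mode!r}")
    # stream -> spark.readStream, batch -> spark.read
    reader = "spark.readStream" if read_mode == "stream" else "spark.read"
    # catalog, schema and table form a three-level Unity Catalog name
    return f'{reader}.table("{catalog}.{schema}.{table}")'


print(delta_reader_expression("stream", "acmi_edw_dev", "edw_raw", "customer"))
# spark.readStream.table("acmi_edw_dev.edw_raw.customer")
```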

Delta Options

Delta load actions support the options field to configure Delta-specific reader options:

actions:
  - name: load_orders_cdc
    type: load
    readMode: stream
    source:
      type: delta
      catalog: "${catalog}"
      schema: "bronze"
      table: orders
      options:
        readChangeFeed: "true"
        startingVersion: "0"
        ignoreDeletes: "true"
    target: v_orders_changes
    description: "Stream order changes using Delta Change Data Feed"

Supported Delta Options:

  • readChangeFeed (string/boolean): Enable Change Data Feed (stream or batch)

  • startingVersion (string): Version to start from, for CDF reads or streaming reads

  • startingTimestamp (string): Timestamp to start from, for CDF reads or streaming reads (ISO 8601 format)

  • endingVersion (string): Ending version for batch CDF reads

  • endingTimestamp (string): Ending timestamp for batch CDF reads

  • versionAsOf (string): Read a specific table version (time travel)

  • timestampAsOf (string): Read the table as of a specific timestamp (time travel)

  • ignoreDeletes (boolean): In a stream, ignore transactions that delete data at partition boundaries

  • skipChangeCommits (string/boolean): In a stream, skip transactions that update or delete existing rows

  • maxFilesPerTrigger (number): Maximum number of new files to process per micro-batch

Note

  • readChangeFeed works in both stream and batch mode. In batch mode, a starting bound (startingVersion or startingTimestamp) is required.

  • endingVersion and endingTimestamp are only valid in batch mode.

  • readChangeFeed and skipChangeCommits are mutually exclusive — one reads all changes, the other skips them.

  • readChangeFeed cannot be combined with time-travel options (versionAsOf / timestampAsOf).

  • startingVersion and startingTimestamp are mutually exclusive.

  • versionAsOf and timestampAsOf are mutually exclusive.

  • All option values are validated and cannot be None or empty strings.
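The rules in the note above are simple enough to express as a checker. The following is a hypothetical, plain-Python illustration of those rules, not LHP's validator. In particular, it treats readChangeFeed and skipChangeCommits as conflicting whenever both keys are present, which is an assumption about how the rule is applied.

```python
BATCH_ONLY = {"endingVersion", "endingTimestamp"}
TIME_TRAVEL = {"versionAsOf", "timestampAsOf"}
STARTING_BOUNDS = {"startingVersion", "startingTimestamp"}
MUTUALLY_EXCLUSIVE = [
    ("readChangeFeed", "skipChangeCommits"),
    ("startingVersion", "startingTimestamp"),
    ("versionAsOf", "timestampAsOf"),
]


def is_true(value) -> bool:
    """Accept both YAML booleans and the strings "true"/"false"."""
    return str(value).strip().lower() == "true"


def validate_delta_options(options: dict, read_mode: str) -> list:
    """Return a list of rule violations (an empty list means the options look valid)."""
    errors = []
    # No option value may be None or an empty string
    for key, value in options.items():
        if value is None or (isinstance(value, str) and not value.strip()):
            errors.append(f"{key}: value cannot be None or empty")
    # Pairs that cannot be set together (assumption: presence of both keys is a conflict)
    for first, second in MUTUALLY_EXCLUSIVE:
        if first in options and second in options:
            errors.append(f"{first} and {second} are mutually exclusive")
    # Ending bounds are only valid for batch reads
    if read_mode != "batch":
        for key in sorted(options.keys() & BATCH_ONLY):
            errors.append(f"{key} is only valid in batch mode")
    change_feed = is_true(options.get("readChangeFeed"))
    if change_feed and options.keys() & TIME_TRAVEL:
        errors.append("readChangeFeed cannot be combined with versionAsOf/timestampAsOf")
    # A batch CDF read needs a starting bound
    if change_feed and read_mode == "batch" and not options.keys() & STARTING_BOUNDS:
        errors.append("batch readChangeFeed requires startingVersion or startingTimestamp")
    return errors


print(validate_delta_options({"readChangeFeed": "true"}, "batch"))
# ['batch readChangeFeed requires startingVersion or startingTimestamp']
```

The options of the batch CDF example below pass this check. Removing its startingVersion would trigger the starting-bound error.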

Batch CDF Example:

actions:
  - name: load_order_changes
    type: load
    readMode: batch
    source:
      type: delta
      catalog: "${catalog}"
      schema: "bronze"
      table: orders
      options:
        readChangeFeed: "true"
        startingVersion: "5"
        endingVersion: "20"
    target: v_order_changes
    description: "Read order changes between version 5 and 20"

Warning

When using startingVersion, the specified version may become unavailable after VACUUM runs. Prefer startingTimestamp for durable references, or use checkpoint-managed streaming for production workloads.

readChangeFeed vs skipChangeCommits:

  • readChangeFeed: "true" — reads the Change Data Feed, exposing row-level changes (inserts, updates, deletes) with metadata columns. Use this when you need to process individual changes (e.g., CDC into a downstream table).

  • skipChangeCommits: "true" skips commits that update or delete existing rows, so the stream only picks up appended data. Use this when the source table receives updates or deletes that a downstream consumer should ignore. Cannot be combined with readChangeFeed.

CDF Metadata Columns:

When readChangeFeed is enabled, the resulting DataFrame includes three additional columns:

  • _change_type — the type of change: insert, update_preimage, update_postimage, or delete

  • _commit_version — the Delta version of the commit

  • _commit_timestamp — the timestamp of the commit

An UPDATE operation produces two rows: one with _change_type = "update_preimage" (the old values) and one with _change_type = "update_postimage" (the new values).

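If you write CDF data to a non-CDC streaming table, drop these columns in a transform action and keep only the rows that carry current values (inserts and update postimages). Otherwise every UPDATE lands in the target twice:

SELECT * EXCEPT (_change_type, _commit_version, _commit_timestamp)
FROM stream(v_order_changes)
WHERE _change_type IN ('insert', 'update_postimage')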
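A consumer that only wants current values keeps insert and update_postimage rows and drops update_preimage and delete rows. The plain-Python sketch below applies that rule to a hand-written feed for a single order. The rows and the current_value_rows helper are illustrative, not LHP output. The sketch shows why an UPDATE must not pass through as both of its images.

```python
CDF_COLUMNS = {"_change_type", "_commit_version", "_commit_timestamp"}
CURRENT_VALUE_TYPES = {"insert", "update_postimage"}


def current_value_rows(changes: list) -> list:
    """Keep rows that carry current values and drop the CDF metadata columns."""
    return [
        {key: value for key, value in row.items() if key not in CDF_COLUMNS}
        for row in changes
        if row["_change_type"] in CURRENT_VALUE_TYPES
    ]


# Hypothetical feed: order 1 is inserted at version 5 and updated at version 6
changes = [
    {"order_id": 1, "status": "NEW", "_change_type": "insert",
     "_commit_version": 5, "_commit_timestamp": "2024-01-01T10:00:00"},
    {"order_id": 1, "status": "NEW", "_change_type": "update_preimage",
     "_commit_version": 6, "_commit_timestamp": "2024-01-02T10:00:00"},
    {"order_id": 1, "status": "SHIPPED", "_change_type": "update_postimage",
     "_commit_version": 6, "_commit_timestamp": "2024-01-02T10:00:00"},
]

print(current_value_rows(changes))
# [{'order_id': 1, 'status': 'NEW'}, {'order_id': 1, 'status': 'SHIPPED'}]
```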

Full Refresh Resilience:

When a Delta table undergoes a full refresh (e.g., TRUNCATE followed by reload), the CDF stream emits a large batch of delete rows followed by insert rows. This can overwhelm downstream consumers. Mitigation strategies:

  • Use ignoreDeletes: "true" if deletes are not relevant to your pipeline.

  • Use skipChangeCommits: "true" on non-CDF consumers that share the same source table.

  • For CDC targets, rely on Databricks checkpointing to handle reprocessing gracefully.

Time Travel Example:

actions:
  - name: load_customers_snapshot
    type: load
    readMode: batch
    source:
      type: delta
      catalog: "${catalog}"
      schema: "silver"
      table: customers
      options:
        versionAsOf: "10"
      where_clause: ["status = 'active'"]
      select_columns: ["customer_id", "name", "email"]
    target: v_customers_snapshot
    description: "Load customers at version 10"

See also

The first delta example above (customer_raw_load) translates to the following PySpark code:

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

@dp.temporary_view()
def v_customer_raw():
    """Load customer table from raw schema"""
    df = spark.readStream.table("acmi_edw_dev.edw_raw.customer")

    # Add operational metadata columns
    df = df.withColumn('_processing_timestamp', current_timestamp())

    return df

cloudfiles

Use when files arrive in object storage and you want incremental ingestion with checkpoints and schema evolution. Streaming only. For an end-to-end walkthrough, see Ingest with Auto Loader.

actions:
  - name: load_csv_file_from_cloudfiles
    type: load
    readMode: "stream"
    operational_metadata: ["_source_file_path","_source_file_size","_source_file_modification_time"]
    source:
      type: cloudfiles
      path: "${landing_volume}/{{ landing_folder }}/*.csv"
      format: csv
      options:
        cloudFiles.format: csv
        header: True
        delimiter: "|"
        cloudFiles.maxFilesPerTrigger: 11
        cloudFiles.inferColumnTypes: False
        cloudFiles.schemaEvolutionMode: "addNewColumns"
        cloudFiles.rescuedDataColumn: "_rescued_data"
        cloudFiles.schemaHints: "schemas/{{ table_name }}_schema.yaml"
    target: v_customer_cloudfiles
    description: "Load customer CSV files from landing volume"

Anatomy of a cloudFiles load action

  • name: Unique name for this action within the FlowGroup

  • type: Action type - brings data into a temporary view

  • readMode: must be stream (CloudFiles only supports streaming mode). This translates to spark.readStream.format("cloudFiles")

  • operational_metadata: Add custom metadata columns

  • source:
    • type: Use Databricks Auto Loader (CloudFiles)

    • path: File path pattern with substitution variables

    • format: Specify the file format as CSV, JSON, Parquet, etc.

    • schema: Path to a YAML schema file for full schema enforcement (see below)

    • options:
      • cloudFiles.format: Explicitly set CloudFiles format to CSV

      • header: First row contains column headers

      • delimiter: Use pipe character as field separator

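      • cloudFiles.maxFilesPerTrigger: Limit number of files processed per trigger

      • cloudFiles.inferColumnTypes: Whether to infer exact column types; when False, CSV and JSON columns are read as strings

      • cloudFiles.schemaEvolutionMode: How Auto Loader reacts to new columns (addNewColumns adds them to the schema)

      • cloudFiles.rescuedDataColumn: Column that collects data that does not match the schema

      • cloudFiles.schemaHints: Schema definition for Auto Loader (supports multiple formats - see below)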

  • target: Name of the temporary view created

  • description: Optional documentation for the action

cloudFiles.schemaHints Format Options

The cloudFiles.schemaHints option supports three formats, automatically detected by the framework:

Option 1: Inline DDL String (for simple schemas)

cloudFiles.schemaHints: "customer_id BIGINT, name STRING, email STRING"

Option 2: External YAML File (recommended for complex schemas with metadata)

cloudFiles.schemaHints: "schemas/customer_schema.yaml"

Option 3: External DDL/SQL File (for pre-defined DDL statements)

cloudFiles.schemaHints: "schemas/customer_schema.ddl"
# or
cloudFiles.schemaHints: "schemas/customer_schema.sql"

File Path Organization: Organize schema files in subdirectories relative to your project root:

  • Root level: "customer_schema.yaml"

  • Single directory: "schemas/customer_schema.yaml"

  • Nested subdirectories: "schemas/bronze/dimensions/customer_schema.yaml"

The framework automatically detects whether the value is an inline DDL string or a file path based on common file indicators (.yaml, .yml, .ddl, .sql, or path separators).
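A rough approximation of that detection rule can be written as a small helper. looks_like_schema_file is hypothetical, and the framework's exact heuristic may differ. It treats any known schema-file extension or any path separator as a file path.

```python
SCHEMA_FILE_SUFFIXES = (".yaml", ".yml", ".ddl", ".sql")


def looks_like_schema_file(value: str) -> bool:
    """Approximate the file-vs-inline check (hypothetical helper)."""
    text = value.strip()
    # A known schema-file extension or any path separator marks a file path
    return text.lower().endswith(SCHEMA_FILE_SUFFIXES) or "/" in text or "\\" in text


for hint in ("customer_id BIGINT, name STRING, email STRING",
             "customer_schema.yaml",
             "schemas/bronze/dimensions/customer_schema.yaml"):
    print(hint, "->", "file" if looks_like_schema_file(hint) else "inline DDL")
```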

YAML Schema Conversion: When using YAML schema files (Option 2), the nullable field is respected during conversion to DDL:

  • Columns with nullable: false are converted to include NOT NULL constraint

  • Columns with nullable: true (or omitted, default is true) are converted without constraints

Example: A YAML column defined as {name: c_custkey, type: BIGINT, nullable: false} will generate c_custkey BIGINT NOT NULL in the schema hints.
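The nullable rule can be sketched as a conversion over column definitions that have already been loaded from YAML into dicts. Reading the YAML file itself is omitted to keep the sketch dependency-free. columns_to_ddl is a hypothetical helper that illustrates the rule, not the framework's converter.

```python
def columns_to_ddl(columns: list) -> str:
    """Convert YAML-style column definitions to a schemaHints DDL string."""
    parts = []
    for column in columns:
        ddl = f"{column['name']} {column['type']}"
        # nullable defaults to true; only an explicit false adds NOT NULL
        if column.get("nullable", True) is False:
            ddl += " NOT NULL"
        parts.append(ddl)
    return ", ".join(parts)


columns = [
    {"name": "c_custkey", "type": "BIGINT", "nullable": False},
    {"name": "c_name", "type": "STRING", "nullable": True},
    {"name": "c_comment", "type": "STRING"},
]
print(columns_to_ddl(columns))
# c_custkey BIGINT NOT NULL, c_name STRING, c_comment STRING
```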

source.schema — Full Schema Enforcement

The source.schema field provides full schema enforcement by applying a StructType schema on the DataStreamReader before .load(). This disables schema inference entirely, ensuring the data conforms exactly to the specified schema.

When to use schema vs schemaHints:

  • Use schema when you want to enforce a complete, exact schema and disable inference.

  • Use schemaHints when you want to guide Auto Loader’s inference while still allowing it to discover additional columns.

actions:
  - name: load_customer_with_schema
    type: load
    readMode: stream
    source:
      type: cloudfiles
      path: "/data/customers/*.csv"
      format: csv
      schema: schemas/customer_schema.yaml
      options:
        cloudFiles.format: csv
    target: v_customer_raw
    description: "Load customer CSV with explicit schema enforcement"

The schema file uses the same YAML format as schemaHints files:

name: customer
version: "1.0"
columns:
  - name: c_custkey
    type: BIGINT
    nullable: false
  - name: c_name
    type: STRING
    nullable: true

This generates code with .schema() applied on the reader chain before .load():

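from pyspark import pipelines as dp
from pyspark.sql.types import StructType, StructField, LongType, StringType

customer_schema = StructType([
    StructField("c_custkey", LongType(), False),
    StructField("c_name", StringType(), True),
])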

@dp.temporary_view()
def v_customer_raw():
    df = spark.readStream \
        .format("cloudFiles") \
        .schema(customer_schema) \
        .option("cloudFiles.format", "csv") \
        .load("/data/customers/*.csv")
    return df

When you provide source.schema, cloudFiles.schemaEvolutionMode defaults to none because inference is disabled. Do not combine source.schema with cloudFiles.schemaHints — these are mutually exclusive approaches.
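The interplay between source.schema, cloudFiles.schemaHints and the evolution mode can be sketched as a small resolver. This hypothetical helper only models the rules stated above plus Auto Loader's documented default of addNewColumns when no schema is supplied. It does not assume that LHP writes the option explicitly into the generated code.

```python
def resolve_cloudfiles_options(source: dict) -> dict:
    """Return reader options with the effective schemaEvolutionMode filled in."""
    options = dict(source.get("options", {}))  # copy so the input is not mutated
    has_schema = "schema" in source
    if has_schema and "cloudFiles.schemaHints" in options:
        raise ValueError("source.schema and cloudFiles.schemaHints are mutually exclusive")
    # Auto Loader defaults: "none" when a schema is supplied, "addNewColumns" otherwise;
    # an explicitly configured mode is left untouched
    options.setdefault("cloudFiles.schemaEvolutionMode", "none" if has_schema else "addNewColumns")
    return options


print(resolve_cloudfiles_options({
    "schema": "schemas/customer_schema.yaml",
    "options": {"cloudFiles.format": "csv"},
}))
```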

Lakehouse Plumber uses syntax consistent with Databricks so you can transfer knowledge between the two. All options available here mirror those of Databricks Auto Loader.

See also

The first cloudfiles example above (load_csv_file_from_cloudfiles) translates to the following PySpark code:

from pyspark import pipelines as dp
from pyspark.sql import functions as F

customer_cloudfiles_schema_hints = """
    c_custkey BIGINT NOT NULL,
    c_name STRING NOT NULL,
    c_address STRING,
    c_nationkey BIGINT NOT NULL,
    c_phone STRING,
    c_acctbal DECIMAL(18,2),
    c_mktsegment STRING,
    c_comment STRING
""".strip().replace("\n", " ")


@dp.temporary_view()
def v_customer_cloudfiles():
    """Load customer CSV files from landing volume"""
    df = spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "csv") \
        .option("header", True) \
        .option("delimiter", "|") \
        .option("cloudFiles.maxFilesPerTrigger", 11) \
        .option("cloudFiles.inferColumnTypes", False) \
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
        .option("cloudFiles.rescuedDataColumn", "_rescued_data") \
        .option("cloudFiles.schemaHints", customer_cloudfiles_schema_hints) \
        .load("/Volumes/acmi_edw_dev/edw_raw/landing_volume/customer/*.csv")

    # Add operational metadata columns from the file _metadata struct
    df = df.withColumn('_source_file_size', F.col('_metadata.file_size'))
    df = df.withColumn('_source_file_modification_time', F.col('_metadata.file_modification_time'))
    df = df.withColumn('_source_file_path', F.col('_metadata.file_path'))

    return df

sql

Use when you need an arbitrary SQL query (often joins or windowed aggregates across already-loaded sources) materialised as a temporary view. SQL load actions support both inline SQL and external SQL files.

Option 1: Inline SQL

actions:
  - name: load_customer_summary
    type: load
    readMode: batch
    source:
      type: sql
      sql: |
        SELECT
          c_custkey,
          c_name,
          c_mktsegment,
          COUNT(*) as order_count,
          SUM(o_totalprice) as total_spent
        FROM ${catalog}.${raw_schema}.customer c
        LEFT JOIN ${catalog}.${raw_schema}.orders o
          ON c.c_custkey = o.o_custkey
        GROUP BY c_custkey, c_name, c_mktsegment
    target: v_customer_summary
    description: "Load customer summary with order statistics"

Option 2: External SQL File

actions:
  - name: load_customer_metrics
    type: load
    readMode: batch
    source:
      type: sql
      sql_path: "sql/customer_metrics.sql"
    target: v_customer_metrics
    description: "Load customer metrics from external SQL file"

Anatomy of an SQL load action

  • name: Unique name for this action within the FlowGroup

  • type: Action type - brings data into a temporary view

  • readMode: Either batch or stream - determines execution mode

  • source:
    • type: Use SQL query as source

    • sql: SQL statement with substitution variables for dynamic values (inline option)

    • sql_path: Path to external .sql file (external file option)

  • target: Name of the temporary view created from query results

  • description: Optional documentation for the action

See also

SQL load actions let you create complex views from multiple tables using standard SQL. Use substitution variables like ${catalog} and ${schema} for environment-specific values.

File Substitution Support

Substitution variables work in both inline SQL and external SQL files (sql_path). The same ${token} and ${secret:scope/key} syntax from YAML works in .sql files. Files are processed for substitutions before query execution.

File Organization: When using sql_path, the path is relative to your YAML file location. Common practice is to create a sql/ folder alongside your pipeline YAML files.

The above YAML examples translate to the following PySpark code

For inline SQL:

from pyspark import pipelines as dp

@dp.temporary_view()
def v_customer_summary():
    """Load customer summary with order statistics"""
    return spark.sql("""
        SELECT
          c_custkey,
          c_name,
          c_mktsegment,
          COUNT(*) as order_count,
          SUM(o_totalprice) as total_spent
        FROM acmi_edw_dev.edw_raw.customer c
        LEFT JOIN acmi_edw_dev.edw_raw.orders o
          ON c.c_custkey = o.o_custkey
        GROUP BY c_custkey, c_name, c_mktsegment
    """)

For external SQL file:

from pyspark import pipelines as dp

@dp.temporary_view()
def v_customer_metrics():
    """Load customer metrics from external SQL file"""
    return spark.sql("""
        -- Content from sql/customer_metrics.sql file
        SELECT
          customer_id,
          total_orders,
          avg_order_value,
          last_order_date
        FROM ${catalog}.${silver_schema}.customer_analytics
        WHERE last_order_date >= current_date() - INTERVAL 90 DAYS
    """)

jdbc

Use when pulling from an external RDBMS (Oracle, SQL Server, Postgres, MySQL). Credentials flow through Databricks secrets. A JDBC load action reads either a whole table (table) or the result of a custom SQL query (query).

Option 1: Query-based JDBC

actions:
  - name: load_external_customers
    type: load
    readMode: batch
    operational_metadata: ["_extraction_timestamp"]
    source:
      type: jdbc
      url: "jdbc:postgresql://db.example.com:5432/production"
      driver: "org.postgresql.Driver"
      user: "${secret:database/username}"
      password: "${secret:database/password}"
      query: |
        SELECT
          customer_id,
          first_name,
          last_name,
          email,
          registration_date,
          country
        FROM customers
        WHERE status = 'active'
        AND registration_date >= CURRENT_DATE - INTERVAL '7 days'
    target: v_external_customers
    description: "Load active customers from external PostgreSQL database"

Option 2: Table-based JDBC

actions:
  - name: load_external_products
    type: load
    readMode: batch
    source:
      type: jdbc
      url: "jdbc:mysql://mysql.example.com:3306/catalog"
      driver: "com.mysql.cj.jdbc.Driver"
      user: "${secret:mysql/username}"
      password: "${secret:mysql/password}"
      table: "products"
    target: v_external_products
    description: "Load products table from external MySQL database"

Anatomy of a JDBC load action

  • name: Unique name for this action within the FlowGroup

  • type: Action type - brings data into a temporary view

  • readMode: Use batch; Spark's JDBC data source supports batch reads only

  • operational_metadata: Add custom metadata columns (e.g., extraction timestamp)

  • source:
    • type: Use JDBC connection as source

    • url: JDBC connection string with database server details

    • driver: JDBC driver class name (database-specific)

    • user: Database username (supports secret substitution)

    • password: Database password (supports secret substitution)

    • query: Custom SQL query to execute (query option)

    • table: Table name to read entirely (table option)

  • target: Name of the temporary view created

  • description: Optional documentation for the action

See also

JDBC load actions require either a query or table field, but not both — providing both raises an error. Use secret substitution (${secret:scope/key}) for secure credential management, and ensure the appropriate JDBC driver is available on your Databricks cluster.

Secret Management: Always use ${secret:scope/key} syntax for database credentials. The framework automatically handles secret substitution during code generation.

The above YAML examples translate to the following PySpark code

For query-based JDBC:

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

@dp.temporary_view()
def v_external_customers():
    """Load active customers from external PostgreSQL database"""
    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://db.example.com:5432/production") \
        .option("user", dbutils.secrets.get("database", "username")) \
        .option("password", dbutils.secrets.get("database", "password")) \
        .option("driver", "org.postgresql.Driver") \
        .option("query", """
            SELECT
              customer_id,
              first_name,
              last_name,
              email,
              registration_date,
              country
            FROM customers
            WHERE status = 'active'
            AND registration_date >= CURRENT_DATE - INTERVAL '7 days'
        """) \
        .load()

    # Add operational metadata columns
    df = df.withColumn('_extraction_timestamp', current_timestamp())

    return df

For table-based JDBC:

from pyspark import pipelines as dp

@dp.temporary_view()
def v_external_products():
    """Load products table from external MySQL database"""
    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:mysql://mysql.example.com:3306/catalog") \
        .option("user", dbutils.secrets.get("mysql", "username")) \
        .option("password", dbutils.secrets.get("mysql", "password")) \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("dbtable", "products") \
        .load()

    return df
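The query-or-table rule and the option names used in the generated code above (query for the query-based example, dbtable for the table-based one) can be sketched as a mapping from the YAML source block to Spark JDBC reader options. jdbc_reader_options is a hypothetical helper, not LHP code. In the usage line, the credentials are plain placeholder strings.

```python
CONNECTION_KEYS = ("url", "driver", "user", "password")


def jdbc_reader_options(source: dict) -> dict:
    """Map a jdbc source block onto Spark JDBC reader options (hypothetical helper)."""
    has_query = "query" in source
    has_table = "table" in source
    if has_query == has_table:  # both set, or neither set
        raise ValueError("a jdbc source needs exactly one of 'query' or 'table'")
    options = {key: source[key] for key in CONNECTION_KEYS}
    if has_query:
        options["query"] = source["query"]
    else:
        # Spark names the whole-table option "dbtable"
        options["dbtable"] = source["table"]
    return options


print(jdbc_reader_options({
    "url": "jdbc:mysql://mysql.example.com:3306/catalog",
    "driver": "com.mysql.cj.jdbc.Driver",
    "user": "placeholder-user",
    "password": "placeholder-password",
    "table": "products",
}))
```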

python

Use when the source format is not covered by a built-in sub-type, or you need custom pre-processing in Python before the flow sees the data. Python load actions call custom Python functions that return DataFrames.

YAML Configuration:

actions:
  - name: load_api_data
    type: load
    readMode: batch
    operational_metadata: ["_api_call_timestamp"]
    source:
      type: python
      module_path: "extractors/api_extractor.py"
      function_name: "extract_customer_data"
      parameters:
        api_endpoint: "https://api.example.com/customers"
        api_key: "${secret:apis/customer_api_key}"
        batch_size: 1000
        start_date: "2024-01-01"
    target: v_api_customers
    description: "Load customer data from external API"

Python Function (extractors/api_extractor.py):

import requests
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def extract_customer_data(spark, parameters: dict) -> DataFrame:
    """Extract customer data from external API.

    Args:
        spark: SparkSession instance
        parameters: Configuration parameters from YAML

    Returns:
        DataFrame: Customer data as PySpark DataFrame
    """
    # Extract parameters from YAML configuration
    api_endpoint = parameters.get("api_endpoint")
    api_key = parameters.get("api_key")
    batch_size = parameters.get("batch_size", 1000)
    start_date = parameters.get("start_date")

    # Call external API; requests encodes the query-string parameters
    headers = {"Authorization": f"Bearer {api_key}"}
    response = requests.get(
        api_endpoint,
        params={"start_date": start_date, "limit": batch_size},
        headers=headers,
        timeout=30,
    )
    response.raise_for_status()

    # Convert API response to DataFrame
    data = response.json()["customers"]

    # JSON delivers registration_date as a string, which TimestampType rejects,
    # so read it as STRING and convert it after the DataFrame is created
    schema = StructType([
        StructField("customer_id", IntegerType(), True),
        StructField("first_name", StringType(), True),
        StructField("last_name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("registration_date", StringType(), True)
    ])

    # Create the DataFrame and cast registration_date to a timestamp
    df = spark.createDataFrame(data, schema)
    return df.withColumn("registration_date", F.to_timestamp("registration_date"))

Anatomy of a Python load action

  • name: Unique name for this action within the FlowGroup

  • type: Action type - brings data into a temporary view

  • readMode: Either batch or stream - Python actions typically use batch mode

  • operational_metadata: Add custom metadata columns

  • source:
    • type: Use Python function as source

    • module_path: Path to Python file containing the extraction function

    • function_name: Name of function to call (defaults to “get_df” if not specified)

    • parameters: Dictionary of parameters to pass to the function

  • target: Name of the temporary view created

  • description: Optional documentation for the action

See also

Python functions must accept two parameters: spark (SparkSession) and parameters (dict). The function must return a PySpark DataFrame that will be used as the view source.

File Organization: When using module_path, the path is relative to your YAML file location. Common practice is to create an extractors/ or functions/ folder alongside your pipeline YAML files.

Parameter Substitution: The parameters dictionary supports ${token} substitution for environment-specific values:

parameters:
  catalog: "${catalog}"
  table_name: "${schema}.users"
  api_endpoint: "${api_url}"
  batch_size: 1000                     # No substitution needed

All tokens are replaced with values from substitutions/{env}.yaml at generation time. Secret references (${secret:scope/key}) are converted to dbutils.secrets.get() calls.

The above YAML translates to the following PySpark code

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp
from extractors.api_extractor import extract_customer_data

@dp.temporary_view()
def v_api_customers():
    """Load customer data from external API"""
    # Call the external Python function with spark and parameters
    parameters = {
        "api_endpoint": "https://api.example.com/customers",
        "api_key": dbutils.secrets.get("apis", "customer_api_key"),
        "batch_size": 1000,
        "start_date": "2024-01-01"
    }
    df = extract_customer_data(spark, parameters)

    # Add operational metadata columns
    df = df.withColumn('_api_call_timestamp', current_timestamp())

    return df
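The substitution rules for the parameters dictionary can be sketched for a single value. In the hypothetical render_parameter helper below, a plain dict stands in for substitutions/{env}.yaml and holds illustrative values. Ordinary ${token} references are replaced inside the string. A value that is exactly one ${secret:scope/key} reference becomes a dbutils.secrets.get(scope, key) expression, as described above. The helper returns Python source text rather than a value because the secret is only read when the generated pipeline runs. This sketch does not handle a secret embedded inside a longer string.

```python
import json
import re

TOKEN = re.compile(r"\$\{([^}]+)\}")
SECRET = re.compile(r"^\$\{secret:([^/}]+)/([^}]+)\}$")


def render_parameter(value, tokens: dict) -> str:
    """Return Python source text for one parameter value (hypothetical helper)."""
    if not isinstance(value, str):
        return repr(value)  # numbers and booleans pass through unchanged
    secret = SECRET.match(value)
    if secret:
        scope, key = secret.groups()
        # Emitted as a call, so the secret is resolved at pipeline run time
        return f'dbutils.secrets.get("{scope}", "{key}")'

    def lookup(match):
        name = match.group(1)
        if name not in tokens:
            raise KeyError(f"no substitution defined for ${{{name}}}")
        return str(tokens[name])

    # Replace every ${token} inside the string, then emit it as a quoted literal
    return json.dumps(TOKEN.sub(lookup, value))


# Illustrative token values standing in for substitutions/{env}.yaml
tokens = {"catalog": "acmi_edw_dev", "schema": "edw_raw"}
parameters = {
    "catalog": "${catalog}",
    "table_name": "${schema}.users",
    "api_key": "${secret:apis/customer_api_key}",
    "batch_size": 1000,
}
for name, value in parameters.items():
    print(f'"{name}": {render_parameter(value, tokens)},')
```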

kafka

Use when streaming from Apache Kafka, AWS Managed Streaming for Apache Kafka (MSK), or Azure Event Hubs via the Kafka protocol.

actions:
  - name: load_kafka_events
    type: load
    readMode: stream
    operational_metadata: ["_processing_timestamp"]
    source:
      type: kafka
      bootstrap_servers: "kafka1.example.com:9092,kafka2.example.com:9092"
      subscribe: "events,logs,metrics"
      options:
        startingOffsets: "latest"
        failOnDataLoss: false
        kafka.group.id: "lhp-consumer-group"
        kafka.session.timeout.ms: 30000
        kafka.ssl.truststore.location: "/path/to/truststore.jks"
        kafka.ssl.truststore.password: "${secret:scope/truststore-password}"
    target: v_kafka_events_raw
    description: "Load events from Kafka topics"

Anatomy of a Kafka load action

  • name: Unique name for this action within the FlowGroup

  • type: Action type - brings data into a temporary view

  • readMode: Must be stream (generates a spark.readStream Kafka source)

  • operational_metadata: Add custom metadata columns (e.g., processing timestamp)

  • source:
    • type: Use Apache Kafka as source

    • bootstrap_servers: Comma-separated list of Kafka broker addresses (host:port)

    • subscribe: Comma-separated list of topics to subscribe to (choose ONE subscription method)

    • subscribePattern: Java regex pattern for topic subscription (alternative to subscribe)

    • assign: JSON string specifying specific topic partitions (alternative to subscribe)

    • options:
      • startingOffsets: Starting offset position (earliest/latest/JSON)

      • failOnDataLoss: Whether to fail on potential data loss (default: true)

      • kafka.group.id: Consumer group ID (use with caution)

      • kafka.session.timeout.ms: Session timeout in milliseconds

      • kafka.ssl.*: SSL/TLS configuration options for secure connections

      • kafka.sasl.*: SASL authentication options

      • All other kafka.* options from Databricks Kafka connector

  • target: Name of the temporary view created

  • description: Optional documentation for the action

See also

Kafka always returns a fixed 7-column schema with binary key/value columns: key, value, topic, partition, offset, timestamp, timestampType. Explicitly deserialize the key and value columns using transform actions.

Warning

Subscription Methods: Specify exactly ONE of:

  • subscribe: Comma-separated list of specific topics

  • subscribePattern: Java regex pattern for topic names

  • assign: JSON with specific topic partitions

Using multiple subscription methods will result in an error.

The above YAML translates to the following PySpark code

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

@dp.temporary_view()
def v_kafka_events_raw():
    """Load events from Kafka topics"""
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka1.example.com:9092,kafka2.example.com:9092") \
        .option("subscribe", "events,logs,metrics") \
        .option("startingOffsets", "latest") \
        .option("failOnDataLoss", False) \
        .option("kafka.group.id", "lhp-consumer-group") \
        .option("kafka.session.timeout.ms", 30000) \
        .option("kafka.ssl.truststore.location", "/path/to/truststore.jks") \
        .option("kafka.ssl.truststore.password", dbutils.secrets.get("scope", "truststore-password")) \
        .load()

    # Add operational metadata columns
    df = df.withColumn('_processing_timestamp', current_timestamp())

    return df
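The mapping from the YAML source block to the reader options in the generated code above, together with the one-subscription-method rule, can be sketched in plain Python. kafka_reader_options is a hypothetical helper, not LHP's implementation. It shows bootstrap_servers becoming kafka.bootstrap.servers, exactly one of subscribe, subscribePattern or assign being carried over, and everything under options passing through unchanged.

```python
SUBSCRIPTION_KEYS = ("subscribe", "subscribePattern", "assign")


def kafka_reader_options(source: dict) -> dict:
    """Map a kafka source block onto Spark Kafka reader options (hypothetical helper)."""
    chosen = [key for key in SUBSCRIPTION_KEYS if key in source]
    if len(chosen) != 1:
        raise ValueError(
            "specify exactly one of subscribe, subscribePattern or assign; "
            f"found {chosen or 'none'}"
        )
    options = {
        # bootstrap_servers becomes the kafka.bootstrap.servers reader option
        "kafka.bootstrap.servers": source["bootstrap_servers"],
        chosen[0]: source[chosen[0]],
    }
    # Everything under options (startingOffsets, kafka.* settings, ...) passes through
    options.update(source.get("options", {}))
    return options


print(kafka_reader_options({
    "bootstrap_servers": "localhost:9092",
    "subscribe": "events",
    "options": {"startingOffsets": "latest"},
}))
```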

Example: Deserializing Kafka Data

Since Kafka returns binary data, you typically need a transform action to deserialize:

actions:
  # Load from Kafka (returns binary key/value)
  - name: load_kafka_events
    type: load
    readMode: stream
    source:
      type: kafka
      bootstrap_servers: "localhost:9092"
      subscribe: "events"
    target: v_kafka_events_raw

  # Deserialize and parse JSON
  - name: parse_kafka_events
    type: transform
    transform_type: sql
    source: v_kafka_events_raw
    target: v_kafka_events_parsed
    sql: |
      SELECT
        CAST(key AS STRING) as message_key,
        from_json(CAST(value AS STRING), 'event_type STRING, timestamp BIGINT, data STRING') as parsed_value,
        topic,
        partition,
        offset,
        timestamp as kafka_timestamp
      FROM $source
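To show what the transform does to each message, here is a plain-Python approximation of the SQL above applied to a single record. The record dict layout and the names parse_kafka_record and EVENT_FIELDS are hypothetical. In the pipeline, Spark supplies key and value as BINARY columns next to topic, partition, offset, and timestamp. The sketch does no type coercion, and it only approximates Spark's NULL and malformed-JSON handling.

```python
import json
from typing import Optional

# Hypothetical plain-Python approximation of the SQL transform, for ONE record.
EVENT_FIELDS = ("event_type", "timestamp", "data")


def _decode(raw: Optional[bytes]) -> Optional[str]:
    # CAST(x AS STRING): bytes become UTF-8 text; NULL (e.g. no key) stays NULL.
    return raw.decode("utf-8", errors="replace") if raw is not None else None


def parse_kafka_record(record: dict) -> dict:
    text = _decode(record["value"])
    if text is None:
        parsed = None  # from_json(NULL) is NULL
    else:
        try:
            payload = json.loads(text)
        except json.JSONDecodeError:
            payload = {}  # from_json does not raise on bad JSON in its default mode
        if not isinstance(payload, dict):
            payload = {}
        # from_json(..., 'event_type STRING, timestamp BIGINT, data STRING')
        parsed = {field: payload.get(field) for field in EVENT_FIELDS}
    return {
        "message_key": _decode(record["key"]),
        "parsed_value": parsed,
        "topic": record["topic"],
        "partition": record["partition"],
        "offset": record["offset"],
        "kafka_timestamp": record["timestamp"],
    }
```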

Advanced Authentication: AWS MSK IAM

AWS Managed Streaming for Apache Kafka (MSK) supports IAM authentication, which lets Databricks authenticate with an IAM role instead of stored Kafka usernames and passwords.

Prerequisites:

  1. AWS MSK cluster configured with IAM authentication enabled

  2. Databricks cluster with an instance profile (IAM role) that has MSK permissions

  3. IAM policy granting kafka-cluster:Connect, kafka-cluster:DescribeCluster, and topic/group permissions

YAML Configuration:

actions:
  - name: load_msk_orders
    type: load
    readMode: stream
    source:
      type: kafka
      bootstrap_servers: "b-1.msk-cluster.abc123.kafka.us-east-1.amazonaws.com:9098"
      subscribe: "orders"
      options:
        kafka.security.protocol: "SASL_SSL"
        kafka.sasl.mechanism: "AWS_MSK_IAM"
        kafka.sasl.jaas.config: "shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;"
        kafka.sasl.client.callback.handler.class: "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
        startingOffsets: "earliest"
        failOnDataLoss: "false"
    target: v_msk_orders_raw
    description: "Load orders from MSK using IAM authentication"

With Specific IAM Role:

options:
  kafka.security.protocol: "SASL_SSL"
  kafka.sasl.mechanism: "AWS_MSK_IAM"
  kafka.sasl.jaas.config: 'shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="${msk_role_arn}";'
  kafka.sasl.client.callback.handler.class: "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"

Generated PySpark Code:

from pyspark import pipelines as dp

@dp.temporary_view()
def v_msk_orders_raw():
    """Load orders from MSK using IAM authentication"""
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "b-1.msk-cluster.abc123.kafka.us-east-1.amazonaws.com:9098") \
        .option("subscribe", "orders") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.mechanism", "AWS_MSK_IAM") \
        .option("kafka.sasl.jaas.config", "shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;") \
        .option("kafka.sasl.client.callback.handler.class", "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler") \
        .option("startingOffsets", "earliest") \
        .option("failOnDataLoss", "false") \
        .load()

    return df

See also

For complete MSK IAM documentation see AWS MSK IAM Access Control.

MSK IAM Requirements:

  • Use port 9098 for IAM authentication (not the standard 9092).

  • Provide all four required options: kafka.security.protocol, kafka.sasl.mechanism, kafka.sasl.jaas.config, and kafka.sasl.client.callback.handler.class.

  • Grant the IAM role appropriate kafka-cluster:* permissions.

  • No Kafka credentials are stored; authentication relies on the IAM role attached to the Databricks cluster.

  • Ensure your Databricks cluster has network access to the MSK cluster.
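The four options can also be assembled in code, for example to test connectivity from a notebook before committing the YAML. The sketch below is a hypothetical helper: msk_iam_options is not part of LHP. It reproduces the option values from the YAML examples above, including the optional awsRoleArn form, and rejects brokers that are not on port 9098.

```python
from typing import Optional

# Hypothetical helper (not part of LHP) that builds the four MSK IAM options
# shown in the YAML examples above.
_LOGIN_MODULE = "shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule"
_CALLBACK = "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"


def msk_iam_options(bootstrap_servers: str, role_arn: Optional[str] = None) -> dict:
    # MSK's IAM listener uses port 9098, so reject brokers on any other port.
    for broker in bootstrap_servers.split(","):
        _, sep, port = broker.strip().rpartition(":")
        if not sep or port != "9098":
            raise ValueError(f"{broker.strip()!r}: MSK IAM expects port 9098")
    jaas = f"{_LOGIN_MODULE} required"
    if role_arn:
        jaas += f' awsRoleArn="{role_arn}"'  # optional: assume a specific role
    jaas += ";"
    return {
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "AWS_MSK_IAM",
        "kafka.sasl.jaas.config": jaas,
        "kafka.sasl.client.callback.handler.class": _CALLBACK,
    }
```

In a notebook, you could pass the result to spark.readStream.format("kafka").options(**opts), together with kafka.bootstrap.servers and subscribe.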

Advanced Authentication: Azure Event Hubs OAuth

Azure Event Hubs exposes a Kafka-compatible endpoint and supports OAuth 2.0 authentication through Microsoft Entra ID (formerly Azure Active Directory).

Prerequisites:

  1. Azure Event Hubs namespace on the Standard tier or higher (the Basic tier does not support the Kafka endpoint)

  2. Microsoft Entra ID app registration (Service Principal) with appropriate permissions

  3. Service Principal granted “Azure Event Hubs Data Receiver” role on the namespace

  4. Databricks secrets configured for client credentials

YAML Configuration:

actions:
  - name: load_event_hubs_data
    type: load
    readMode: stream
    source:
      type: kafka
      bootstrap_servers: "my-namespace.servicebus.windows.net:9093"
      subscribe: "my-event-hub"
      options:
        kafka.security.protocol: "SASL_SSL"
        kafka.sasl.mechanism: "OAUTHBEARER"
        kafka.sasl.jaas.config: 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="${secret:azure_secrets/client_id}" clientSecret="${secret:azure_secrets/client_secret}" scope="https://${event_hubs_namespace}/.default" ssl.protocol="SSL";'
        kafka.sasl.oauthbearer.token.endpoint.url: "https://login.microsoft.com/${azure_tenant_id}/oauth2/v2.0/token"
        kafka.sasl.login.callback.handler.class: "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
        startingOffsets: "earliest"
    target: v_event_hubs_data_raw
    description: "Load data from Azure Event Hubs using OAuth"

Generated PySpark Code:

from pyspark import pipelines as dp

@dp.temporary_view()
def v_event_hubs_data_raw():
    """Load data from Azure Event Hubs using OAuth"""
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "my-namespace.servicebus.windows.net:9093") \
        .option("subscribe", "my-event-hub") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.mechanism", "OAUTHBEARER") \
        .option("kafka.sasl.jaas.config",
                f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{dbutils.secrets.get(scope="azure-secrets", key="client_id")}" clientSecret="{dbutils.secrets.get(scope="azure-secrets", key="client_secret")}" scope="https://my-namespace.servicebus.windows.net/.default" ssl.protocol="SSL";') \
        .option("kafka.sasl.oauthbearer.token.endpoint.url", "https://login.microsoft.com/12345678-1234-1234-1234-123456789012/oauth2/v2.0/token") \
        .option("kafka.sasl.login.callback.handler.class", "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler") \
        .option("startingOffsets", "earliest") \
        .load()

    return df

See also

For complete Event Hubs Kafka documentation see Azure Event Hubs for Apache Kafka.

Event Hubs OAuth Requirements:

  • Always use port 9093 for the Kafka protocol with Event Hubs.

  • Specify the Event Hubs namespace in the format <namespace>.servicebus.windows.net.

  • Match the scope in JAAS config to https://<namespace>.servicebus.windows.net/.default.

  • Provide all five required options: kafka.security.protocol, kafka.sasl.mechanism, kafka.sasl.jaas.config, kafka.sasl.oauthbearer.token.endpoint.url, and kafka.sasl.login.callback.handler.class.

  • Assign the Service Principal the “Azure Event Hubs Data Receiver” role.

  • Rely on the callback handler to refresh OAuth tokens automatically.

  • Always use secrets for client credentials — never hardcode them in YAML.
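The sketch below shows how the five options fit together. It is a hypothetical helper (event_hubs_oauth_options is not part of LHP) that derives the bootstrap server, JAAS scope, and token endpoint from a namespace and tenant ID, using the option and class names from the YAML example above. In real use, client_id and client_secret should come from Databricks secrets, never from literals.

```python
# Hypothetical helper (not part of LHP) assembling the Event Hubs OAuth options
# shown above. Pass secrets in at runtime; never hardcode them.
_OAUTH_MODULE = "kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule"
_OAUTH_CALLBACK = (
    "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured."
    "OAuthBearerLoginCallbackHandler"
)


def event_hubs_oauth_options(namespace: str, tenant_id: str,
                             client_id: str, client_secret: str) -> dict:
    # Accept either "my-namespace" or the full "<namespace>.servicebus.windows.net" host.
    suffix = ".servicebus.windows.net"
    host = namespace if namespace.endswith(suffix) else f"{namespace}{suffix}"
    jaas = (
        f'{_OAUTH_MODULE} required clientId="{client_id}" '
        f'clientSecret="{client_secret}" scope="https://{host}/.default" '
        'ssl.protocol="SSL";'
    )
    return {
        "kafka.bootstrap.servers": f"{host}:9093",  # Kafka endpoint port for Event Hubs
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "OAUTHBEARER",
        "kafka.sasl.jaas.config": jaas,
        "kafka.sasl.oauthbearer.token.endpoint.url":
            f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
        "kafka.sasl.login.callback.handler.class": _OAUTH_CALLBACK,
    }
```

Deriving the scope from the same host as the bootstrap server is deliberate: it keeps the scope in the JAAS config matched to the namespace, as the requirements above demand.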

custom_datasource

Use when you have (or want to write) a PySpark DataSource implementation and want LHP to register and invoke it. Custom data source load actions use PySpark’s DataSource API for specialised ingestion from APIs, custom protocols, or any other external system that needs custom logic.

YAML Configuration:

actions:
  - name: load_currency_exchange
    type: load
    readMode: stream
    operational_metadata: ["_processing_timestamp"]
    source:
      type: custom_datasource
      module_path: "data_sources/currency_api_source.py"
      custom_datasource_class: "CurrencyAPIStreamingDataSource"
      options:
        apiKey: "${secret:apis/currency_key}"
        baseCurrencies: "USD,EUR,GBP"
        progressPath: "/Volumes/catalog/schema/checkpoints/"
        minCallIntervalSeconds: "300"
        workspaceUrl: "adb-XYZ.azuredatabricks.net"
    target: v_currency_bronze
    description: "Load live currency exchange rates from external API"

Custom DataSource Implementation (data_sources/currency_api_source.py):

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition
from pyspark.sql.types import StructType
from datetime import datetime, timezone
from typing import Iterator, Tuple
import requests  # needed once the elided API call logic in read() is filled in
import time

class CurrencyInputPartition(InputPartition):
    """Input partition for currency API data source"""
    def __init__(self, start_time, end_time):
        self.start_time = start_time
        self.end_time = end_time

class CurrencyAPIStreamingDataSource(DataSource):
    """
    Custom data source for live currency exchange rates.
    Fetches data from external API with rate limiting and progress tracking.
    """

    @classmethod
    def name(cls):
        # This value, not the class name, is the format string used in .format()
        return "currency_api_stream"

    def schema(self):
        return """
            base_currency string,
            target_currency string,
            exchange_rate double,
            api_timestamp timestamp,
            fetch_timestamp timestamp,
            rate_change_1h double,
            is_crypto boolean,
            data_source string,
            pipeline_run_id string
        """

    def streamReader(self, schema: StructType):
        return CurrencyAPIStreamingReader(schema, self.options)

class CurrencyAPIStreamingReader(DataSourceStreamReader):
    """Streaming reader implementation with API calls and progress tracking"""

    def __init__(self, schema, options):
        self.schema = schema
        self.options = options
        self.api_key = options.get("apiKey")
        self.base_currencies = [c.strip() for c in options.get("baseCurrencies", "USD").split(",")]
        self.progress_path = options.get("progressPath")
        self.min_interval = int(options.get("minCallIntervalSeconds", "300"))

    def initialOffset(self) -> dict:
        return {"fetch_time": int(time.time() * 1000)}

    def latestOffset(self) -> dict:
        return {"fetch_time": int(time.time() * 1000)}

    def partitions(self, start: dict, end: dict):
        return [CurrencyInputPartition(start.get("fetch_time", 0), end.get("fetch_time", 0))]

    def read(self, partition) -> Iterator[Tuple]:
        """Fetch data from external API and yield as tuples"""
        # API call logic here; the values below are placeholders
        for base_currency in self.base_currencies:
            # timestamp columns need datetime objects, not epoch floats
            now = datetime.now(timezone.utc)
            yield (base_currency, "USD", 1.0, now, now, 0.0, False, "API", "run_1")
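The example reader parses minCallIntervalSeconds but never uses it, so latestOffset() advances on every trigger. Below is a minimal sketch of one way to enforce the interval: a hypothetical helper, next_fetch_offset, that keeps the previous offset until the interval has elapsed. It assumes two things. First, that returning an unchanged offset causes Spark to plan no new micro-batch. Second, that the reader instance, and therefore any offset stored on it, persists for the life of the query.

```python
import time
from typing import Optional

# Hypothetical offset policy for the example reader (not part of LHP or PySpark):
# only advance "fetch_time" once min_interval_s seconds have passed, so the
# external API is not called on every micro-batch.
def next_fetch_offset(previous: Optional[dict], min_interval_s: int,
                      now_ms: Optional[int] = None) -> dict:
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms
    if previous is None:
        return {"fetch_time": now_ms}  # first call: start from "now"
    elapsed_ms = now_ms - previous.get("fetch_time", 0)
    if elapsed_ms < min_interval_s * 1000:
        return previous  # too soon: report the same offset, i.e. no new data
    return {"fetch_time": now_ms}
```

Inside the reader, you could initialise a hypothetical self._last_offset = None in __init__. latestOffset() would then assign self._last_offset = next_fetch_offset(self._last_offset, self.min_interval) and return it. Both the attribute and this wiring are illustrative.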

Anatomy of a custom data source load action

  • name: Unique name for this action within the FlowGroup

  • type: Action type; load brings data into a temporary view

  • readMode: Either batch or stream; batch reads through spark.read and the DataSource’s reader() method, stream through spark.readStream and its streamReader() method

  • operational_metadata: List of operational metadata columns to add (e.g., _processing_timestamp)

  • source: Custom data source configuration
    • type: Use custom_datasource as source type

    • module_path: Path to Python file containing the custom DataSource implementation

    • custom_datasource_class: Name of the DataSource class to register and use

    • options: Dictionary of parameters passed to the DataSource (available via self.options)

  • target: Name of the temporary view created

  • description: Optional documentation for the action

See also

Custom DataSources require implementing the DataSource interface with appropriate reader methods. The framework copies your file to a custom_python_functions/ subdirectory next to the generated pipeline file and imports the class by name — the user file is not inlined into the pipeline. Use the options dictionary to pass configuration parameters from YAML to your DataSource.

File Substitution Support

Custom DataSource Python files support substitution variables:

  • Environment tokens: ${catalog}, ${api_endpoint}, ${environment}

  • Secret references: ${secret:scope/key} for API keys and credentials

Substitutions are applied to the file’s contents as it is copied to custom_python_functions/.
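The sketch below approximates the environment-token part of that substitution with a regular expression, to show what ends up in the copied file. It is not LHP's implementation, and substitute_env_tokens is a hypothetical name. Raising on an undefined token is a choice made for this example only. ${secret:...} references are deliberately left untouched, because this page does not show how LHP rewrites them inside a copied module.

```python
import re

# Illustrative approximation only (not LHP's code): replace ${name} environment
# tokens; ${secret:scope/key} does not match the identifier pattern and is left as-is.
_ENV_TOKEN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def substitute_env_tokens(text: str, tokens: dict) -> str:
    def replace(match):
        name = match.group(1)
        if name not in tokens:
            raise KeyError(f"undefined substitution token: ${{{name}}}")
        return str(tokens[name])

    return _ENV_TOKEN.sub(replace, text)
```

Because tokens are resolved at copy time, each environment gets its own copy of the module with concrete values baked in.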

Key Implementation Requirements:

  • Your DataSource class must implement the name() class method returning the format name used in .format()

  • The framework uses the return value of the name() method, not the class name, as the format string

  • The class is imported from the copied module; the registration call (spark.dataSource.register) runs at module load before the pipeline body

  • PySpark’s vendored cloudpickle is registered (register_pickle_by_value) so the class survives serialization to executors

File Organization: The module_path is relative to your YAML file location. Common practice is to create a data_sources/ folder alongside your pipeline YAML files.

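Schema Definition: Define your schema in the schema() method, either as a DDL string (as in the example) or as a StructType. Every tuple yielded by read() must match it: the same number of values, in the same order, with Python types compatible with the declared Spark types (for example, datetime objects for timestamp columns).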
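A mismatch between schema() and read() usually surfaces only when the stream runs, so a quick local check can save a pipeline run. Below is a minimal sketch under stated assumptions. The names ddl_column_names and check_row are hypothetical. The check only compares value counts, and it handles only flat "name type" pairs, so nested STRUCT<...> or DECIMAL(p,s) types would need a real parser.

```python
# Hypothetical development-time check (not part of LHP or PySpark): compare the
# number of columns in a flat DDL schema string with a tuple yielded by read().
def ddl_column_names(ddl: str) -> list:
    # Flat "name type" pairs only; commas inside DECIMAL(p,s) or STRUCT<...> break this.
    return [part.split()[0] for part in ddl.split(",") if part.strip()]


def check_row(ddl: str, row: tuple) -> None:
    names = ddl_column_names(ddl)
    if len(row) != len(names):
        raise ValueError(
            f"read() yielded {len(row)} values but schema() declares "
            f"{len(names)} columns: {names}"
        )
```

For instance, you could run check_row(CurrencyAPIStreamingDataSource({}).schema(), row) on a few rows from read() in a unit test.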

Import Management: The framework automatically handles import deduplication and conflict resolution. If your custom source uses wildcard imports (e.g., from pyspark.sql.functions import *), they will take precedence over alias imports, and operational metadata expressions will adapt accordingly.

The above YAML translates to the following PySpark code

The user’s data_sources/currency_api_source.py is copied, with substitutions applied, into a custom_python_functions/ subdirectory beside the generated pipeline file. The pipeline file imports the class by name and registers it on the local Spark session at module load:

# Generated by LakehousePlumber
# Pipeline: unirate_api_ingestion
# FlowGroup: api_unirate_ingestion_bronze

from pyspark import cloudpickle as _lhp_cloudpickle
from pyspark.sql import functions as F
from pyspark import pipelines as dp
from custom_python_functions.currency_api_source import CurrencyAPIStreamingDataSource
import custom_python_functions

_lhp_cloudpickle.register_pickle_by_value(custom_python_functions)

# Pipeline Configuration
PIPELINE_ID = "unirate_api_ingestion"
FLOWGROUP_ID = "api_unirate_ingestion_bronze"

# ============================================================================
# SOURCE VIEWS
# ============================================================================

# Try to register the custom data source
try:
    spark.dataSource.register(CurrencyAPIStreamingDataSource)
except Exception:
    pass  # Ignore if already registered

@dp.temporary_view()
def v_currency_bronze():
    """Load live currency exchange rates from external API"""
    df = spark.readStream \
        .format("currency_api_stream") \
        .option("apiKey", dbutils.secrets.get(scope='apis', key='currency_key')) \
        .option("baseCurrencies", "USD,EUR,GBP") \
        .option("progressPath", "/Volumes/catalog/schema/checkpoints/") \
        .option("minCallIntervalSeconds", "300") \
        .option("workspaceUrl", "adb-XYZ.azuredatabricks.net") \
        .load()

    # Add operational metadata columns
    df = df.withColumn('_processing_timestamp', F.current_timestamp())

    return df

The register_pickle_by_value(custom_python_functions) line registers the package with PySpark’s vendored cloudpickle so the DataSource class survives serialization when SDP ships it to the executors. (Registering it with a separately installed cloudpickle would have no effect, because PySpark serializes with its own bundled copy; this one-line registration is what makes the copy-and-import pattern work on both the driver and the executors.)

See also