Load Actions¶
Concept¶
What¶
A Load action reads data from a source — files, a Delta table, a SQL query,
an RDBMS, a Python function, a Kafka topic, or a custom PySpark DataSource —
and exposes the result as a temporary view inside a FlowGroup. Downstream
Transform and Write actions reference the view by its target name. Each
Load action declares one source via the source.type field; the rest of
source carries the type-specific configuration.
When¶
Every FlowGroup that produces data starts with a Load action. Pick the sub-type by how the source is delivered:
Sub-type |
Use when… |
|---|---|
|
Files arrive in object storage (S3, ADLS, GCS, Unity Catalog volumes); incremental ingestion with checkpoints and schema evolution. Streaming only. |
|
Reading an existing Delta table or its Change Data Feed (CDF). Batch or streaming. |
|
An arbitrary SQL query materialised as a temporary view. |
|
Pulling from an external RDBMS; credentials via Databricks secrets. |
|
The format is not covered by a built-in sub-type, or you need custom pre-processing in Python before the flow sees the data. |
|
Streaming from Apache Kafka, AWS MSK, or Azure Event Hubs. |
|
You have a PySpark |
Minimum example¶
The smallest working Load action reads a Delta table and exposes it as a temporary view:
actions:
- name: customer_raw_load
type: load
readMode: stream
source:
type: delta
catalog: "${catalog}"
schema: "${raw_schema}"
table: customer
target: v_customer_raw
The reference body below documents every sub-type and every option.
Reference¶
LHP supports seven Load sub-types: delta, cloudfiles, sql,
jdbc, python, kafka, and custom_datasource. Additional
sources arrive through the plugin mechanism.
delta¶
Use when reading an existing Delta table or its Change Data Feed (CDF). Batch or streaming.
Deprecated since version 0.7.8: The database field (e.g., database: "${catalog}.${schema}") is deprecated
for delta sources. Use explicit catalog and schema fields instead. The old
format is auto-converted with a deprecation warning. Removal in v1.0.0.
actions:
- name: customer_raw_load
type: load
operational_metadata: ["_processing_timestamp"]
readMode: stream
source:
type: delta
catalog: "${catalog}"
schema: "${raw_schema}"
table: customer
target: v_customer_raw
description: "Load customer table from raw schema"
Anatomy of a delta load action
name: Unique name for this action within the FlowGroup
type: Action type - brings data into a temporary view
operational_metadata: Add custom metadata columns (e.g., processing timestamp)
readMode: Either batch or stream - translates to
spark.read.table()orspark.readStream.table()- source:
type: Use Delta table as source
catalog: Target catalog using substitution variables
schema: Target schema using substitution variables
table: Name of the Delta table to read from
target: Name of the temporary view created
description: Optional documentation for the action
Delta load actions read from both regular Delta tables and Change Data Feed (CDC) enabled tables.
Use readMode: stream for real-time processing or readMode: batch for one-time loads.
Delta Options
Delta load actions support the options field to configure Delta-specific reader options:
actions:
- name: load_orders_cdc
type: load
readMode: stream
source:
type: delta
catalog: "${catalog}"
schema: "bronze"
table: orders
options:
readChangeFeed: "true"
startingVersion: "0"
ignoreDeletes: "true"
target: v_orders_changes
description: "Stream order changes using Delta Change Data Feed"
Supported Delta Options:
Option |
Type |
Description |
|---|---|---|
readChangeFeed |
string/boolean |
Enable Change Data Feed (stream or batch) |
startingVersion |
string |
Starting version for CDC or time travel |
startingTimestamp |
string |
Starting timestamp for CDC (ISO 8601 format) |
endingVersion |
string |
Ending version for batch CDF reads |
endingTimestamp |
string |
Ending timestamp for batch CDF reads |
versionAsOf |
string |
Read specific table version (time travel) |
timestampAsOf |
string |
Read table at specific timestamp (time travel) |
ignoreDeletes |
boolean |
Ignore delete operations in CDC |
skipChangeCommits |
string/boolean |
Skip change commits in CDC stream |
maxFilesPerTrigger |
number |
Maximum files to process per trigger |
Note
readChangeFeedworks in both stream and batch mode. In batch mode, a starting bound (startingVersionorstartingTimestamp) is required.endingVersionandendingTimestampare only valid in batch mode.readChangeFeedandskipChangeCommitsare mutually exclusive — one reads all changes, the other skips them.readChangeFeedcannot be combined with time-travel options (versionAsOf/timestampAsOf).startingVersionandstartingTimestampare mutually exclusive.versionAsOfandtimestampAsOfare mutually exclusive.All option values are validated and cannot be
Noneor empty strings.
Batch CDF Example:
actions:
- name: load_order_changes
type: load
readMode: batch
source:
type: delta
catalog: "${catalog}"
schema: "bronze"
table: orders
options:
readChangeFeed: "true"
startingVersion: "5"
endingVersion: "20"
target: v_order_changes
description: "Read order changes between version 5 and 20"
Warning
When using startingVersion, the specified version may become unavailable after
VACUUM runs. Prefer startingTimestamp for durable references, or use
checkpoint-managed streaming for production workloads.
readChangeFeed vs skipChangeCommits:
readChangeFeed: "true"— reads the Change Data Feed, exposing row-level changes (inserts, updates, deletes) with metadata columns. Use this when you need to process individual changes (e.g., CDC into a downstream table).skipChangeCommits: "true"— skips commits that contain data-changing operations (useful when a table has CDF enabled but you only want the latest state, ignoring change events). Cannot be combined with readChangeFeed.
CDF Metadata Columns:
When readChangeFeed is enabled, the resulting DataFrame includes three additional
columns:
_change_type— the type of change:insert,update_preimage,update_postimage, ordelete_commit_version— the Delta version of the commit_commit_timestamp— the timestamp of the commit
An UPDATE operation produces two rows: one with _change_type = "update_preimage"
(the old values) and one with _change_type = "update_postimage" (the new values).
If writing CDF data to a non-CDC streaming table, you should filter or drop these columns in a transform action:
SELECT * EXCEPT (_change_type, _commit_version, _commit_timestamp)
FROM stream(v_order_changes)
WHERE _change_type != 'delete'
Full Refresh Resilience:
When a Delta table undergoes a full refresh (e.g., TRUNCATE followed by reload), the
CDF stream emits a large batch of delete rows followed by insert rows. This can
overwhelm downstream consumers. Mitigation strategies:
Use
ignoreDeletes: "true"if deletes are not relevant to your pipeline.Use
skipChangeCommits: "true"on non-CDF consumers that share the same source table.For CDC targets, rely on Databricks checkpointing to handle reprocessing gracefully.
Time Travel Example:
actions:
- name: load_customers_snapshot
type: load
readMode: batch
source:
type: delta
catalog: "${catalog}"
schema: "silver"
table: customers
options:
versionAsOf: "10"
where_clause: ["status = 'active'"]
select_columns: ["customer_id", "name", "email"]
target: v_customers_snapshot
description: "Load customers at version 10"
See also
For
streamreadMode see the Databricks documentation on Change Data FeedFor time travel see Delta Time Travel
Operational metadata: Operational Metadata
The above YAML translates to the following PySpark code
1from pyspark import pipelines as dp
2from pyspark.sql.functions import current_timestamp
3
4@dp.temporary_view()
5def v_customer_raw():
6 """Load customer table from raw schema"""
7 df = spark.readStream.table("acmi_edw_dev.edw_raw.customer")
8
9 # Add operational metadata columns
10 df = df.withColumn('_processing_timestamp', current_timestamp())
11
12 return df
cloudfiles¶
Use when files arrive in object storage and you want incremental ingestion with checkpoints and schema evolution. Streaming only. For an end-to-end walkthrough, see Ingest with Auto Loader.
actions:
- name: load_csv_file_from_cloudfiles
type: load
readMode : "stream"
operational_metadata: ["_source_file_path","_source_file_size","_source_file_modification_time"]
source:
type: cloudfiles
path: "${landing_volume}/{{ landing_folder }}/*.csv"
format: csv
options:
cloudFiles.format: csv
header: True
delimiter: "|"
cloudFiles.maxFilesPerTrigger: 11
cloudFiles.inferColumnTypes: False
cloudFiles.schemaEvolutionMode: "addNewColumns"
cloudFiles.rescuedDataColumn: "_rescued_data"
cloudFiles.schemaHints: "schemas/{{ table_name }}_schema.yaml"
target: v_customer_cloudfiles
description: "Load customer CSV files from landing volume"
Anatomy of a cloudFiles load action
name: Unique name for this action within the FlowGroup
type: Action type - brings data into a temporary view
readMode: must be stream (CloudFiles only supports streaming mode). This translates to
spark.readStream.format("cloudFiles")operational_metadata: Add custom metadata columns
- source:
type: Use Databricks Auto Loader (CloudFiles)
path: File path pattern with substitution variables
format: Specify the file format as CSV, JSON, Parquet, etc.
schema: Path to a YAML schema file for full schema enforcement (see below)
- options:
cloudFiles.format: Explicitly set CloudFiles format to CSV
header: First row contains column headers
delimiter: Use pipe character as field separator
cloudFiles.maxFilesPerTrigger: Limit number of files processed per trigger
cloudFiles.schemaHints: Schema definition for Auto Loader (supports multiple formats - see below)
target: Name of the temporary view created
description: Optional documentation for the action
cloudFiles.schemaHints Format Options
The cloudFiles.schemaHints option supports three formats, automatically detected by the framework:
Option 1: Inline DDL String (for simple schemas)
cloudFiles.schemaHints: "customer_id BIGINT, name STRING, email STRING"
Option 2: External YAML File (recommended for complex schemas with metadata)
cloudFiles.schemaHints: "schemas/customer_schema.yaml"
Option 3: External DDL/SQL File (for pre-defined DDL statements)
cloudFiles.schemaHints: "schemas/customer_schema.ddl"
# or
cloudFiles.schemaHints: "schemas/customer_schema.sql"
File Path Organization: Organize schema files in subdirectories relative to your project root:
Root level:
"customer_schema.yaml"Single directory:
"schemas/customer_schema.yaml"Nested subdirectories:
"schemas/bronze/dimensions/customer_schema.yaml"
The framework automatically detects whether the value is an inline DDL string or a file path based on common file indicators (.yaml, .yml, .ddl, .sql, or path separators).
YAML Schema Conversion: When using YAML schema files (Option 2), the nullable field is respected during conversion to DDL:
Columns with
nullable: falseare converted to includeNOT NULLconstraintColumns with
nullable: true(or omitted, default is true) are converted without constraints
Example: A YAML column defined as {name: c_custkey, type: BIGINT, nullable: false} will generate c_custkey BIGINT NOT NULL in the schema hints.
source.schema — Full Schema Enforcement
The source.schema field provides full schema enforcement by applying a StructType schema
on the DataStreamReader before .load(). This disables schema inference entirely, ensuring
the data conforms exactly to the specified schema.
When to use ``schema`` vs ``schemaHints``:
Use
schemawhen you want to enforce a complete, exact schema and disable inference.Use
schemaHintswhen you want to guide Auto Loader’s inference while still allowing it to discover additional columns.
actions:
- name: load_customer_with_schema
type: load
readMode: stream
source:
type: cloudfiles
path: "/data/customers/*.csv"
format: csv
schema: schemas/customer_schema.yaml
options:
cloudFiles.format: csv
target: v_customer_raw
description: "Load customer CSV with explicit schema enforcement"
The schema file uses the same YAML format as schemaHints files:
name: customer
version: "1.0"
columns:
- name: c_custkey
type: BIGINT
nullable: false
- name: c_name
type: STRING
nullable: true
This generates code with .schema() applied on the reader chain before .load():
customer_schema = StructType([
StructField("c_custkey", LongType(), False),
StructField("c_name", StringType(), True),
])
@dp.temporary_view()
def v_customer_raw():
df = spark.readStream \
.format("cloudFiles") \
.schema(customer_schema) \
.option("cloudFiles.format", "csv") \
.load("/data/customers/*.csv")
return df
When you provide source.schema, cloudFiles.schemaEvolutionMode defaults to none
because inference is disabled. Do not combine source.schema with cloudFiles.schemaHints
— these are mutually exclusive approaches.
Lakehouse Plumber uses syntax consistent with Databricks so you can transfer knowledge between the two. All options available here mirror those of Databricks Auto Loader.
See also
For the end-to-end how-to see Ingest with Auto Loader.
For full list of options see the Databricks Auto Loader documentation.
Operational metadata: Operational Metadata
The above Yaml translates to the following Pyspark code
1from pyspark import pipelines as dp
2from pyspark.sql.functions import F
3
4customer_cloudfiles_schema_hints = """
5 c_custkey BIGINT NOT NULL,
6 c_name STRING NOT NULL,
7 c_address STRING,
8 c_nationkey BIGINT NOT NULL,
9 c_phone STRING,
10 c_acctbal DECIMAL(18,2),
11 c_mktsegment STRING,
12 c_comment STRING
13""".strip().replace("\n", " ")
14
15
16@dp.temporary_view()
17def v_customer_cloudfiles():
18 """Load customer CSV files from landing volume"""
19 df = spark.readStream \
20 .format("cloudFiles") \
21 .option("cloudFiles.format", "csv") \
22 .option("header", True) \
23 .option("delimiter", "|") \
24 .option("cloudFiles.maxFilesPerTrigger", 11) \
25 .option("cloudFiles.inferColumnTypes", False) \
26 .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
27 .option("cloudFiles.rescuedDataColumn", "_rescued_data") \
28 .option("cloudFiles.schemaHints", customer_cloudfiles_schema_hints) \
29 .load("/Volumes/acmi_edw_dev/edw_raw/landing_volume/customer/*.csv")
30
31
32 # Add operational metadata columns
33 df = df.withColumn('_source_file_size', F.col('_metadata.file_size'))
34 df = df.withColumn('_source_file_modification_time', F.col('_metadata.file_modification_time'))
35 df = df.withColumn('_source_file_path', F.col('_metadata.file_path'))
36
37 return df
sql¶
Use when you need an arbitrary SQL query (often joins or windowed aggregates across already-loaded sources) materialised as a temporary view. SQL load actions support both inline SQL and external SQL files.
Option 1: Inline SQL
actions:
- name: load_customer_summary
type: load
readMode: batch
source:
type: sql
sql: |
SELECT
c_custkey,
c_name,
c_mktsegment,
COUNT(*) as order_count,
SUM(o_totalprice) as total_spent
FROM ${catalog}.${raw_schema}.customer c
LEFT JOIN ${catalog}.${raw_schema}.orders o
ON c.c_custkey = o.o_custkey
GROUP BY c_custkey, c_name, c_mktsegment
target: v_customer_summary
description: "Load customer summary with order statistics"
Option 2: External SQL File
actions:
- name: load_customer_metrics
type: load
readMode: batch
source:
type: sql
sql_path: "sql/customer_metrics.sql"
target: v_customer_metrics
description: "Load customer metrics from external SQL file"
Anatomy of an SQL load action
name: Unique name for this action within the FlowGroup
type: Action type - brings data into a temporary view
readMode: Either batch or stream - determines execution mode
- source:
type: Use SQL query as source
sql: SQL statement with substitution variables for dynamic values (inline option)
sql_path: Path to external .sql file (external file option)
target: Name of the temporary view created from query results
description: Optional documentation for the action
See also
For SQL syntax see the Databricks SQL documentation.
Substitution variables: Substitutions & Secrets
SQL load actions let you create complex views from multiple tables using standard SQL.
Use substitution variables like ${catalog} and ${schema} for environment-specific values.
File Substitution Support
Substitution variables work in both inline SQL and external SQL files (sql_path).
The same ${token} and ${secret:scope/key} syntax from YAML works in .sql files.
Files are processed for substitutions before query execution.
File Organization: When using sql_path, the path is relative to your YAML file location.
Common practice is to create a sql/ folder alongside your pipeline YAML files.
The above YAML examples translate to the following PySpark code
For inline SQL:
1from pyspark import pipelines as dp
2
3@dp.temporary_view()
4def v_customer_summary():
5 """Load customer summary with order statistics"""
6 return spark.sql("""
7 SELECT
8 c_custkey,
9 c_name,
10 c_mktsegment,
11 COUNT(*) as order_count,
12 SUM(o_totalprice) as total_spent
13 FROM acmi_edw_dev.edw_raw.customer c
14 LEFT JOIN acmi_edw_dev.edw_raw.orders o
15 ON c.c_custkey = o.o_custkey
16 GROUP BY c_custkey, c_name, c_mktsegment
17 """)
For external SQL file:
1from pyspark import pipelines as dp
2
3@dp.temporary_view()
4def v_customer_metrics():
5 """Load customer metrics from external SQL file"""
6 return spark.sql("""
7 -- Content from sql/customer_metrics.sql file
8 SELECT
9 customer_id,
10 total_orders,
11 avg_order_value,
12 last_order_date
13 FROM ${catalog}.${silver_schema}.customer_analytics
14 WHERE last_order_date >= current_date() - INTERVAL 90 DAYS
15 """)
jdbc¶
Use when pulling from an external RDBMS (Oracle, SQL Server, Postgres, MySQL). Credentials flow through Databricks secrets. JDBC load actions support both table queries and custom SQL queries.
Option 1: Query-based JDBC
actions:
- name: load_external_customers
type: load
readMode: batch
operational_metadata: ["_extraction_timestamp"]
source:
type: jdbc
url: "jdbc:postgresql://db.example.com:5432/production"
driver: "org.postgresql.Driver"
user: "${secret:database/username}"
password: "${secret:database/password}"
query: |
SELECT
customer_id,
first_name,
last_name,
email,
registration_date,
country
FROM customers
WHERE status = 'active'
AND registration_date >= CURRENT_DATE - INTERVAL '7 days'
target: v_external_customers
description: "Load active customers from external PostgreSQL database"
Option 2: Table-based JDBC
actions:
- name: load_external_products
type: load
readMode: batch
source:
type: jdbc
url: "jdbc:mysql://mysql.example.com:3306/catalog"
driver: "com.mysql.cj.jdbc.Driver"
user: "${secret:mysql/username}"
password: "${secret:mysql/password}"
table: "products"
target: v_external_products
description: "Load products table from external MySQL database"
Anatomy of a JDBC load action
name: Unique name for this action within the FlowGroup
type: Action type - brings data into a temporary view
readMode: Either batch or stream - JDBC typically uses batch mode
operational_metadata: Add custom metadata columns (e.g., extraction timestamp)
- source:
type: Use JDBC connection as source
url: JDBC connection string with database server details
driver: JDBC driver class name (database-specific)
user: Database username (supports secret substitution)
password: Database password (supports secret substitution)
query: Custom SQL query to execute (query option)
table: Table name to read entirely (table option)
target: Name of the temporary view created
description: Optional documentation for the action
See also
For JDBC drivers see the Databricks JDBC documentation.
Secret management: Substitutions & Secrets
JDBC load actions require either a query or table field, but not both — providing
both raises an error. Use secret substitution (${secret:scope/key}) for secure credential
management, and ensure the appropriate JDBC driver is available on your Databricks cluster.
Secret Management: Always use ${secret:scope/key} syntax for database credentials.
The framework automatically handles secret substitution during code generation.
The above YAML examples translate to the following PySpark code
For query-based JDBC:
1from pyspark import pipelines as dp
2from pyspark.sql.functions import current_timestamp
3
4@dp.temporary_view()
5def v_external_customers():
6 """Load active customers from external PostgreSQL database"""
7 df = spark.read \
8 .format("jdbc") \
9 .option("url", "jdbc:postgresql://db.example.com:5432/production") \
10 .option("user", "{{ secret_substituted_username }}") \
11 .option("password", "{{ secret_substituted_password }}") \
12 .option("driver", "org.postgresql.Driver") \
13 .option("query", """
14 SELECT
15 customer_id,
16 first_name,
17 last_name,
18 email,
19 registration_date,
20 country
21 FROM customers
22 WHERE status = 'active'
23 AND registration_date >= CURRENT_DATE - INTERVAL '7 days'
24 """) \
25 .load()
26
27 # Add operational metadata columns
28 df = df.withColumn('_extraction_timestamp', current_timestamp())
29
30 return df
For table-based JDBC:
1from pyspark import pipelines as dp
2
3@dp.temporary_view()
4def v_external_products():
5 """Load products table from external MySQL database"""
6 df = spark.read \
7 .format("jdbc") \
8 .option("url", "jdbc:mysql://mysql.example.com:3306/catalog") \
9 .option("user", "{{ secret_substituted_username }}") \
10 .option("password", "{{ secret_substituted_password }}") \
11 .option("driver", "com.mysql.cj.jdbc.Driver") \
12 .option("dbtable", "products") \
13 .load()
14
15 return df
python¶
Use when the source format is not covered by a built-in sub-type, or you need custom pre-processing in Python before the flow sees the data. Python load actions call custom Python functions that return DataFrames.
YAML Configuration:
actions:
- name: load_api_data
type: load
readMode: batch
operational_metadata: ["_api_call_timestamp"]
source:
type: python
module_path: "extractors/api_extractor.py"
function_name: "extract_customer_data"
parameters:
api_endpoint: "https://api.example.com/customers"
api_key: "${secret:apis/customer_api_key}"
batch_size: 1000
start_date: "2024-01-01"
target: v_api_customers
description: "Load customer data from external API"
Python Function (extractors/api_extractor.py):
1import requests
2from pyspark.sql import DataFrame
3from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType
4
5def extract_customer_data(spark, parameters: dict) -> DataFrame:
6 """Extract customer data from external API.
7
8 Args:
9 spark: SparkSession instance
10 parameters: Configuration parameters from YAML
11
12 Returns:
13 DataFrame: Customer data as PySpark DataFrame
14 """
15 # Extract parameters from YAML configuration
16 api_endpoint = parameters.get("api_endpoint")
17 api_key = parameters.get("api_key")
18 batch_size = parameters.get("batch_size", 1000)
19 start_date = parameters.get("start_date")
20
21 # Call external API
22 headers = {"Authorization": f"Bearer {api_key}"}
23 response = requests.get(
24 f"{api_endpoint}?start_date={start_date}&limit={batch_size}",
25 headers=headers
26 )
27 response.raise_for_status()
28
29 # Convert API response to DataFrame
30 data = response.json()["customers"]
31
32 # Define schema for the DataFrame
33 schema = StructType([
34 StructField("customer_id", IntegerType(), True),
35 StructField("first_name", StringType(), True),
36 StructField("last_name", StringType(), True),
37 StructField("email", StringType(), True),
38 StructField("registration_date", TimestampType(), True)
39 ])
40
41 # Create and return DataFrame
42 return spark.createDataFrame(data, schema)
Anatomy of a Python load action
name: Unique name for this action within the FlowGroup
type: Action type - brings data into a temporary view
readMode: Either batch or stream - Python actions typically use batch mode
operational_metadata: Add custom metadata columns
- source:
type: Use Python function as source
module_path: Path to Python file containing the extraction function
function_name: Name of function to call (defaults to “get_df” if not specified)
parameters: Dictionary of parameters to pass to the function
target: Name of the temporary view created
description: Optional documentation for the action
See also
For PySpark DataFrame operations see the Databricks PySpark documentation.
Custom functions: Architecture
Python functions must accept two parameters: spark (SparkSession) and parameters (dict).
The function must return a PySpark DataFrame that will be used as the view source.
File Organization: When using module_path, the path is relative to your YAML file location.
Common practice is to create an extractors/ or functions/ folder alongside your pipeline YAML files.
Parameter Substitution: The parameters dictionary supports ${token}
substitution for environment-specific values:
parameters:
catalog: "${catalog}"
table_name: "${schema}.users"
api_endpoint: "${api_url}"
batch_size: 1000 # No substitution needed
All tokens are replaced with values from substitutions/{env}.yaml at generation time.
Secret references (${secret:scope/key}) are converted to dbutils.secrets.get() calls.
The above YAML translates to the following PySpark code
1from pyspark import pipelines as dp
2from pyspark.sql.functions import current_timestamp
3from extractors.api_extractor import extract_customer_data
4
5@dp.temporary_view()
6def v_api_customers():
7 """Load customer data from external API"""
8 # Call the external Python function with spark and parameters
9 parameters = {
10 "api_endpoint": "https://api.example.com/customers",
11 "api_key": "{{ secret_substituted_api_key }}",
12 "batch_size": 1000,
13 "start_date": "2024-01-01"
14 }
15 df = extract_customer_data(spark, parameters)
16
17 # Add operational metadata columns
18 df = df.withColumn('_api_call_timestamp', current_timestamp())
19
20 return df
kafka¶
Use when streaming from Apache Kafka, AWS Managed Streaming for Apache Kafka (MSK), or Azure Event Hubs via the Kafka protocol.
actions:
- name: load_kafka_events
type: load
readMode: stream
operational_metadata: ["_processing_timestamp"]
source:
type: kafka
bootstrap_servers: "kafka1.example.com:9092,kafka2.example.com:9092"
subscribe: "events,logs,metrics"
options:
startingOffsets: "latest"
failOnDataLoss: false
kafka.group.id: "lhp-consumer-group"
kafka.session.timeout.ms: 30000
kafka.ssl.truststore.location: "/path/to/truststore.jks"
kafka.ssl.truststore.password: "${secret:scope/truststore-password}"
target: v_kafka_events_raw
description: "Load events from Kafka topics"
Anatomy of a Kafka load action
name: Unique name for this action within the FlowGroup
type: Action type - brings data into a temporary view
readMode: Must be stream - Kafka is always streaming
operational_metadata: Add custom metadata columns (e.g., processing timestamp)
- source:
type: Use Apache Kafka as source
bootstrap_servers: Comma-separated list of Kafka broker addresses (host:port)
subscribe: Comma-separated list of topics to subscribe to (choose ONE subscription method)
subscribePattern: Java regex pattern for topic subscription (alternative to subscribe)
assign: JSON string specifying specific topic partitions (alternative to subscribe)
- options:
startingOffsets: Starting offset position (earliest/latest/JSON)
failOnDataLoss: Whether to fail on potential data loss (default: true)
kafka.group.id: Consumer group ID (use with caution)
kafka.session.timeout.ms: Session timeout in milliseconds
kafka.ssl.*: SSL/TLS configuration options for secure connections
kafka.sasl.*: SASL authentication options
All other kafka.* options from Databricks Kafka connector
target: Name of the temporary view created
description: Optional documentation for the action
See also
For full list of Kafka options see the Databricks Kafka documentation.
Operational metadata: Operational Metadata
Kafka always returns a fixed 7-column schema with binary key/value columns:
key, value, topic, partition, offset, timestamp, timestampType.
Explicitly deserialize the key and value columns using transform actions.
Warning
Subscription Methods: Specify exactly ONE of:
subscribe: Comma-separated list of specific topicssubscribePattern: Java regex pattern for topic namesassign: JSON with specific topic partitions
Using multiple subscription methods will result in an error.
The above YAML translates to the following PySpark code
1from pyspark import pipelines as dp
2from pyspark.sql.functions import current_timestamp
3
4@dp.temporary_view()
5def v_kafka_events_raw():
6 """Load events from Kafka topics"""
7 df = spark.readStream \
8 .format("kafka") \
9 .option("kafka.bootstrap.servers", "kafka1.example.com:9092,kafka2.example.com:9092") \
10 .option("subscribe", "events,logs,metrics") \
11 .option("startingOffsets", "latest") \
12 .option("failOnDataLoss", False) \
13 .option("kafka.group.id", "lhp-consumer-group") \
14 .option("kafka.session.timeout.ms", 30000) \
15 .option("kafka.ssl.truststore.location", "/path/to/truststore.jks") \
16 .option("kafka.ssl.truststore.password", dbutils.secrets.get("scope", "truststore-password")) \
17 .load()
18
19 # Add operational metadata columns
20 df = df.withColumn('_processing_timestamp', current_timestamp())
21
22 return df
Example: Deserializing Kafka Data
Since Kafka returns binary data, you typically need a transform action to deserialize:
actions:
# Load from Kafka (returns binary key/value)
- name: load_kafka_events
type: load
readMode: stream
source:
type: kafka
bootstrap_servers: "localhost:9092"
subscribe: "events"
target: v_kafka_events_raw
# Deserialize and parse JSON
- name: parse_kafka_events
type: transform
transform_type: sql
source: v_kafka_events_raw
target: v_kafka_events_parsed
sql: |
SELECT
CAST(key AS STRING) as message_key,
from_json(CAST(value AS STRING), 'event_type STRING, timestamp BIGINT, data STRING') as parsed_value,
topic,
partition,
offset,
timestamp as kafka_timestamp
FROM $source
Advanced Authentication: AWS MSK IAM
AWS Managed Streaming for Apache Kafka (MSK) supports IAM authentication for secure, credential-free access.
Prerequisites:
AWS MSK cluster configured with IAM authentication enabled
Databricks cluster with IAM role/instance profile with MSK permissions
IAM policy granting
kafka-cluster:Connect,kafka-cluster:DescribeCluster, and topic/group permissions
YAML Configuration:
actions:
- name: load_msk_orders
type: load
readMode: stream
source:
type: kafka
bootstrap_servers: "b-1.msk-cluster.abc123.kafka.us-east-1.amazonaws.com:9098"
subscribe: "orders"
options:
kafka.security.protocol: "SASL_SSL"
kafka.sasl.mechanism: "AWS_MSK_IAM"
kafka.sasl.jaas.config: "shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;"
kafka.sasl.client.callback.handler.class: "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
startingOffsets: "earliest"
failOnDataLoss: "false"
target: v_msk_orders_raw
description: "Load orders from MSK using IAM authentication"
With Specific IAM Role:
options:
kafka.security.protocol: "SASL_SSL"
kafka.sasl.mechanism: "AWS_MSK_IAM"
kafka.sasl.jaas.config: 'shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="${msk_role_arn}";'
kafka.sasl.client.callback.handler.class: "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
Generated PySpark Code:
1from pyspark import pipelines as dp
2
3@dp.temporary_view()
4def v_msk_orders_raw():
5 """Load orders from MSK using IAM authentication"""
6 df = spark.readStream \
7 .format("kafka") \
8 .option("kafka.bootstrap.servers", "b-1.msk-cluster.abc123.kafka.us-east-1.amazonaws.com:9098") \
9 .option("subscribe", "orders") \
10 .option("kafka.security.protocol", "SASL_SSL") \
11 .option("kafka.sasl.mechanism", "AWS_MSK_IAM") \
12 .option("kafka.sasl.jaas.config", "shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;") \
13 .option("kafka.sasl.client.callback.handler.class", "shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler") \
14 .option("startingOffsets", "earliest") \
15 .option("failOnDataLoss", "false") \
16 .load()
17
18 return df
See also
For complete MSK IAM documentation see AWS MSK IAM Access Control.
MSK IAM Requirements:
Use port 9098 for IAM authentication (not the standard 9092).
Provide all four required options:
kafka.security.protocol,kafka.sasl.mechanism,kafka.sasl.jaas.config, andkafka.sasl.client.callback.handler.class.Grant the IAM role appropriate
kafka-cluster:*permissions.Rely on IAM for authentication — no credentials are stored.
Ensure your Databricks cluster has network access to the MSK cluster.
Advanced Authentication: Azure Event Hubs OAuth
Azure Event Hubs provides Kafka protocol support with OAuth 2.0 authentication using Azure Active Directory.
Prerequisites:
Azure Event Hubs namespace (Premium or Standard tier)
Azure AD App Registration (Service Principal) with appropriate permissions
Service Principal granted “Azure Event Hubs Data Receiver” role on the namespace
Databricks secrets configured for client credentials
YAML Configuration:
actions:
- name: load_event_hubs_data
type: load
readMode: stream
source:
type: kafka
bootstrap_servers: "my-namespace.servicebus.windows.net:9093"
subscribe: "my-event-hub"
options:
kafka.security.protocol: "SASL_SSL"
kafka.sasl.mechanism: "OAUTHBEARER"
kafka.sasl.jaas.config: 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="${secret:azure_secrets/client_id}" clientSecret="${secret:azure_secrets/client_secret}" scope="https://${event_hubs_namespace}/.default" ssl.protocol="SSL";'
kafka.sasl.oauthbearer.token.endpoint.url: "https://login.microsoft.com/${azure_tenant_id}/oauth2/v2.0/token"
kafka.sasl.login.callback.handler.class: "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
startingOffsets: "earliest"
target: v_event_hubs_data_raw
description: "Load data from Azure Event Hubs using OAuth"
Generated PySpark Code:
1from pyspark import pipelines as dp
2
3@dp.temporary_view()
4def v_event_hubs_data_raw():
5 """Load data from Azure Event Hubs using OAuth"""
6 df = spark.readStream \
7 .format("kafka") \
8 .option("kafka.bootstrap.servers", "my-namespace.servicebus.windows.net:9093") \
9 .option("subscribe", "my-event-hub") \
10 .option("kafka.security.protocol", "SASL_SSL") \
11 .option("kafka.sasl.mechanism", "OAUTHBEARER") \
12 .option("kafka.sasl.jaas.config",
13 f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{dbutils.secrets.get(scope="azure-secrets", key="client_id")}" clientSecret="{dbutils.secrets.get(scope="azure-secrets", key="client_secret")}" scope="https://my-namespace.servicebus.windows.net/.default" ssl.protocol="SSL";') \
14 .option("kafka.sasl.oauthbearer.token.endpoint.url", "https://login.microsoft.com/12345678-1234-1234-1234-123456789012/oauth2/v2.0/token") \
15 .option("kafka.sasl.login.callback.handler.class", "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler") \
16 .option("startingOffsets", "earliest") \
17 .load()
18
19 return df
See also
For complete Event Hubs Kafka documentation see Azure Event Hubs for Apache Kafka.
Event Hubs OAuth Requirements:
Always use port 9093 for the Kafka protocol with Event Hubs.
Specify the Event Hubs namespace in the format
<namespace>.servicebus.windows.net.Match the scope in JAAS config to
https://<namespace>.servicebus.windows.net/.default.Provide all five required options:
kafka.security.protocol,kafka.sasl.mechanism,kafka.sasl.jaas.config,kafka.sasl.oauthbearer.token.endpoint.url, andkafka.sasl.login.callback.handler.class.Assign the Service Principal the “Azure Event Hubs Data Receiver” role.
Rely on the callback handler to refresh OAuth tokens automatically.
Always use secrets for client credentials — never hardcode them in YAML.
custom_datasource¶
Use when you have or want a PySpark DataSource implementation and want LHP
to register and invoke it. Custom data source load actions use PySpark’s
DataSource API to implement specialised data ingestion from APIs, custom
protocols, or any external system that requires custom logic.
YAML Configuration:
actions:
- name: load_currency_exchange
type: load
readMode: stream
operational_metadata: ["_processing_timestamp"]
source:
type: custom_datasource
module_path: "data_sources/currency_api_source.py"
custom_datasource_class: "CurrencyAPIStreamingDataSource"
options:
apiKey: "${secret:apis/currency_key}"
baseCurrencies: "USD,EUR,GBP"
progressPath: "/Volumes/catalog/schema/checkpoints/"
minCallIntervalSeconds: "300"
workspaceUrl: "adb-XYZ.azuredatabricks.net"
target: v_currency_bronze
description: "Load live currency exchange rates from external API"
Custom DataSource Implementation (data_sources/currency_api_source.py):
1from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition
2from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, BooleanType
3from typing import Iterator, Tuple
4import requests
5import time
6import json
7
8class CurrencyInputPartition(InputPartition):
9 """Input partition for currency API data source"""
10 def __init__(self, start_time, end_time):
11 self.start_time = start_time
12 self.end_time = end_time
13
14class CurrencyAPIStreamingDataSource(DataSource):
15 """
16 Custom data source for live currency exchange rates.
17 Fetches data from external API with rate limiting and progress tracking.
18 """
19
20 @classmethod
21 def name(cls):
22 return "currency_api_stream"
23
24 def schema(self):
25 return """
26 base_currency string,
27 target_currency string,
28 exchange_rate double,
29 api_timestamp timestamp,
30 fetch_timestamp timestamp,
31 rate_change_1h double,
32 is_crypto boolean,
33 data_source string,
34 pipeline_run_id string
35 """
36
37 def streamReader(self, schema: StructType):
38 return CurrencyAPIStreamingReader(schema, self.options)
39
40class CurrencyAPIStreamingReader(DataSourceStreamReader):
41 """Streaming reader implementation with API calls and progress tracking"""
42
43 def __init__(self, schema, options):
44 self.schema = schema
45 self.options = options
46 self.api_key = options.get("apiKey")
47 self.base_currencies = options.get("baseCurrencies", "USD").split(",")
48 self.progress_path = options.get("progressPath")
49 self.min_interval = int(options.get("minCallIntervalSeconds", "300"))
50
51 def initialOffset(self) -> dict:
52 return {"fetch_time": int(time.time() * 1000)}
53
54 def latestOffset(self) -> dict:
55 return {"fetch_time": int(time.time() * 1000)}
56
57 def partitions(self, start: dict, end: dict):
58 return [CurrencyInputPartition(start.get("fetch_time", 0), end.get("fetch_time", 0))]
59
60 def read(self, partition) -> Iterator[Tuple]:
61 """Fetch data from external API and yield as tuples"""
62 # API call logic here
63 for base_currency in self.base_currencies:
64 # Make API calls and yield data
65 yield (base_currency, "USD", 1.0, time.time(), time.time(), 0.0, False, "API", "run_1")
Anatomy of a custom data source load action
name: Unique name for this action within the FlowGroup
type: Action type - brings data into a temporary view
readMode: Either batch or stream - determines if custom DataSource uses batch or stream reader
operational_metadata: Add custom metadata columns (e.g., processing timestamp)
- source: Custom data source configuration
type: Use custom_datasource as source type
module_path: Path to Python file containing the custom DataSource implementation
custom_datasource_class: Name of the DataSource class to register and use
options: Dictionary of parameters passed to the DataSource (available via self.options)
target: Name of the temporary view created
description: Optional documentation for the action
See also
For PySpark DataSource API see the PySpark DataSource documentation.
Custom integrations: Architecture
Custom DataSources require implementing the DataSource interface with appropriate reader methods.
The framework copies your file to a custom_python_functions/ subdirectory next to the generated
pipeline file and imports the class by name — the user file is not inlined into the pipeline.
Use the options dictionary to pass configuration parameters from YAML to your DataSource.
File Substitution Support
Custom DataSource Python files support substitution variables:
Environment tokens:
${catalog},${api_endpoint},${environment}Secret references:
${secret:scope/key}for API keys and credentials
Substitutions are applied to the file’s contents as it is copied to custom_python_functions/.
Key Implementation Requirements:
Your DataSource class must implement the
name()class method returning the format name used in.format()The framework uses the return value of
name()method, not the class name, for the format stringThe class is imported from the copied module; the registration call (
spark.dataSource.register) runs at module load before the pipeline bodyPySpark’s vendored cloudpickle is registered (
register_pickle_by_value) so the class survives serialization to executors
File Organization: The module_path is relative to your YAML file location.
Common practice is to create a data_sources/ folder alongside your pipeline YAML files.
Schema Definition: Define your schema in the schema() method using DDL string format as shown in the example.
This schema should match the data structure returned by your read() method.
Import Management: The framework automatically handles import deduplication and conflict resolution.
If your custom source uses wildcard imports (e.g., from pyspark.sql.functions import *),
they will take precedence over alias imports, and operational metadata expressions will adapt accordingly.
The above YAML translates to the following PySpark code
The user’s data_sources/currency_api_source.py is copied verbatim into a
custom_python_functions/ subdirectory beside the generated pipeline file.
The pipeline file imports the class by name and registers it on the local
Spark session at module load:
1# Generated by LakehousePlumber
2# Pipeline: unirate_api_ingestion
3# FlowGroup: api_unirate_ingestion_bronze
4
5from pyspark import cloudpickle as _lhp_cloudpickle
6from pyspark.sql import functions as F
7from pyspark import pipelines as dp
8from custom_python_functions.currency_api_source import CurrencyAPIStreamingDataSource
9import custom_python_functions
10
11_lhp_cloudpickle.register_pickle_by_value(custom_python_functions)
12
13# Pipeline Configuration
14PIPELINE_ID = "unirate_api_ingestion"
15FLOWGROUP_ID = "api_unirate_ingestion_bronze"
16
17# ============================================================================
18# SOURCE VIEWS
19# ============================================================================
20
21# Try to register the custom data source
22try:
23 spark.dataSource.register(CurrencyAPIStreamingDataSource)
24except Exception:
25 pass # Ignore if already registered
26
27@dp.temporary_view()
28def v_currency_bronze():
29 """Load live currency exchange rates from external API"""
30 df = spark.readStream \
31 .format("currency_api_stream") \
32 .option("apiKey", dbutils.secrets.get(scope='apis', key='currency_key')) \
33 .option("baseCurrencies", "USD,EUR,GBP") \
34 .option("progressPath", "/Volumes/catalog/schema/checkpoints/") \
35 .option("minCallIntervalSeconds", "300") \
36 .option("workspaceUrl", "adb-XYZ.azuredatabricks.net") \
37 .load()
38
39 # Add operational metadata columns
40 df = df.withColumn('_processing_timestamp', F.current_timestamp())
41
42 return df
The register_pickle_by_value(custom_python_functions) line registers the
package with PySpark’s vendored cloudpickle so the DataSource class
survives serialization when SDP ships it to the executors. (The system
cloudpickle is silently inert here because PySpark uses its own bundled
copy — this is the one-line fix that makes the copy-and-import pattern work
across local + executor boundaries.)
See also¶
Decisions — load source decision matrix, streaming-vs-batch matrix.
Ingest with Auto Loader — end-to-end
cloudfileshow-to.Operational Metadata — audit column catalog.
Substitutions & Secrets —
${token}and${secret:scope/key}syntax.