Migrate from DLT to LHP¶
This how-to walks you through migrating an existing Databricks Lakeflow Declarative Pipeline codebase (still widely known by its former name, Delta Live Tables, or DLT) to Lakehouse Plumber (LHP) YAML. The four patterns below (streaming table from cloud files, materialized view, Change Data Capture, and expectations) cover the vast majority of production DLT code.
LHP-generated Python imports from pyspark import pipelines as dp and emits
@dp.table, @dp.materialized_view, dp.create_streaming_table,
dp.create_auto_cdc_flow, and friends. If your existing code uses
import dlt and @dlt.table, the decorators are functionally equivalent —
LHP simply moves you to the supported pyspark.pipelines alias as a
side-effect of regeneration.
Why migrate?¶
DLT Python code grows linearly with the number of tables: each table is a hand-written decorator plus a function, and shared logic (operational metadata, column lists, CDC keys) tends to copy-paste across files. LHP collapses the boilerplate into declarative YAML, applies presets and templates across flowgroups, and regenerates idempotently — so a single change to a preset propagates everywhere.
Migration approach¶
Migrate one flowgroup at a time. The LHP-generated Python lives alongside your existing DLT code in the same Lakeflow pipeline until you cut over:
Pick a single table (or a small set of related tables in one bronze/silver layer) to migrate first.
Write the YAML flowgroup under
pipelines/<pipeline_name>/.Run
lhp generate --env devto produce the Python file undergenerated/.Diff the generated Python against your hand-written DLT file. Adjust the YAML until the diff is acceptable (it will not be byte-identical — LHP uses
@dp.append_flowanddp.create_streaming_tablepatterns that may be structured differently from your original).Swap the file references in your pipeline configuration to point at the generated file, retire the original
.py, and move to the next flowgroup.
Because each flowgroup is independent, you can run a hybrid pipeline where some tables are LHP-generated and others are still hand-written DLT during the transition.
Pattern 1: Streaming table from cloud files¶
The most common ingestion pattern in DLT: an Auto Loader streaming table fed
from a cloud storage path. In LHP this becomes a cloudfiles load action
plus a streaming_table write action.
Existing DLT code:
import dlt
from pyspark.sql.functions import col
@dlt.table(
name="main.bronze.events",
comment="Raw events from object storage",
)
def events():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/Volumes/main/bronze/schemas/events")
.option("cloudFiles.inferColumnTypes", "true")
.load("/Volumes/main/landing/events/")
)
LHP YAML:
pipeline: bronze
flowgroup: events
actions:
- name: load_events
type: load
readMode: stream
source:
type: cloudfiles
path: /Volumes/main/landing/events/
format: json
options:
cloudFiles.schemaLocation: /Volumes/main/bronze/schemas/events
cloudFiles.inferColumnTypes: "true"
target: v_events_raw
- name: write_events
type: write
source: v_events_raw
write_target:
type: streaming_table
catalog: main
schema: bronze
table: events
comment: Raw events from object storage
The load action generates a @dp.temporary_view()-decorated function named
v_events_raw that opens the cloudFiles reader. The write action
generates dp.create_streaming_table(name="main.bronze.events", ...) followed
by a @dp.append_flow(target="main.bronze.events", ...) reading from
v_events_raw. The two-step view-then-append pattern is what enables LHP’s
later patterns (multiple sources writing into one table, CDC, etc.) without
restructuring the YAML.
Pattern 2: Materialized view¶
A DLT table that returns a complete DataFrame (not a streaming read) becomes an
LHP materialized_view write. The most common form is a SQL-driven
aggregation against another table.
Existing DLT code:
import dlt
@dlt.table(name="main.gold.daily_revenue")
def daily_revenue():
return spark.sql("""
SELECT order_date, SUM(amount) AS revenue
FROM main.silver.orders
GROUP BY order_date
""")
LHP YAML:
pipeline: gold
flowgroup: daily_revenue
actions:
- name: write_daily_revenue
type: write
write_target:
type: materialized_view
catalog: main
schema: gold
table: daily_revenue
sql: |
SELECT order_date, SUM(amount) AS revenue
FROM main.silver.orders
GROUP BY order_date
This generates @dp.materialized_view(name="main.gold.daily_revenue", ...)
over a function whose body is df = spark.sql("""..."""). If your source is
another LHP view rather than an inline SQL string, drop the sql: key and
use a source: key referencing the upstream view name.
Pattern 3: CDC pipeline¶
DLT’s dlt.apply_changes (the legacy API) and the newer
dp.create_auto_cdc_flow both map to LHP’s streaming_table write with
mode: cdc. LHP emits dp.create_streaming_table(...) followed by
dp.create_auto_cdc_flow(target=..., source=..., keys=..., sequence_by=...,
stored_as_scd_type=...).
Existing DLT code:
import dlt
dlt.create_streaming_table(name="main.silver.dim_customer")
dlt.apply_changes(
target="main.silver.dim_customer",
source="v_customer_changes",
keys=["customer_id"],
sequence_by="_commit_timestamp",
stored_as_scd_type=2,
)
LHP YAML:
pipeline: silver
flowgroup: dim_customer
actions:
- name: load_customer_cdc
type: load
readMode: stream
source:
type: delta
catalog: main
schema: bronze
table: customer
options:
readChangeFeed: "true"
target: v_customer_changes
- name: write_dim_customer
type: write
source: v_customer_changes
write_target:
type: streaming_table
catalog: main
schema: silver
table: dim_customer
mode: cdc
cdc_config:
keys: [customer_id]
sequence_by: _commit_timestamp
scd_type: 2
The exact YAML keys are load-bearing:
mode: cdcselects thedp.create_auto_cdc_flowcode path.keysis required and must be a non-empty list of strings.sequence_byaccepts a string (single column) or a list of strings (which LHP renders assequence_by=struct(...)and adds the requiredfrom pyspark.sql.functions import structimport).scd_typeis1(default) or2. For SCD 2, you may addtrack_history_column_listortrack_history_except_column_list(mutually exclusive).apply_as_truncatesis rejected whenscd_type: 2.
For snapshot-based CDC, set mode: snapshot_cdc and provide a
snapshot_cdc_config with either a source table reference or a
source_function block. LHP emits dp.create_auto_cdc_from_snapshot_flow.
Warning
dp.create_auto_cdc_flow and dp.create_auto_cdc_from_snapshot_flow
require a sufficiently recent Databricks Runtime — Lakeflow Declarative
Pipelines builds. If your existing code still uses dlt.apply_changes and
dlt.apply_changes_from_snapshot, regenerated LHP code is the supported
replacement and runs on the same pipeline engine.
Pattern 4: Expectations¶
DLT decorator-based expectations (@dlt.expect, @dlt.expect_or_drop,
@dlt.expect_or_fail, and their _all variants) map to LHP’s
data_quality transform action. The generated code uses @dp.expect_all,
@dp.expect_all_or_drop, and @dp.expect_all_or_fail.
Existing DLT code:
import dlt
@dlt.table(name="main.silver.orders")
@dlt.expect_all({"valid_amount": "amount > 0"})
@dlt.expect_all_or_drop({"has_customer": "customer_id IS NOT NULL"})
@dlt.expect_all_or_fail({"valid_date": "order_date IS NOT NULL"})
def orders():
return spark.readStream.table("main.bronze.orders")
LHP YAML:
pipeline: silver
flowgroup: orders
actions:
- name: load_bronze_orders
type: load
readMode: stream
source:
type: delta
catalog: main
schema: bronze
table: orders
target: v_orders_raw
- name: validate_orders
type: transform
transform_type: data_quality
source: v_orders_raw
target: v_orders_validated
expectations:
- name: valid_amount
expression: amount > 0
action: warn
- name: has_customer
expression: customer_id IS NOT NULL
action: drop
- name: valid_date
expression: order_date IS NOT NULL
action: fail
- name: write_orders
type: write
source: v_orders_validated
write_target:
type: streaming_table
catalog: main
schema: silver
table: orders
LHP splits each expectation into one of three buckets by its action:
warn → @dp.expect_all, drop → @dp.expect_all_or_drop, fail
→ @dp.expect_all_or_fail. The generated transform is a
@dp.temporary_view() function with the matching expectation decorators
stacked above it.
Verifying the migration¶
After writing each flowgroup, regenerate and compare:
lhp generate --env dev
diff -u original/silver_orders.py generated/silver/orders.py
You are looking for semantic equivalence, not byte-identical output. The generated file will:
Import
from pyspark import pipelines as dpinstead ofimport dlt.Use
@dp.temporary_view()intermediate views (namedv_<something>) even when your original code inlined the read directly inside the table function.Split a single hand-written
@dlt.table-plus-CDC pattern into adp.create_streaming_table(...)call followed bydp.create_auto_cdc_flow(...).
Run lhp validate --env dev first to catch YAML schema errors before
generation. lhp generate --env dev --dry-run previews the output without
writing files.
Tip
When migrating a pipeline that already runs in production, generate into a separate output directory first (configurable per project), point a staging Lakeflow pipeline at the generated files, and validate that the staging pipeline produces an identical result to production before swapping over.
Things that don’t map cleanly¶
A handful of DLT constructs do not have a direct LHP equivalent. Handle them case by case:
Legacy decorator names.
@dlt.viewand@dlt.create_viewhave no direct counterpart. LHP’s@dp.temporary_view()is generated automatically for every load and transform action; if you depended on a named DLT view that other tables read by string, refactor the consumer to read from the LHP view name (v_<target>) instead.Imperative-only DLT code. Code that calls
dlt.read,dlt.read_streaminside arbitrary control flow (for-loops generating tables dynamically) does not fit the declarative YAML model. Either flatten the loop into explicit flowgroups, or use LHP’scustom_datasourceorpythontransform action to embed a hand-written function.Runtime-only behaviors. Pipeline event hooks,
dlt.read_streamagainst another pipeline’s table, and any%pip installmagics in your DLT notebook are configured at the Lakeflow pipeline level (indatabricks.ymlor the pipeline spec), not inside the generated Python. Move them there during the migration.Hand-rolled SCD logic. If you wrote SCD 2 logic by hand instead of using
dlt.apply_changes, the cleanest migration is to convert it tomode: cdcin LHP rather than reproduce the imperative code. The result is supported and considerably shorter.
See also¶
Architecture — how Pipelines, FlowGroups, and Actions relate, and why the load/transform/write split makes incremental migration practical.
Actions Reference — the full reference for every load, transform, write, and test action, including options not covered in the four patterns above.
Quickstart — if you are new to LHP and want to build a working pipeline from scratch before tackling a migration.