Quarantine Records¶
This how-to walks you through enabling quarantine mode on a data_quality transform.
The result: failed rows are preserved in a Dead Letter Queue (DLQ) table you can query
and edit, and corrected rows flow back into the pipeline output on the next run.
For the full reference (configuration schema, prerequisites, table DDL, limitations, and troubleshooting), see Quarantine (Dead Letter Queue).
Quick Start¶
1. FlowGroup YAML
1pipeline: acmi_edw_bronze
2flowgroup: quarantine_flow
3
4actions:
5 - name: orders_raw_load
6 type: load
7 readMode: stream
8 source:
9 type: delta
10 database: "${catalog}.${raw_schema}"
11 table: orders_raw
12 target: v_orders_raw
13 description: "Load orders from raw schema"
14
15 - name: orders_quarantine_dq
16 type: transform
17 transform_type: data_quality
18 source: v_orders_raw
19 target: v_orders_validated
20 readMode: stream
21 expectations_file: "expectations/quarantine_quality.yaml"
22 description: "Apply quarantine data quality checks to orders"
23 mode: quarantine
24 quarantine:
25 dlq_table: "${catalog}.${bronze_schema}.universal_dlq"
26 source_table: "${catalog}.${bronze_schema}.orders"
27
28 - name: write_orders_bronze
29 type: write
30 source: v_orders_validated
31 write_target:
32 create_table: true
33 type: streaming_table
34 database: "${catalog}.${bronze_schema}"
35 table: "orders_quarantined"
2. Expectations file
1order_id IS NOT NULL:
2 action: drop
3 name: valid_order_id
4order_amount > 0:
5 action: warn
6 name: positive_amount
7customer_id IS NOT NULL:
8 action: fail
9 name: valid_customer_id
Note the mix of drop, warn, and fail actions. In quarantine mode, all three are
coerced to drop — a warning is emitted during validation for warn and fail entries.
3. Validate and generate
lhp validate --env dev
lhp generate --env dev
What happens: LHP generates a single Python file containing six components — shared constants, a clean view, a DLQ sink with quarantine flow, a recycle sink/flow (inbox → outbox dedup), a recycled validation view, and a final UNION output view — plus the standard load and write actions. See Generated Code Walkthrough below.
Generated Code Walkthrough¶
The following is the quarantine-specific code generated from the Quick Start example above. The load and write sections are standard and omitted for brevity.
1# --- Rules & constants ---
2
3_EXPECTATIONS_v_orders_raw = {
4 "valid_order_id": "order_id IS NOT NULL",
5 "positive_amount": "order_amount > 0",
6 "valid_customer_id": "customer_id IS NOT NULL",
7}
8
9_INVERSE_FILTER_v_orders_raw = (
10 "NOT ((order_id IS NOT NULL) AND (order_amount > 0) AND (customer_id IS NOT NULL))"
11)
12
13_FAILED_RULE_EXPRS_v_orders_raw = [...] # omitted for brevity
14
15DLQ_TABLE_v_orders_raw = "acme_edw_dev.edw_bronze.universal_dlq"
16DLQ_OUTBOX_TABLE_v_orders_raw = "acme_edw_dev.edw_bronze.universal_dlq_outbox"
17SOURCE_TABLE_v_orders_raw = "acme_edw_dev.edw_bronze.orders"
18
19_EXPECTATIONS_RECYCLED_v_orders_raw = {
20 "valid_order_id": "order_id IS NOT NULL",
21 "positive_amount": "order_amount > 0",
22 "valid_customer_id": "customer_id IS NOT NULL",
23}
24
25
26# --- Clean path (provides DQ metrics in event log) ---
27@dp.temporary_view()
28@dp.expect_all_or_drop(_EXPECTATIONS_v_orders_raw)
29def _clean_v_orders_raw():
30 """Apply quarantine data quality checks to orders — clean records"""
31 return spark.readStream.table("v_orders_raw")
32
33
34# --- Quarantine path (DLQ sink + routing) ---
35@dp.foreach_batch_sink(name="dlq_sink_v_orders_raw")
36def dlq_sink_v_orders_raw(batch_df, batch_id):
37 """Write quarantined rows to DLQ table"""
38 ... # MERGE into DLQ inbox (omitted for brevity)
39
40
41@dp.append_flow(target="dlq_sink_v_orders_raw", name="quarantine_flow_v_orders_raw")
42def quarantine_flow_v_orders_raw():
43 """Route failed rows to DLQ"""
44 return (
45 spark.readStream.table("v_orders_raw")
46 .filter(_INVERSE_FILTER_v_orders_raw)
47 .withColumn("_dlq_failed_rules",
48 F.array_compact(F.array(*_FAILED_RULE_EXPRS_v_orders_raw)))
49 )
50
51
52# --- Recycle path (dedup inbox → outbox) ---
53@dp.foreach_batch_sink(name="recycle_sink_v_orders_raw")
54def recycle_sink_v_orders_raw(batch_df, batch_id):
55 """Deduplicate fixed DLQ rows and write to outbox"""
56 if batch_df.isEmpty():
57 return
58
59 spark = batch_df.sparkSession
60 w = Window.partitionBy("_dlq_sk").orderBy(F.desc("_commit_version"))
61 deduped = (
62 batch_df.withColumn("_rn", F.row_number().over(w))
63 .filter("_rn = 1")
64 .drop("_rn")
65 .select(
66 "_dlq_sk", "_dlq_source_table", "_row_data",
67 F.current_timestamp().alias("_dlq_recycled_at"),
68 )
69 )
70
71 outbox = DeltaTable.forName(spark, DLQ_OUTBOX_TABLE_v_orders_raw)
72 (
73 outbox.alias("out")
74 .merge(deduped.alias("new"), "out._dlq_sk = new._dlq_sk")
75 .whenNotMatchedInsertAll()
76 .execute()
77 )
78
79
80@dp.append_flow(
81 target="recycle_sink_v_orders_raw", name="recycle_flow_v_orders_raw"
82)
83def recycle_flow_v_orders_raw():
84 """Read fixed rows from DLQ inbox via CDF"""
85 return (
86 spark.readStream.option("readChangeFeed", "true")
87 .table(DLQ_TABLE_v_orders_raw)
88 .filter(
89 "_dlq_status = 'fixed' "
90 "AND _change_type IN ('insert', 'update_postimage') "
91 f"AND _dlq_source_table = '{SOURCE_TABLE_v_orders_raw}'"
92 )
93 )
94
95
96# --- Recycled path (outbox → validated recycled view) ---
97@dp.temporary_view()
98@dp.expect_all_or_drop(_EXPECTATIONS_RECYCLED_v_orders_raw)
99def _recycled_v_orders_raw():
100 """Validate recycled rows from DLQ outbox"""
101 clean = spark.readStream.table("_clean_v_orders_raw")
102 return (
103 spark.readStream.option("skipChangeCommits", "true")
104 .table(DLQ_OUTBOX_TABLE_v_orders_raw)
105 .filter(f"_dlq_source_table = '{SOURCE_TABLE_v_orders_raw}'")
106 .select([
107 F.try_variant_get(
108 F.col("_row_data"), f"$.{field.name}", field.dataType.simpleString()
109 ).alias(field.name)
110 for field in clean.schema.fields
111 ])
112 )
113
114
115# --- Validated output (clean + recycled) ---
116@dp.temporary_view()
117def v_orders_validated():
118 """Apply quarantine data quality checks to orders — clean + recycled records"""
119 clean = spark.readStream.table("_clean_v_orders_raw")
120 recycled = spark.readStream.table("_recycled_v_orders_raw")
121
122 df = clean.union(recycled)
123
124 return df
Component 2: Clean View¶
The _clean_* view applies @dp.expect_all_or_drop to the source view. Rows passing all
expectations flow through to this view. Failed rows are silently dropped here but captured
separately by the quarantine flow (Component 3).
The clean view is also visible in the Databricks DLT event log, providing data quality metrics (pass/fail counts) without additional configuration.
Component 3: DLQ Sink + Quarantine Flow¶
Two functions work together:
``dlq_sink_*`` — a
@dp.foreach_batch_sinkthat receives micro-batches of failed rows andMERGEs them into the DLQ table. The MERGE uses_dlq_sk(a deterministicxxhash64hash) to prevent duplicate inserts on retries.``quarantine_flow_*`` — an
@dp.append_flowthat reads the same source view, applies the inverse filter to select only failed rows, and annotates each row with_dlq_failed_rules.
Component 4: Recycle Sink + Flow (Inbox → Outbox)¶
Two functions handle deduplication of fixed DLQ rows:
``recycle_sink_*`` — a
@dp.foreach_batch_sinkthat deduplicates fixed rows by_dlq_skusing aWindowwithrow_number(), keeping only the latest_commit_version. The deduplicated rows areMERGEd into the outbox table (whenNotMatchedInsertAll), ensuring each row is processed exactly once.``recycle_flow_*`` — an
@dp.append_flowthat reads the DLQ inbox via CDF, filtering for_dlq_status = 'fixed'rows.
This design prevents duplicate ingestion when a DLQ row is updated multiple times (e.g. multiple edits before a pipeline run, or re-edits after recycling).
Component 5: Recycled Validation View¶
The _recycled_* view reads from the outbox table with skipChangeCommits (to avoid
reprocessing MERGE commits) and reconstructs the original schema using try_variant_get.
It applies @dp.expect_all_or_drop(_EXPECTATIONS_RECYCLED_*) to validate recycled rows,
using a filtered expectations dict that excludes _rescued_data rules (since recycled rows
are stored as VARIANT and don’t have the original _rescued_data column).
Component 6: Final Output View (UNION)¶
The target view (e.g. v_orders_validated) UNIONs two streams:
Clean stream — from
_clean_*(rows that passed all expectations).Recycled stream — from
_recycled_*(validated rows from the outbox).
If operational metadata columns are configured, they are added to the UNION output.
DLQ Operations Guide¶
This section covers querying, inspecting, and recycling quarantined rows. These operations are performed directly against the DLQ table using Databricks SQL.
Querying Quarantined Rows¶
SELECT
_dlq_sk,
_dlq_source_table,
_dlq_status,
_dlq_timestamp,
_dlq_failed_rules,
_row_data
FROM catalog.schema.universal_dlq
WHERE _dlq_source_table = 'catalog.schema.orders'
AND _dlq_status = 'quarantined'
ORDER BY _dlq_timestamp DESC;
Inspecting Failed Rules¶
SELECT
_dlq_sk,
rule.name AS rule_name,
rule.rule AS rule_expression,
_dlq_timestamp
FROM catalog.schema.universal_dlq
LATERAL VIEW EXPLODE(_dlq_failed_rules) AS rule
WHERE _dlq_source_table = 'catalog.schema.orders'
AND _dlq_status = 'quarantined';
Extracting Row Data¶
SELECT
_dlq_sk,
_row_data:order_id::INT AS order_id,
_row_data:customer_id::STRING AS customer_id,
_row_data:order_amount::DOUBLE AS order_amount,
_dlq_failed_rules
FROM catalog.schema.universal_dlq
WHERE _dlq_source_table = 'catalog.schema.orders'
AND _dlq_status = 'quarantined';
Fixing Rows (Recycling Workflow)¶
The recycling workflow lets operators mark corrected rows as fixed. The pipeline’s CDF reader
automatically picks up these changes and includes them in the next micro-batch of the output view.
Step-by-step:
Query the DLQ to identify rows that need fixing.
Verify or correct the underlying data issue (e.g. backfill a missing
customer_id).Update the row status to
'fixed':
UPDATE catalog.schema.universal_dlq
SET _dlq_status = 'fixed'
WHERE _dlq_sk = '<surrogate_key_value>'
AND _dlq_status = 'quarantined';
On the next pipeline refresh, the CDF reader detects the
update_postimageevent and the recycled row flows through the UNION into the target view.
Warning
Recycling relies on Change Data Feed (CDF). If CDF is disabled on the DLQ table, status
updates will not be detected by the pipeline. Ensure delta.enableChangeDataFeed = 'true'
is set as a table property.
Note
The _dlq_sk is a deterministic hash (xxhash64) of the source table name and the
row’s JSON representation. The same row will always produce the same key, which prevents
duplicate inserts on retries and allows targeted updates.
See also¶
Quarantine (Dead Letter Queue) — Configuration reference, prerequisites/DDL, limitations, CloudFiles support, and integration with operational metadata, substitutions, and presets.
Error Reference — full error code catalog with resolution steps.