API Reference¶
The Lakehouse Plumber (LHP) Python API exposes the same generation engine that
powers the lhp command-line interface. Import it to build custom tooling:
notebook integrations, CI/CD wrappers, programmatic validation in tests, or
embedding LHP inside a larger orchestrator. For day-to-day pipeline authoring,
prefer the CLI — the Python API trades convenience for control.
This page catalogs the public surface by stability tier and renders live docstrings via Sphinx autodoc. The tier table is the contract; the autodoc sections below it are the source-of-truth signatures pulled from the code at build time.
Stability tiers¶
LHP follows a four-tier stability model. The tier governs what level of change to expect between minor and major versions.
| Tier | Change policy | Definition |
|---|---|---|
| Stable | Semantic versioning; breaking changes only at major releases. | Public API in active use by external integrations. Method signatures, return shapes, and observable behavior are versioned. |
| Beta | May change between minor versions with a deprecation cycle of at least one release. | Public API still settling. Suitable for production use if you pin LHP to a minor version. |
| Experimental | May change without notice at any release, including patch releases. | Public API exposed for evaluation. Do not depend on it from long-lived code. |
| Internal | May change at any time. Not part of the API contract. | Implementation detail. Importing it from outside |
If a symbol is not listed on this page, treat it as Internal regardless of its import path.
Changed in version 0.8.7: This page replaces the prior alpha disclaimer with explicit per-module stability tiers. The CLI surface remains Stable; the Python API is Beta except where noted.
Public API by tier¶
Stable¶
No symbol is currently Stable. LHP is pre-1.0; the CLI is the stable contract. Python API consumers should treat the Beta tier as the recommended entry point and pin to a minor version of LHP.
Beta¶
These symbols are the recommended entry points for programmatic use.
| Symbol | Summary |
|---|---|
| | Drives end-to-end pipeline generation: discovery, expansion, validation, code generation, and bundle synchronization. |
| | Parses and validates LHP YAML files into Pydantic models. Use to load FlowGroups, Presets, and Templates without invoking the full pipeline. |
| | Thread-safe caching wrapper around |
| | Loads presets from a directory and resolves inheritance chains. |
| | Synchronizes Databricks Asset Bundle (DAB) resource files with generated pipeline code. |
| | Pydantic models for the on-disk YAML schema: |
| | Public exception hierarchy: |
Experimental¶
These symbols are exposed but may change without notice.
| Symbol | Summary |
|---|---|
| | Resolves |
| | Value class representing a parsed |
| | Loads and renders Jinja2 FlowGroup templates. Public for advanced template authors; the parameter surface may grow. |
| | Standalone validator used by |
| | Returns the installed LHP package version string. |
Internal¶
Everything else under lhp.* is Internal. The following namespaces are
explicitly off-limits to external code:
lhp.core.services.*: service decomposition used by the orchestrator.
lhp.core.commands.*: internal command pattern dispatch.
lhp.core.factories, lhp.core.layers, lhp.core.dependency_resolver, lhp.core.parallel_processor: wiring and execution helpers.
lhp.generators.*: per-action code generators. Use the orchestrator, not generators directly.
lhp.cli.*: Click command implementations. Use the lhp shell entry point or invoke Click’s CliRunner against lhp.cli.main:cli.
lhp.utils.smart_file_writer, lhp.utils.performance_timer, lhp.utils.formatter, lhp.utils.source_extractor, and every other lhp.utils.* module not listed under Beta or Experimental above.
lhp.schemas.*: JSON Schema generation for IDE integration.
lhp.templates.* and lhp.resources.*: packaged Jinja2 templates and static assets.
Warning
Importing from an Internal namespace works today but provides no guarantees. Patch releases may move, rename, or remove these symbols without a deprecation notice.
Detailed reference¶
The sections below render docstrings, type hints, and signatures directly from the source code. Refer to the tier table above for stability guarantees on each symbol.
Package root: lhp¶
Lakehouse Plumber - YAML-driven framework for Databricks Lakeflow Spark Declarative Pipelines.
Orchestrator: lhp.core.orchestrator¶
Main orchestration for LakehousePlumber pipeline generation.
- class lhp.core.orchestrator.ActionOrchestrator(project_root, enforce_version=True, dependencies=None, pipeline_config_path=None, max_workers=None)[source]¶
Bases: object
Main orchestration for pipeline generation (Service-based architecture).
Implements the business layer interface and coordinates specialized services for discovery, processing, generation, and validation while maintaining the same public API for backward compatibility.
- Parameters:
- property cached_yaml_parser: CachingYAMLParser¶
Public accessor for the shared CachingYAMLParser instance.
- validate_configuration(pipeline_identifier, env)[source]¶
Validate configuration based on business rules.
- discover_flowgroups(pipeline_dir)[source]¶
Discover all flowgroups in a specific pipeline directory.
- discover_all_flowgroups()[source]¶
Discover all flowgroups across all directories in the project.
- Combines three sources, in order:
Disk-sourced flowgroups via FlowgroupDiscoverer (existing).
Synthetic flowgroups expanded from blueprints x instances. Once expanded, these are indistinguishable from disk-sourced flowgroups for downstream code paths (Step 0.5 -> Step 5).
Synthetic monitoring flowgroup (existing) when monitoring is configured.
- Side effects:
Populates self._blueprint_provenance with the expansion’s (pipeline, flowgroup) -> BlueprintProvenance mapping. Used by the state dependency resolver, dependency tracker, and source-path index to handle instance-file changes correctly.
Populates self._synthetic_contexts with FlowGroupContext envelopes for synthetic flowgroups (blueprint-expanded + monitoring).
Stores the full MonitoringBuildResult in self._monitoring_result.
- finalize_monitoring_artifacts(env, output_dir)[source]¶
Reconcile monitoring artifacts: clean stale, write current.
Called AFTER the pipeline generation loop. Handles all transitions:
- Monitoring added: write notebook + job
- Monitoring removed: clean up notebook + job
- Pipeline renamed: old artifacts removed, new ones written
- MVs added/removed: job updated (notebook-only vs full)
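The "clean stale, write current" reconciliation can be pictured as a set difference between the files on disk and the files that should exist. The following is a standard-library sketch only: `reconcile_artifacts`, its arguments, and the file names are hypothetical and do not mirror LHP's internal implementation.

```python
import tempfile
from pathlib import Path


def reconcile_artifacts(output_dir: Path, desired: dict) -> tuple:
    """Delete files that are no longer wanted, then write every wanted file.

    Hypothetical helper: `desired` maps a file name to its text content.
    Returns (removed_names, written_names) as two sets.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    existing = {p.name for p in output_dir.iterdir() if p.is_file()}
    stale = existing - set(desired)
    for name in stale:
        (output_dir / name).unlink()
    for name, content in desired.items():
        (output_dir / name).write_text(content)
    return stale, set(desired)


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp)
    # First run: monitoring is configured under an old pipeline name.
    reconcile_artifacts(out, {"old_monitor.py": "# notebook", "old_monitor.job.yml": "job"})
    # Second run: the pipeline was renamed, so the old artifacts are stale.
    removed, written = reconcile_artifacts(out, {"new_monitor.py": "# notebook"})
    remaining = sorted(p.name for p in out.iterdir())
```

Passing an empty `desired` mapping covers the "monitoring removed" transition: everything present is treated as stale and nothing is written.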
- discover_flowgroups_by_pipeline_field(pipeline_field, pre_discovered_all_flowgroups=None)[source]¶
Discover all flowgroups with a specific pipeline field across all directories.
- validate_duplicate_pipeline_flowgroup_combinations(flowgroups)[source]¶
Validate that there are no duplicate pipeline+flowgroup combinations.
- Parameters:
flowgroups (List[FlowGroup]) – List of flowgroups to validate
- Raises:
ValueError – If duplicate combinations are found
- Return type:
None
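A minimal sketch of this kind of duplicate check, assuming only that each flowgroup exposes `pipeline` and `flowgroup` fields. `FlowGroupStub` and `check_unique_combinations` are hypothetical names used for illustration, and the error message format is invented.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass(frozen=True)
class FlowGroupStub:
    """Hypothetical stand-in carrying only the two identifying fields."""
    pipeline: str
    flowgroup: str


def check_unique_combinations(flowgroups: list) -> None:
    """Raise ValueError if any (pipeline, flowgroup) pair appears more than once."""
    counts = Counter((fg.pipeline, fg.flowgroup) for fg in flowgroups)
    duplicates = sorted(pair for pair, n in counts.items() if n > 1)
    if duplicates:
        listing = ", ".join(f"{p}.{f}" for p, f in duplicates)
        raise ValueError(f"Duplicate pipeline+flowgroup combinations: {listing}")


flowgroups = [
    FlowGroupStub("bronze", "orders"),
    FlowGroupStub("bronze", "customers"),
    FlowGroupStub("bronze", "orders"),  # same pair as the first entry
]
try:
    check_unique_combinations(flowgroups)
    duplicate_error = None
except ValueError as exc:
    duplicate_error = str(exc)
```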
- generate_pipeline_by_field(pipeline_field, env, output_dir=None, specific_flowgroups=None, include_tests=False, pre_discovered_all_flowgroups=None)[source]¶
Thin shim that delegates to the plural generate_pipelines_by_fields().
On failure the plural method re-raises the original exception unchanged for the single-pipeline case, so callers that catch specific LHPError subclasses continue to work.
- generate_pipelines_by_fields(*, pipeline_fields, env, output_dir, specific_flowgroups=None, include_tests=False, pre_discovered_all_flowgroups=None, max_workers=None, on_pipeline_complete=None)[source]¶
Run one worker per pipeline; aggregate results on the main thread.
Each worker owns the full pipeline pass (Phase A discovery + Phase B validation/file writes). The main thread is a pure aggregator.
- Parameters:
pipeline_fields (Sequence[str]) – Pipeline names to generate. Order is preserved in the returned mapping; on_pipeline_complete fires in completion order (not input order).
env (str) – Environment name (e.g. "dev").
output_dir (Path | None) – Root output directory (output_dir / <pipeline>) or None for dry-run.
specific_flowgroups (List[str] | None) – Optional whitelist of flowgroup names.
include_tests (bool) – Include test actions in the generated code.
pre_discovered_all_flowgroups (List[FlowGroup] | None) – Re-use the caller’s one-shot discovery instead of re-discovering.
max_workers (int | None) – Pool size override. None falls back to self.max_workers.
on_pipeline_complete (Callable[[PipelineDelta], None] | None) – Optional main-thread callback fired once per pipeline with the PipelineDelta. Callback exceptions are logged but do not abort the batch.
- Returns:
Mapping of {pipeline_name: {generated_path: code}} for successful pipelines. Failed pipelines are absent from the dict; the aggregate LHPError raised at the end captures their errors.
- Raises:
LHPError – When a single pipeline failed; reconstructed via LHPError.from_worker_exception() so the worker’s error type and full traceback survive.
LHPValidationError – When multiple pipelines failed; one aggregate error listing every failure.
- Return type:
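The contract described for generate_pipelines_by_fields() (results keyed in input order, callback fired in completion order, callback exceptions logged without aborting) can be sketched with the standard library. This is an illustration only: `run_batch` and its parameters are hypothetical, a thread pool is used purely for convenience and says nothing about the worker type LHP uses, and failure aggregation is left out.

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Optional, Sequence

log = logging.getLogger("example.batch")


def run_batch(
    names: Sequence[str],
    work: Callable[[str], dict],
    on_complete: Optional[Callable[[str], None]] = None,
    max_workers: Optional[int] = None,
) -> dict:
    """Run work(name) once per name and return results keyed in input order."""
    finished: dict = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(work, name): name for name in names}
        for future in as_completed(futures):  # yields in completion order
            name = futures[future]
            finished[name] = future.result()
            if on_complete is not None:
                try:
                    on_complete(name)  # runs on the calling thread
                except Exception:
                    log.exception("callback failed for %s", name)
    # Rebuild the mapping so iteration follows the input order.
    return {name: finished[name] for name in names}


def noisy_callback(name: str) -> None:
    seen.append(name)
    raise RuntimeError("callback bug")  # must not abort the batch


seen: list = []
results = run_batch(["silver", "bronze", "gold"], lambda n: {f"{n}.py": "# code"}, noisy_callback)
```

Because the callback runs inside the `as_completed` loop on the calling thread, it needs no locking of its own, which matches the "main-thread callback" wording above.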
- process_flowgroup(flowgroup, substitution_mgr, include_tests=True)[source]¶
Process flowgroup: expand templates, apply presets, apply substitutions.
Backward-compatible shim around FlowgroupProcessor.process_flowgroup(), which now takes and returns FlowGroupContext. This shim wraps the FlowGroup in a default-empty context and returns just the processed FlowGroup for callers that don’t care about provenance.
- Parameters:
flowgroup (FlowGroup) – FlowGroup to process
substitution_mgr (EnhancedSubstitutionManager) – Substitution manager for the environment
include_tests (bool) – If False, filter out test actions before processing. Defaults to True for backward compatibility.
- Returns:
Processed flowgroup
- Return type:
- generate_flowgroup_code(flowgroup, substitution_mgr, output_dir=None, source_yaml=None, env=None, include_tests=False, python_file_copier=None, phase_a_records=None)[source]¶
Generate complete Python code for a flowgroup.
- Parameters:
flowgroup (FlowGroup) – FlowGroup to generate code for
substitution_mgr (EnhancedSubstitutionManager) – Substitution manager for the environment
output_dir (Path | None) – Output directory for generated files
source_yaml (Path | None) – Source YAML path for file tracking
env (str | None) – Environment name for file tracking
include_tests (bool) – Whether to include test actions
python_file_copier – Thread-safe Python file copier (for parallel mode)
phase_a_records (List[CopiedModuleRecord] | None) – Optional list passed by Phase A workers in the cross-pipeline flat pool; when supplied, the file copier appends CopiedModuleRecord entries to it instead of writing to disk. Phase B replays those records.
- Returns:
Complete Python code for the flowgroup
- Return type:
- determine_action_subtype(action)[source]¶
Determine the sub-type of an action for generator selection.
- create_combined_write_action(actions, target_table)[source]¶
Create a combined write action with individual action metadata preserved.
- validate_pipeline_by_field(pipeline_field, env, include_tests=True, pre_discovered_all_flowgroups=None)[source]¶
Thin shim that delegates to the plural validate_pipelines_by_fields().
Preserves the legacy (errors, warnings) tuple return so the existing ValidateCommand per-pipeline loop and LakehousePlumberApplicationFacade.validate_pipeline keep working unchanged.
- validate_pipelines_by_fields(*, pipeline_fields, env, include_tests=True, pre_discovered_all_flowgroups=None, max_workers=None, on_pipeline_complete=None)[source]¶
Flat-pool validate across multiple pipelines.
Mirrors generate_pipelines_by_fields() but simpler: no state save, no Phase B replay, no file writes. Phase A workers call process_flowgroup() (which runs schema + reference + action validation); Phase B per pipeline runs ConfigValidator.validate_cdc_fanin_compatibility() as the post-barrier cross-flowgroup check.
- Parameters:
pipeline_fields (Sequence[str]) – Pipeline names to validate. Outcomes are returned in input order (stable for display).
env (str) – Environment name.
include_tests (bool) – When False, test actions are filtered before processing (matches single-pipeline shim default of True).
pre_discovered_all_flowgroups (List[FlowGroup] | None) – Re-use caller’s one-shot discovery; if None, runs discover_all_flowgroups().
max_workers (int | None) – Process-pool size; falls back through self.max_workers → _auto_max_workers() (~80% of detected CPU count, honoring cgroup limits on Linux).
on_pipeline_complete (Callable[[PipelineValidationOutcome], None] | None) – Optional callback fired once per pipeline (main thread, completion order).
- Returns:
List of PipelineValidationOutcome, one per input pipeline, in input order.
- Return type:
List[PipelineValidationOutcome]
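The max_workers fallback chain described above can be sketched as follows. This is a simplified stand-in, not the body of _auto_max_workers(): the function names are hypothetical, and the sketch consults only the CPU affinity mask (where the platform offers one), not cgroup quotas.

```python
import os


def auto_max_workers(fraction: float = 0.8) -> int:
    """Return roughly `fraction` of the usable CPUs, never less than 1."""
    if hasattr(os, "sched_getaffinity"):
        # Linux: counts only the CPUs this process is allowed to run on.
        usable = len(os.sched_getaffinity(0))
    else:
        usable = os.cpu_count() or 1
    return max(1, int(usable * fraction))


def resolve_workers(explicit, instance_default):
    """Fallback chain: explicit argument, then instance default, then auto."""
    for candidate in (explicit, instance_default):
        if candidate is not None:
            return candidate
    return auto_max_workers()


workers_explicit = resolve_workers(4, 8)
workers_default = resolve_workers(None, 8)
workers_auto = resolve_workers(None, None)
```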
YAML parsing: lhp.parsers.yaml_parser¶
- class lhp.parsers.yaml_parser.YAMLParser[source]¶
Bases: object
Parse and validate YAML configuration files.
- parse_flowgroups_from_file(file_path)[source]¶
Parse one or more FlowGroups from a YAML file.
Supports both multi-document syntax (---) and flowgroups array syntax.
- Parameters:
file_path (Path) – Path to YAML file containing one or more flowgroups
- Returns:
List of FlowGroup objects
- Raises:
ValueError – For duplicate flowgroup names, mixed syntax, or parsing errors
- Return type:
- parse_flowgroup(file_path)[source]¶
Parse a FlowGroup YAML file.
Note: This method only supports single-flowgroup files. If the file contains multiple flowgroups (via the --- separator or a flowgroups array), use parse_flowgroups_from_file() instead.
- parse_template_raw(file_path)[source]¶
Parse a Template YAML file with raw actions (no Action object creation).
This is used during template loading to avoid validation of template syntax like {{ table_properties }}. Actions will be validated later during rendering when actual parameter values are available.
- class lhp.parsers.yaml_parser.CachingYAMLParser(base_parser=None, max_cache_size=500)[source]¶
Bases: object
Thread-safe caching wrapper for YAMLParser.
Uses file path + modification time as cache key to automatically invalidate cache when files change.
- Parameters:
base_parser (YAMLParser | None)
max_cache_size (int)
- load_documents_all(path, error_context=None)[source]¶
Load all YAML documents from a file with mtime-keyed caching.
Hit/miss counters are shared with the flowgroup sub-cache; get_cache_stats() reports per-sub-cache sizes separately.
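The idea of keying a cache on file path plus modification time, so that an edited file invalidates itself, can be sketched with the standard library. `MtimeCache` is a hypothetical class written for this illustration; it takes any loader callable and is not LHP's CachingYAMLParser.

```python
import os
import tempfile
import threading
from collections import OrderedDict
from pathlib import Path
from typing import Any, Callable


class MtimeCache:
    """Hypothetical cache keyed on (resolved path, modification time)."""

    def __init__(self, loader: Callable[[Path], Any], max_size: int = 500) -> None:
        self._loader = loader
        self._max_size = max_size
        self._entries: OrderedDict = OrderedDict()
        self._lock = threading.Lock()
        self.hits = 0
        self.misses = 0

    def load(self, path: Path) -> Any:
        key = (str(path.resolve()), path.stat().st_mtime_ns)
        with self._lock:
            if key in self._entries:
                self.hits += 1
                self._entries.move_to_end(key)  # mark as recently used
                return self._entries[key]
            self.misses += 1
            value = self._loader(path)
            self._entries[key] = value
            if len(self._entries) > self._max_size:
                self._entries.popitem(last=False)  # evict the oldest entry
            return value


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "flow.yaml"
    target.write_text("pipeline: bronze\n")
    cache = MtimeCache(lambda p: p.read_text())
    first = cache.load(target)
    second = cache.load(target)  # same mtime: served from the cache
    target.write_text("pipeline: silver\n")
    stat = target.stat()
    # Force a different mtime so the example does not depend on clock resolution.
    os.utime(target, ns=(stat.st_atime_ns, stat.st_mtime_ns + 1_000_000_000))
    third = cache.load(target)  # new mtime means a new key, so the file is re-read
```

The sketch holds the lock while loading, which keeps it short but serializes cache misses; a production cache would likely load outside the lock.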
Presets: lhp.presets.preset_manager¶
Preset management with hierarchical inheritance.
- class lhp.presets.preset_manager.PresetManager(presets_dir)[source]¶
Bases: object
Manages preset loading and inheritance resolution.
- Parameters:
presets_dir (Path)
- get_operational_metadata_selection(preset_names)[source]¶
Get operational metadata selection from resolved preset chain.
Asset Bundle integration: lhp.bundle.manager¶
Bundle manager for LHP Databricks Asset Bundle integration.
This module provides the main BundleManager class that coordinates bundle resource operations including resource file synchronization and management.
- class lhp.bundle.manager.BundleManager(project_root, pipeline_config_path=None, project_config=None)[source]¶
Bases: object
Manages Databricks Asset Bundle resource files.
Wipe-and-regenerate contract: resources/lhp/ is fully wiped by the CLI before sync, and BundleManager re-renders one resource file per pipeline under generated/<env>/. databricks.yml is never read or mutated. Catalog and schema must come from pipeline_config.yaml (per-pipeline or via the top-level project_defaults block).
- sync_resources_with_generated_files(output_dir, env)[source]¶
Write one bundle resource file per current pipeline directory.
Wipe-and-regenerate contract: callers must clear resources/lhp/ before invoking this method. BundleManager only writes the resource files for the pipelines that exist under output_dir; it does not preserve, back up, or delete any pre-existing files.
- get_pipeline_directories(output_dir)[source]¶
Get list of pipeline directories in the output directory.
- get_resource_file_path(pipeline_name)[source]¶
Return the path where this pipeline’s resource file is written.
Under the wipe-and-regenerate contract, resources/lhp/ is empty when sync starts and BundleManager always writes the canonical <pipeline>.pipeline.yml filename, so no existence probing is needed.
- generate_resource_file_content(pipeline_name, output_dir, env)[source]¶
Generate content for a bundle resource file using Jinja2 template.
Applies LHP token substitution to ALL fields in pipeline_config.yaml, enabling environment-specific configuration for node types, policies, emails, and all other pipeline settings. Catalog and schema MUST be defined in pipeline_config.yaml (either per-pipeline or via the top-level project_defaults block); they are never read from databricks.yml.
Catalog/schema validation is performed upstream by bundle.preflight.validate_catalog_schema before any wipes occur. The guard here is a defense-in-depth assertion that fires only if a non-CLI caller invokes this method without running preflight first.
- Parameters:
- Returns:
YAML content for the resource file with fully substituted pipeline config
- Raises:
LHPConfigError – LHP-GEN-001 if preflight was bypassed and catalog/schema is still missing/empty at the bundle-write phase.
- Return type:
Configuration models: lhp.models.config¶
Pydantic models that mirror the on-disk YAML schema. Import these to type your own loaders or to construct configurations programmatically.
- class lhp.models.config.ViolationAction(*values)[source]¶
-
Actions to take when test expectations are violated.
- class lhp.models.config.MetadataColumnConfig(*, expression, description=None, applies_to=['streaming_table', 'materialized_view'], additional_imports=None, enabled=True)[source]¶
Bases: BaseModel
Configuration for a single metadata column.
- class lhp.models.config.MetadataPresetConfig(*, columns, description=None)[source]¶
Bases: BaseModel
Configuration for a metadata column preset.
- class lhp.models.config.OperationalMetadataSelection(*, enabled=True, preset=None, columns=None, include_columns=None, exclude_columns=None)[source]¶
Bases: BaseModel
Operational metadata selection configuration (used in flowgroups/actions/presets).
- class lhp.models.config.QuarantineConfig(*, dlq_table, source_table)[source]¶
Bases: BaseModel
Configuration for quarantine mode in data quality transforms.
- class lhp.models.config.ProjectOperationalMetadataConfig(*, columns, presets=None, defaults=None)[source]¶
Bases: BaseModel
Project-level operational metadata configuration (definitions only).
- Parameters:
columns (Dict[str, MetadataColumnConfig])
presets (Dict[str, MetadataPresetConfig] | None)
- class lhp.models.config.EventLogConfig(*, enabled=True, catalog=None, schema=None, name_prefix='', name_suffix='')[source]¶
Bases: BaseModel
Project-level event log configuration for pipeline resource generation.
- class lhp.models.config.MonitoringMaterializedViewConfig(*, name, sql=None, sql_path=None)[source]¶
Bases: BaseModel
Configuration for a single monitoring materialized view.
- class lhp.models.config.MonitoringConfig(*, enabled=True, pipeline_name=None, catalog=None, schema=None, streaming_table='all_pipelines_event_log', checkpoint_path='', job_config_path=None, max_concurrent_streams=10, materialized_views=None, enable_job_monitoring=False)[source]¶
Bases: BaseModel
Project-level monitoring pipeline configuration.
Generates two artifacts:
A standalone notebook that runs N independent streaming queries (one per pipeline event log) appending into a user-created Delta table.
A DLT pipeline with materialized views only, reading from that Delta table.
A Databricks Workflow job chains: notebook_task (union) → pipeline_task (MVs).
- class lhp.models.config.TestReportingConfig(*, module_path, function_name, config_file=None)[source]¶
Bases: BaseModel
Configuration for test result reporting to external systems.
- class lhp.models.config.ProjectConfig(*, name, version='1.0', description=None, author=None, created_date=None, include=None, blueprint_include=None, instance_include=None, operational_metadata=None, event_log=None, monitoring=None, required_lhp_version=None, test_reporting=None)[source]¶
Bases: BaseModel
Project-level configuration loaded from lhp.yaml.
- Parameters:
name (str)
version (str)
description (str | None)
author (str | None)
created_date (str | None)
operational_metadata (ProjectOperationalMetadataConfig | None)
event_log (EventLogConfig | None)
monitoring (MonitoringConfig | None)
required_lhp_version (str | None)
test_reporting (TestReportingConfig | None)
- class lhp.models.config.WriteTarget(*, type, catalog=None, schema=None, database=None, table=None, create_table=True, comment=None, table_properties=None, partition_columns=None, cluster_columns=None, spark_conf=None, table_schema=None, row_filter=None, temporary=False, path=None, refresh_schedule=None, sql=None, sql_path=None, sink_type=None, sink_name=None, bootstrap_servers=None, topic=None, module_path=None, custom_sink_class=None, batch_handler=None, options=None)[source]¶
Bases: BaseModel
Write target configuration for streaming tables, materialized views, and sinks.
- Parameters:
type (WriteTargetType)
catalog (str | None)
schema (str | None)
database (str | None)
table (str | None)
create_table (bool)
comment (str | None)
table_schema (str | None)
row_filter (str | None)
temporary (bool)
path (str | None)
refresh_schedule (str | None)
sql (str | None)
sql_path (str | None)
sink_type (str | None)
sink_name (str | None)
bootstrap_servers (str | None)
topic (str | None)
module_path (str | None)
custom_sink_class (str | None)
batch_handler (str | None)
- class lhp.models.config.Action(*, name, type, source=None, target=None, description=None, readMode=None, write_target=None, transform_type=None, sql=None, sql_path=None, operational_metadata=None, expectations_file=None, mode=None, quarantine=None, schema_inline=None, schema_file=None, enforcement=None, module_path=None, function_name=None, parameters=None, custom_datasource_class=None, once=None, test_type=None, on_violation=None, tolerance=None, columns=None, filter=None, reference=None, source_columns=None, reference_columns=None, required_columns=None, column=None, min_value=None, max_value=None, lookup_table=None, lookup_columns=None, lookup_result_columns=None, expectations=None, test_id=None)[source]¶
Bases: BaseModel
- Parameters:
name (str)
type (ActionType)
source (str | List[str | Dict[str, Any]] | Dict[str, Any] | None)
target (str | None)
description (str | None)
readMode (str | None)
write_target (WriteTarget | Dict[str, Any] | None)
transform_type (TransformType | None)
sql (str | None)
sql_path (str | None)
expectations_file (str | None)
mode (str | None)
quarantine (QuarantineConfig | None)
schema_inline (str | None)
schema_file (str | None)
enforcement (str | None)
module_path (str | None)
function_name (str | None)
custom_datasource_class (str | None)
once (bool | None)
test_type (str | None)
on_violation (str | None)
tolerance (int | None)
filter (str | None)
reference (str | None)
column (str | None)
min_value (Any | None)
max_value (Any | None)
lookup_table (str | None)
test_id (str | None)
- class lhp.models.config.FlowGroup(*, pipeline, flowgroup, job_name=None, variables=None, presets=[], use_template=None, template_parameters=None, actions=[], operational_metadata=None)[source]¶
Bases: BaseModel
- class lhp.models.config.FlowGroupContext(flowgroup, source_yaml, synthetic=False, auxiliary_files=<factory>)[source]¶
Bases: object
Envelope carrying per-flowgroup provenance across the worker boundary.
- class lhp.models.config.Template(*, name, version='1.0', description=None, presets=[], parameters=[], actions=[])[source]¶
Bases: BaseModel
- Parameters:
- has_raw_actions()[source]¶
Check if template contains raw action dictionaries (not validated Action objects).
- Return type:
- get_actions_as_dicts()[source]¶
Get actions as dictionaries, converting from Action objects if needed.
- model_post_init(context, /)¶
This function is meant to behave like a BaseModel method to initialize private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
self (BaseModel) – The BaseModel instance.
context (Any) – The context.
- Return type:
None
- class lhp.models.config.Preset(*, name, version='1.0', extends=None, description=None, defaults=None)[source]¶
Bases: BaseModel
- class lhp.models.config.BlueprintParameter(*, name, required=False, default=None, description=None)[source]¶
Bases: BaseModel
A declared parameter on a blueprint, with optional default and required flag.
- class lhp.models.config.BlueprintFlowgroupSpec(*, pipeline, flowgroup, job_name=None, variables=None, presets=[], use_template=None, template_parameters=None, actions=[], operational_metadata=None)[source]¶
Bases: BaseModel
A flowgroup template inside a blueprint.
Same shape as FlowGroup, but pipeline and flowgroup are templates that contain %{var} placeholders resolved at expansion time against instance parameter values.
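To make the %{var} placeholder idea concrete, here is a small stand-alone sketch of expanding such placeholders against a parameter mapping. The function name, the regular expression, and the sample values are assumptions for illustration; LHP's own expansion rules (escaping, nesting, defaults) are not shown in this reference and may differ.

```python
import re

PLACEHOLDER = re.compile(r"%\{(\w+)\}")


def expand_placeholders(template: str, params: dict) -> str:
    """Replace each %{name} with params[name]; fail loudly on unknown names."""

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in params:
            raise KeyError(f"no value supplied for %{{{name}}}")
        return str(params[name])

    return PLACEHOLDER.sub(substitute, template)


# Hypothetical spec fields and instance parameter values.
spec = {"pipeline": "%{domain}_bronze", "flowgroup": "%{domain}_%{table}_ingest"}
instance_params = {"domain": "sales", "table": "orders"}
expanded = {field: expand_placeholders(value, instance_params) for field, value in spec.items()}
```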
- class lhp.models.config.Blueprint(*, name, version='1.0', description=None, parameters=[], flowgroups)[source]¶
Bases: BaseModel
A reusable collection of flowgroups instantiated once per BlueprintInstance.
Distinguishing fields from a regular flowgroup file are parameters and flowgroups (the array of BlueprintFlowgroupSpec). looks_like_blueprint() keys on the presence of both fields together with the absence of actions.
- Parameters:
name (str)
version (str)
description (str | None)
parameters (List[BlueprintParameter])
flowgroups (List[BlueprintFlowgroupSpec])
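The detection rule stated in the Blueprint docstring (both parameters and flowgroups present, actions absent) can be written as a one-line predicate over a parsed YAML mapping. This is a sketch of the stated rule only; the function name below is hypothetical and the real looks_like_blueprint() may apply further checks.

```python
def looks_like_blueprint_sketch(doc: dict) -> bool:
    """Heuristic from the docstring: has parameters and flowgroups, lacks actions."""
    return "parameters" in doc and "flowgroups" in doc and "actions" not in doc


# Hypothetical parsed documents.
blueprint_doc = {"name": "ingest", "parameters": [{"name": "table"}], "flowgroups": [{}]}
flowgroup_doc = {"pipeline": "bronze", "flowgroup": "orders", "actions": []}
array_doc = {"flowgroups": [{"pipeline": "bronze", "flowgroup": "orders"}]}

checks = [looks_like_blueprint_sketch(d) for d in (blueprint_doc, flowgroup_doc, array_doc)]
```

The third document shows why both fields are required together: a flowgroups array on its own is an ordinary multi-flowgroup file, not a blueprint.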
- class lhp.models.config.BlueprintInstance(*, use_blueprint=None, parameters=None, overrides=None, blueprint=None)[source]¶
Bases: BaseModel
An instance of a blueprint with concrete parameter values.
Two input shapes are accepted:
New (preferred): use_blueprint: references the blueprint and a nested parameters: block holds parameter values; an optional overrides: block is reserved for future use. This shape mirrors the use_template: / template_parameters: pattern operators already know.
Legacy (deprecated, removed in V0.9): blueprint: plus flat top-level parameter keys. A deprecation warning is emitted once per file when this form is encountered.
A model_validator(mode='before') is the single normalization point: it converts both shapes into the canonical (use_blueprint, parameters) form so all downstream code reads from one place. Mixing the two forms in the same file raises LHP-CFG-061.
- Parameters:
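The normalization described for BlueprintInstance can be sketched on plain dictionaries. Everything below is illustrative: `normalize_instance` is a hypothetical function, "mixing" is assumed to mean that both top-level keys are present, and a plain ValueError stands in for the LHP error that carries code LHP-CFG-061.

```python
import warnings


def normalize_instance(raw: dict) -> dict:
    """Return {'use_blueprint': ..., 'parameters': {...}} from either input shape."""
    has_new = "use_blueprint" in raw
    has_legacy = "blueprint" in raw
    if has_new and has_legacy:
        # Assumption: this is what "mixing the two forms" means.
        raise ValueError("LHP-CFG-061: cannot mix 'use_blueprint' and 'blueprint'")
    if has_new:
        return {
            "use_blueprint": raw["use_blueprint"],
            "parameters": dict(raw.get("parameters") or {}),
        }
    if has_legacy:
        warnings.warn("'blueprint:' with flat keys is deprecated", DeprecationWarning, stacklevel=2)
        flat = {k: v for k, v in raw.items() if k != "blueprint"}
        return {"use_blueprint": raw["blueprint"], "parameters": flat}
    raise ValueError("instance must name a blueprint")


new_shape = normalize_instance({"use_blueprint": "ingest", "parameters": {"table": "orders"}})
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    legacy_shape = normalize_instance({"blueprint": "ingest", "table": "orders"})
```

Both calls produce the same canonical dictionary, which is the point of having a single normalization step before any other code reads the instance.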
Errors and exceptions: lhp.utils.error_formatter¶
Catch LHPError for any LHP-originated failure. The concrete subclasses
disambiguate configuration, validation, and I/O failures and carry an error
code that maps to entries in the error reference.
Error formatter for user-friendly error messages.
- class lhp.utils.error_formatter.ErrorCategory(*values)[source]¶
Bases: Enum
Error categories with prefixes.
- exception lhp.utils.error_formatter.LHPError(category, code_number, title, details, suggestions=None, example=None, context=None, doc_link=None)[source]¶
Bases: Exception
User-friendly error with formatting support.
Base class for all LHP-specific exceptions. Subclasses use dual inheritance (e.g. LHPValidationError(LHPError, ValueError)) so that existing except ValueError handlers still catch them.
- Parameters:
- classmethod from_worker_exception(*, pipeline_name, error_type, error_message, error_traceback)[source]¶
Reconstruct a worker-side exception as an LHPError on the main thread.
Used by the parallel pipeline executor to surface worker failures through the project’s unified error type without losing the worker’s original exception type or chained traceback. The full traceback is preserved on worker_traceback so log handlers and CI surfaces can capture it; worker_error_type carries the original exception class name for structured error classification.
- exception lhp.utils.error_formatter.LHPValidationError(category, code_number, title, details, suggestions=None, example=None, context=None, doc_link=None)[source]¶
Bases: LHPError, ValueError
LHPError subclass that is also a ValueError.
Use when replacing a bare ValueError so that existing except ValueError handlers continue to catch it.
- exception lhp.utils.error_formatter.LHPConfigError(category, code_number, title, details, suggestions=None, example=None, context=None, doc_link=None)[source]¶
Bases: LHPError, ValueError
LHPError subclass for configuration errors.
Also a ValueError for backward compatibility with existing except ValueError handlers.
- exception lhp.utils.error_formatter.LHPFileError(category, code_number, title, details, suggestions=None, example=None, context=None, doc_link=None)[source]¶
Bases: LHPError, FileNotFoundError
LHPError subclass that is also a FileNotFoundError.
Use when replacing a bare FileNotFoundError so that existing except FileNotFoundError handlers continue to catch it.
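The dual-inheritance pattern these classes rely on is plain Python and can be demonstrated without LHP installed. The class names and constructor below are hypothetical stand-ins (the real LHPError takes more arguments); the sketch only shows why a handler written for ValueError still catches the new error type.

```python
class ToolError(Exception):
    """Hypothetical base class playing the role of LHPError."""

    def __init__(self, code: str, title: str) -> None:
        super().__init__(f"[{code}] {title}")
        self.code = code


class ToolValidationError(ToolError, ValueError):
    """Also a ValueError, so handlers written for ValueError keep working."""


def legacy_caller() -> str:
    try:
        raise ToolValidationError("VAL-001", "duplicate flowgroup")
    except ValueError as exc:  # handler written before ToolError existed
        return f"caught {type(exc).__name__}: {exc}"


message = legacy_caller()
error = ToolValidationError("VAL-002", "missing target")
```

New code can catch the base class to handle every tool-originated failure in one place, while older `except ValueError` blocks continue to work unchanged.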
Substitution (Experimental): lhp.utils.substitution¶
Token, local variable, and secret substitution. Substitutions are applied in
this order: %{local_var} → {{ template_param }} → ${env_token} →
${secret:scope/key}. The bare-braces {token} form is deprecated.
Enhanced token and secret substitution for LakehousePlumber.
- class lhp.utils.substitution.EnhancedSubstitutionManager(substitution_file=None, env='dev', skip_validation=False)[source]¶
Bases: object
Enhanced substitution manager with YAML and secret support.
- validate_no_unresolved_tokens(data, path='config')[source]¶
Detect unresolved tokens after substitution.
Scans configuration for any remaining {token} patterns that weren’t resolved during substitution, indicating missing values in substitutions file.
- Parameters:
- Returns:
List of error messages describing unresolved tokens with their locations
- Return type:
Examples
>>> mgr = EnhancedSubstitutionManager()
>>> mgr.mappings = {"catalog": "main"}
>>> data = {"path": "s3://{bucket}/{missing}/data"}
>>> errors = mgr.validate_no_unresolved_tokens(data)
>>> print(errors[0])
"Unresolved token '{missing}' found at config.path. Check substitutions/dev.yaml"
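For readers who want to see how a scan of this kind can work, the following stand-alone sketch walks nested dicts, lists, and strings and reports leftover {token} patterns with their location. It is an illustration under assumptions: `find_unresolved`, the token regular expression, and the message wording are hypothetical, and the real validator's token grammar is not shown in this reference.

```python
import re
from typing import Any

BARE_TOKEN = re.compile(r"\{(\w+)\}")


def find_unresolved(data: Any, path: str = "config") -> list:
    """Walk dicts, lists, and strings; report every leftover {token} with its location."""
    errors: list = []
    if isinstance(data, dict):
        for key, value in data.items():
            errors.extend(find_unresolved(value, f"{path}.{key}"))
    elif isinstance(data, list):
        for index, item in enumerate(data):
            errors.extend(find_unresolved(item, f"{path}[{index}]"))
    elif isinstance(data, str):
        for token in BARE_TOKEN.findall(data):
            errors.append(f"Unresolved token '{{{token}}}' found at {path}")
    return errors


config = {"source": {"path": "s3://{bucket}/raw"}, "tables": ["orders", "{missing}_audit"]}
problems = find_unresolved(config)
```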
- class lhp.utils.substitution.SecretReference(scope, key)[source]¶
Bases: object
Represents a secret reference with scope and key.
- to_dbutils_call()[source]¶
Generate a dbutils.secrets.get() call as a Python expression.
Single-quoted scope/key arguments so the call is safe to embed inside double-quoted string literals (e.g. inside JDBC URL templates) without breaking the outer quote nesting. This matches the format the legacy post-pass SecretCodeGenerator emitted before inline emission replaced it.
- Return type:
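The quoting rationale in to_dbutils_call() can be illustrated with a small stand-in class. `SecretRef`, its `to_call` method, and the sample scope and key are hypothetical; the use of keyword arguments in the emitted call is an assumption, since the exact text LHP generates is not shown here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SecretRef:
    """Hypothetical stand-in for a parsed ${secret:scope/key} reference."""
    scope: str
    key: str

    def to_call(self) -> str:
        # Single quotes inside, so the text can sit in a double-quoted literal.
        return f"dbutils.secrets.get(scope='{self.scope}', key='{self.key}')"


ref = SecretRef("prod_db", "password")
# Embed the call in a line of generated source whose outer quotes are double quotes.
generated_line = 'url = f"jdbc:postgresql://host/db?password={' + ref.to_call() + '}"'
# Compiling (not running) the line confirms the quote nesting is valid Python.
compiled = compile(generated_line, "<generated>", "exec")
```

Had the call used double quotes for its arguments, the generated f-string would fail to parse on Python versions before 3.12, which is the nesting problem the docstring describes.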
Templates (Experimental): lhp.core.template_engine¶
Template engine for LakehousePlumber YAML templates.
- class lhp.core.template_engine.TemplateEngine(templates_dir=None)[source]¶
Bases: object
Engine for handling YAML templates with parameter expansion.
- Parameters:
templates_dir (Path)
Standalone validation (Experimental): lhp.core.validator¶
Configuration validator for LakehousePlumber.
- class lhp.core.validator.ConfigValidator(project_root=None, project_config=None)[source]¶
Bases: object
Validate LakehousePlumber configurations.
- validate_table_creation_rules(flowgroups)[source]¶
Validate table creation rules across the entire pipeline.
Delegates to TableCreationValidator for the actual validation logic.
- validate_cdc_fanin_compatibility(flowgroups)[source]¶
Validate compatibility across CDC actions sharing a target.
Delegates to CdcFanInCompatibilityValidator. Mismatches on shared fields surface as LHPConfigError (raised from the delegate); mode-mixing between CDC and non-CDC contributors returns plain error strings in the result list.
Version (Experimental): lhp.utils.version¶
Version utilities for LakehousePlumber.
Notes on usage¶
All public classes log through logging.getLogger("lhp.<module>"). Configure the lhp logger to control LHP output independently of your application’s logging.
LHP raises LHPError (or a subclass) for every recoverable failure. Unexpected exceptions indicate a bug; file an issue with the traceback.
Pydantic v2 powers lhp.models.config. Treat the models as immutable once constructed; use model_copy(update=...) for derived instances.
ActionOrchestrator is not thread-safe. Construct one instance per pipeline run.
CachingYAMLParser is thread-safe and shared internally.
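Because loggers are named under the lhp namespace, the standard logging hierarchy is enough to adjust LHP output separately from the rest of an application. A minimal sketch using only the standard library follows; "myapp.jobs" is a hypothetical application logger name.

```python
import logging

# Application-wide default: only warnings and above.
logging.basicConfig(level=logging.WARNING)

# Turn up just the LHP namespace; child loggers such as
# "lhp.core.orchestrator" inherit this level.
lhp_logger = logging.getLogger("lhp")
lhp_logger.setLevel(logging.DEBUG)

child = logging.getLogger("lhp.core.orchestrator")
other = logging.getLogger("myapp.jobs")  # hypothetical application logger
```

Setting the level on the parent logger rather than on individual modules means modules added in later releases pick up the same setting automatically.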
See also¶
CLI Reference — command-line reference. Most users should drive LHP through the CLI rather than the Python API.
Architecture — design rationale, service decomposition, and the reasoning behind the orchestrator’s split into discovery, processing, generation, and validation services.
Migrate from DLT to LHP — how to move an existing Delta Live Tables project to LHP. Uses the CLI by default and the Python API for advanced cases.