API Reference

The Lakehouse Plumber (LHP) Python API exposes the same generation engine that powers the lhp command-line interface. Import it to build custom tooling: notebook integrations, CI/CD wrappers, programmatic validation in tests, or embedding LHP inside a larger orchestrator. For day-to-day pipeline authoring, prefer the CLI — the Python API trades convenience for control.

This page catalogs the public surface by stability tier and renders live docstrings via Sphinx autodoc. The tier table is the contract; the autodoc sections below it are the source-of-truth signatures pulled from the code at build time.

Stability tiers

LHP follows a four-tier stability model. The tier assigned to a symbol determines how much change to expect from one release to the next.

  • Stable
    Change policy: Semantic versioning; breaking changes only at major releases.
    Definition: Public API in active use by external integrations. Method signatures, return shapes, and observable behavior are versioned.

  • Beta
    Change policy: May change between minor versions with a deprecation cycle of at least one release.
    Definition: Public API still settling. Suitable for production use if you pin LHP to a minor version.

  • Experimental
    Change policy: May change without notice at any release, including patch releases.
    Definition: Public API exposed for evaluation. Do not depend on it from long-lived code.

  • Internal
    Change policy: May change at any time. Not part of the API contract.
    Definition: Implementation detail. Importing it from outside lhp.* is unsupported even when the import path resolves.

If a symbol is not listed in this page, treat it as Internal regardless of its import path.

Changed in version 0.8.7: This page replaces the prior alpha disclaimer with explicit per-module stability tiers. The CLI surface remains Stable; the Python API is Beta except where noted.

Public API by tier

Stable

No symbol is currently Stable. LHP is pre-1.0; the CLI is the stable contract. Python API consumers should treat the Beta tier as the recommended entry point and pin to a minor version of LHP.
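Since nothing is Stable yet, a tool that embeds LHP can fail fast when the installed release leaves the minor series it was tested against. Here is a minimal sketch, assuming a dotted MAJOR.MINOR.PATCH version string. `require_lhp_minor` and `same_minor_series` are hypothetical helpers. The default lookup calls lhp.utils.version.get_version, which is itself Experimental.

```python
from typing import Optional


def same_minor_series(installed: str, pinned_minor: str) -> bool:
    """True if `installed` (e.g. "0.8.7") belongs to `pinned_minor` (e.g. "0.8")."""
    return installed.split(".")[:2] == pinned_minor.split(".")[:2]


def require_lhp_minor(pinned_minor: str, installed: Optional[str] = None) -> str:
    """Raise if the installed LHP release is outside the tested minor series.

    Hypothetical helper; `installed` can be passed explicitly for testing.
    """
    if installed is None:
        # Experimental API: returns the installed LHP package version string.
        from lhp.utils.version import get_version
        installed = get_version()
    if not same_minor_series(installed, pinned_minor):
        raise RuntimeError(
            f"tested against LHP {pinned_minor}.x but found {installed}; "
            "Beta APIs may change between minor versions"
        )
    return installed
```

For example, call `require_lhp_minor("0.8")` at tool start-up. Pinning the dependency in your installer remains the primary guard. This check only turns silent drift into an explicit error.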

Beta

These symbols are the recommended entry points for programmatic use.

  • lhp.core.orchestrator.ActionOrchestrator – Drives end-to-end pipeline generation: discovery, expansion, validation, code generation, and bundle synchronization.

  • lhp.parsers.yaml_parser.YAMLParser – Parses and validates LHP YAML files into Pydantic models. Use it to load FlowGroups, Presets, and Templates without invoking the full pipeline.

  • lhp.parsers.yaml_parser.CachingYAMLParser – Thread-safe caching wrapper around YAMLParser. Suitable when parsing the same files repeatedly within one process.

  • lhp.presets.preset_manager.PresetManager – Loads presets from a directory and resolves inheritance chains.

  • lhp.bundle.manager.BundleManager – Synchronizes Databricks Asset Bundle (DAB) resource files with generated pipeline code.

  • lhp.models.config – Pydantic models for the on-disk YAML schema: FlowGroup, Action, Preset, Template, ProjectConfig, Blueprint, and the associated enumerations (ActionType, LoadSourceType, TransformType, WriteTargetType, TestActionType).

  • lhp.utils.error_formatter – Public exception hierarchy: LHPError, LHPConfigError, LHPValidationError, LHPFileError. Catch these to handle LHP failures programmatically.

Experimental

These symbols are exposed but may change without notice.

  • lhp.utils.substitution.EnhancedSubstitutionManager – Resolves ${token}, %{local_var}, and ${secret:scope/key} substitutions. Signature subject to revision while secret handling evolves.

  • lhp.utils.substitution.SecretReference – Value class representing a parsed ${secret:scope/key} reference.

  • lhp.core.template_engine.TemplateEngine – Loads and renders Jinja2 FlowGroup templates. Public for advanced template authors; the parameter surface may grow.

  • lhp.core.validator.ConfigValidator – Standalone validator used by lhp validate. Useful for embedding configuration checks in CI without generating code.

  • lhp.utils.version.get_version – Returns the installed LHP package version string.

Internal

Everything else under lhp.* is Internal. The following namespaces are explicitly off-limits to external code:

  • lhp.core.services.* — service decomposition used by the orchestrator.

  • lhp.core.commands.* — internal command pattern dispatch.

  • lhp.core.factories, lhp.core.layers, lhp.core.dependency_resolver, lhp.core.parallel_processor — wiring and execution helpers.

  • lhp.generators.* — per-action code generators. Use the orchestrator, not generators directly.

  • lhp.cli.* — Click command implementations. Use the lhp shell entry point or invoke Click’s CliRunner against lhp.cli.main:cli.

  • lhp.utils.smart_file_writer, lhp.utils.performance_timer, lhp.utils.formatter, lhp.utils.source_extractor, and every other lhp.utils.* module not listed under Beta or Experimental above.

  • lhp.schemas.* — JSON Schema generation for IDE integration.

  • lhp.templates.* and lhp.resources.* — packaged Jinja2 templates and static assets.

Warning

Importing from an Internal namespace works today but provides no guarantees. Patch releases may move, rename, or remove these symbols without a deprecation notice.
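The Internal rule can be enforced in CI. Here is a minimal sketch built on the standard-library ast module. The allowlist is transcribed by hand from the Beta and Experimental tables on this page; LHP does not export it. `find_internal_imports` is a hypothetical helper.

```python
import ast
from typing import List, Tuple

# Transcribed by hand from the Beta and Experimental tables; keep in sync with the docs.
PUBLIC_SYMBOLS = frozenset({
    "lhp.core.orchestrator.ActionOrchestrator",
    "lhp.parsers.yaml_parser.YAMLParser",
    "lhp.parsers.yaml_parser.CachingYAMLParser",
    "lhp.presets.preset_manager.PresetManager",
    "lhp.bundle.manager.BundleManager",
    "lhp.utils.substitution.EnhancedSubstitutionManager",
    "lhp.utils.substitution.SecretReference",
    "lhp.core.template_engine.TemplateEngine",
    "lhp.core.validator.ConfigValidator",
    "lhp.utils.version.get_version",
})
# Modules listed as a whole: every name they export is treated as public.
PUBLIC_MODULES = frozenset({"lhp.models.config", "lhp.utils.error_formatter"})
# Modules that may appear in a plain `import a.b.c` statement.
IMPORTABLE = PUBLIC_MODULES | {s.rsplit(".", 1)[0] for s in PUBLIC_SYMBOLS} | {"lhp"}


def _is_lhp(name: str) -> bool:
    return name == "lhp" or name.startswith("lhp.")


def find_internal_imports(source: str) -> List[Tuple[int, str]]:
    """Return (line, dotted name) for each import of an unlisted lhp symbol."""
    hits: List[Tuple[int, str]] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            hits += [(node.lineno, a.name) for a in node.names
                     if _is_lhp(a.name) and a.name not in IMPORTABLE]
        elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
            if not _is_lhp(node.module) or node.module in PUBLIC_MODULES:
                continue
            for a in node.names:
                full = f"{node.module}.{a.name}"
                if full not in PUBLIC_SYMBOLS and full not in PUBLIC_MODULES:
                    hits.append((node.lineno, full))
    return sorted(hits)
```

Run it over each source file of your tooling and fail the build on any hit. It sees only static import statements. Calls to importlib.import_module are not checked.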

Detailed reference

The sections below render docstrings, type hints, and signatures directly from the source code. Refer to the tier table above for stability guarantees on each symbol.

Package root: lhp

Lakehouse Plumber - YAML-driven framework for Databricks Lakeflow Spark Declarative Pipelines.

Orchestrator: lhp.core.orchestrator

Main orchestration for LakehousePlumber pipeline generation.

class lhp.core.orchestrator.ActionOrchestrator(project_root, enforce_version=True, dependencies=None, pipeline_config_path=None, max_workers=None)

Bases: object

Main orchestration for pipeline generation (Service-based architecture).

Implements the business layer interface and coordinates specialized services for discovery, processing, generation, and validation while maintaining the same public API for backward compatibility.

Parameters:
  • project_root (Path)

  • enforce_version (bool)

  • dependencies (OrchestrationDependencies)

  • pipeline_config_path (str | None)

  • max_workers (int | None)

property cached_yaml_parser: CachingYAMLParser

Public accessor for the shared CachingYAMLParser instance.

get_include_patterns()

Get include patterns from project configuration.

Returns:

List of include patterns, or empty list if none specified

Return type:

List[str]

validate_configuration(pipeline_identifier, env)

Validate configuration based on business rules.

Parameters:
  • pipeline_identifier (str)

  • env (str)

Return type:

tuple

discover_flowgroups(pipeline_dir)

Discover all flowgroups in a specific pipeline directory.

Parameters:

pipeline_dir (Path) – Directory containing flowgroup YAML files

Returns:

List of discovered flowgroups

Return type:

List[FlowGroup]

discover_all_flowgroups()

Discover all flowgroups across all directories in the project.

Combines three sources, in order:
  1. Disk-sourced flowgroups via FlowgroupDiscoverer (existing).

  2. Synthetic flowgroups expanded from blueprints x instances. Once expanded, these are indistinguishable from disk-sourced flowgroups for downstream code paths (Step 0.5 -> Step 5).

  3. Synthetic monitoring flowgroup (existing) when monitoring is configured.

Side effects:
  • Populates self._blueprint_provenance with the expansion’s (pipeline, flowgroup) -> BlueprintProvenance mapping. Used by the state dependency resolver, dependency tracker, and source-path index to handle instance-file changes correctly.

  • Populates self._synthetic_contexts with FlowGroupContext envelopes for synthetic flowgroups (blueprint-expanded + monitoring).

  • Stores the full MonitoringBuildResult in self._monitoring_result.

Returns:

List of all discovered flowgroups (regular + synthetic + monitoring).

Return type:

List[FlowGroup]
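The returned list already merges disk-sourced, blueprint-expanded, and monitoring flowgroups. A project inventory therefore reduces to grouping by the pipeline and flowgroup fields of FlowGroup. Here is a minimal sketch; both function names are hypothetical. The grouping accepts any objects that expose those two attributes, which keeps it testable without a project on disk.

```python
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, Iterable, List


def flowgroups_by_pipeline(flowgroups: Iterable[Any]) -> Dict[str, List[str]]:
    """Map each pipeline name to the sorted names of its flowgroups."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for fg in flowgroups:
        grouped[fg.pipeline].append(fg.flowgroup)
    # Sort pipelines and flowgroups so the output is stable across runs.
    return {pipeline: sorted(names) for pipeline, names in sorted(grouped.items())}


def inventory(project_root: Path) -> Dict[str, List[str]]:
    """Discover every flowgroup in the project (hypothetical helper)."""
    from lhp.core.orchestrator import ActionOrchestrator  # Beta API
    orchestrator = ActionOrchestrator(project_root)
    return flowgroups_by_pipeline(orchestrator.discover_all_flowgroups())
```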

finalize_monitoring_artifacts(env, output_dir)

Reconcile monitoring artifacts: clean stale, write current.

Called AFTER the pipeline generation loop. Handles all transitions:

  • Monitoring added: write notebook + job

  • Monitoring removed: clean up notebook + job

  • Pipeline renamed: old artifacts removed, new ones written

  • MVs added/removed: job updated (notebook-only vs full)

Parameters:
  • env (str) – Environment name

  • output_dir (Path) – Base output directory (e.g. generated/dev)

Return type:

None

discover_flowgroups_by_pipeline_field(pipeline_field, pre_discovered_all_flowgroups=None)

Discover all flowgroups with a specific pipeline field across all directories.

Parameters:
  • pipeline_field (str) – The pipeline field value to search for

  • pre_discovered_all_flowgroups (List[FlowGroup] | None) – If provided, filter from this list instead of running a new discovery scan.

Returns:

List of flowgroups with the specified pipeline field

Return type:

List[FlowGroup]

validate_duplicate_pipeline_flowgroup_combinations(flowgroups)

Validate that there are no duplicate pipeline+flowgroup combinations.

Parameters:

flowgroups (List[FlowGroup]) – List of flowgroups to validate

Raises:

ValueError – If duplicate combinations are found

Return type:

None
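The rule reduces to a uniqueness check over (pipeline, flowgroup) pairs. The standalone sketch below is illustrative only and is not LHP's implementation. It can pre-check flowgroups you assemble programmatically before you hand them to the orchestrator. The helper names and the error wording are hypothetical.

```python
from collections import Counter
from typing import Any, Iterable, List, Tuple


def find_duplicate_combinations(flowgroups: Iterable[Any]) -> List[Tuple[str, str]]:
    """Return every (pipeline, flowgroup) pair that occurs more than once."""
    counts = Counter((fg.pipeline, fg.flowgroup) for fg in flowgroups)
    return sorted(pair for pair, n in counts.items() if n > 1)


def check_unique(flowgroups: Iterable[Any]) -> None:
    """Raise ValueError listing duplicates, mirroring the documented contract."""
    duplicates = find_duplicate_combinations(flowgroups)
    if duplicates:
        listed = ", ".join(f"{p}.{f}" for p, f in duplicates)
        raise ValueError(f"duplicate pipeline+flowgroup combinations: {listed}")
```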

generate_pipeline_by_field(pipeline_field, env, output_dir=None, specific_flowgroups=None, include_tests=False, pre_discovered_all_flowgroups=None)

Thin shim that delegates to the plural generate_pipelines_by_fields().

On failure the plural method re-raises the original exception unchanged for the single-pipeline case, so callers that catch specific LHPError subclasses continue to work.

Return type:

tuple[str, …]

generate_pipelines_by_fields(*, pipeline_fields, env, output_dir, specific_flowgroups=None, include_tests=False, pre_discovered_all_flowgroups=None, max_workers=None, on_pipeline_complete=None)

Run one worker per pipeline; aggregate results on the main thread.

Each worker owns the full pipeline pass (Phase A discovery + Phase B validation/file writes). The main thread is a pure aggregator.

Parameters:
  • pipeline_fields (Sequence[str]) – Pipeline names to generate. Order is preserved in the returned mapping; on_pipeline_complete fires in completion order (not input order).

  • env (str) – Environment name (e.g. "dev").

  • output_dir (Path | None) – Root output directory (output_dir / <pipeline>) or None for dry-run.

  • specific_flowgroups (List[str] | None) – Optional whitelist of flowgroup names.

  • include_tests (bool) – Include test actions in the generated code.

  • pre_discovered_all_flowgroups (List[FlowGroup] | None) – Re-use the caller’s one-shot discovery instead of re-discovering.

  • max_workers (int | None) – Pool size override. None falls back to self.max_workers.

  • on_pipeline_complete (Callable[[PipelineDelta], None] | None) – Optional main-thread callback fired once per pipeline with the PipelineDelta. Callback exceptions are logged but do not abort the batch.

Returns:

Mapping of {pipeline_name: {generated_path: code}} for successful pipelines. Failed pipelines are absent from the dict — the aggregate LHPError raised at the end captures their errors.

Raises:
  • LHPError – When a single pipeline failed; reconstructed via LHPError.from_worker_exception() so the worker’s error type and full traceback survive.

  • LHPValidationError – When multiple pipelines failed; one aggregate error listing every failure.

Return type:

Dict[str, tuple[str, …]]
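Here is a minimal sketch of driving a generation run from Python. It assumes the generated/<env> output layout cited as an example for finalize_monitoring_artifacts(). `generate_env` is a hypothetical helper, and the orchestrator is injectable so the function can be exercised with a stub. Passing output_dir=None instead would make the run a dry run.

```python
from pathlib import Path
from typing import Any, List, Optional, Sequence


def generate_env(
    project_root: Path,
    pipelines: Sequence[str],
    env: str,
    orchestrator: Optional[Any] = None,
) -> List[str]:
    """Generate `pipelines` for `env` into <project_root>/generated/<env>.

    Returns the generated pipeline names in input order. If any pipeline fails,
    the orchestrator raises after the batch (LHPError for one failure,
    LHPValidationError for several) and the exception propagates.
    """
    if orchestrator is None:
        from lhp.core.orchestrator import ActionOrchestrator  # Beta API
        orchestrator = ActionOrchestrator(project_root)
    done = 0

    def on_complete(delta: Any) -> None:
        # Runs on the main thread once per pipeline, in completion order.
        nonlocal done
        done += 1
        print(f"{done}/{len(pipelines)} pipelines finished")

    results = orchestrator.generate_pipelines_by_fields(
        pipeline_fields=list(pipelines),
        env=env,
        output_dir=project_root / "generated" / env,
        on_pipeline_complete=on_complete,
    )
    # The returned mapping preserves input order.
    return list(results)
```

If the project configures monitoring, the docstring of finalize_monitoring_artifacts(env, output_dir) says it runs after the generation loop. This sketch leaves that step out.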

process_flowgroup(flowgroup, substitution_mgr, include_tests=True)

Process flowgroup: expand templates, apply presets, apply substitutions.

Backward-compatible shim around FlowgroupProcessor.process_flowgroup(), which now takes/returns FlowGroupContext. This shim wraps the FlowGroup in a default-empty context and returns just the processed FlowGroup for callers that don’t care about provenance.

Parameters:
  • flowgroup (FlowGroup) – FlowGroup to process

  • substitution_mgr (EnhancedSubstitutionManager) – Substitution manager for the environment

  • include_tests (bool) – If False, filter out test actions before processing. Defaults to True for backward compatibility.

Returns:

Processed flowgroup

Return type:

FlowGroup

generate_flowgroup_code(flowgroup, substitution_mgr, output_dir=None, source_yaml=None, env=None, include_tests=False, python_file_copier=None, phase_a_records=None)

Generate complete Python code for a flowgroup.

Parameters:
  • flowgroup (FlowGroup) – FlowGroup to generate code for

  • substitution_mgr (EnhancedSubstitutionManager) – Substitution manager for the environment

  • output_dir (Path | None) – Output directory for generated files

  • source_yaml (Path | None) – Source YAML path for file tracking

  • env (str | None) – Environment name for file tracking

  • include_tests (bool) – Whether to include test actions

  • python_file_copier – Thread-safe Python file copier (for parallel mode)

  • phase_a_records (List[CopiedModuleRecord] | None) – Optional list passed by Phase A workers in the cross-pipeline flat pool; when supplied, the file copier appends CopiedModuleRecord entries to it instead of writing to disk. Phase B replays those records.

Returns:

Complete Python code for the flowgroup

Return type:

str

determine_action_subtype(action)

Determine the sub-type of an action for generator selection.

Parameters:

action (Action) – Action to determine sub-type for

Returns:

Sub-type string for generator selection

Return type:

str

group_write_actions_by_target(write_actions)

Group write actions by their target table.

Parameters:

write_actions (List[Action]) – List of write actions

Returns:

Dictionary mapping target table names to lists of actions

Return type:

Dict[str, List[Action]]

create_combined_write_action(actions, target_table)

Create a combined write action with individual action metadata preserved.

Parameters:
  • actions (List[Action]) – List of write actions targeting the same table

  • target_table (str) – Full target table name

Returns:

Combined action with individual action metadata

Return type:

Action

validate_pipeline_by_field(pipeline_field, env, include_tests=True, pre_discovered_all_flowgroups=None)

Thin shim that delegates to the plural validate_pipelines_by_fields().

Preserves the legacy (errors, warnings) tuple return so the existing ValidateCommand per-pipeline loop and LakehousePlumberApplicationFacade.validate_pipeline keep working unchanged.

Return type:

Tuple[List[str], List[str]]

validate_pipelines_by_fields(*, pipeline_fields, env, include_tests=True, pre_discovered_all_flowgroups=None, max_workers=None, on_pipeline_complete=None)

Flat-pool validate across multiple pipelines.

Mirrors generate_pipelines_by_fields() but simpler: no state save, no Phase B replay, no file writes. Phase A workers call process_flowgroup() (which runs schema + reference + action validation); Phase B per pipeline runs ConfigValidator.validate_cdc_fanin_compatibility() as the post-barrier cross-flowgroup check.

Parameters:
  • pipeline_fields (Sequence[str]) – Pipeline names to validate. Outcomes are returned in input order (stable for display).

  • env (str) – Environment name.

  • include_tests (bool) – When False, test actions are filtered out before processing. Defaults to True, matching the single-pipeline shim.

  • pre_discovered_all_flowgroups (List[FlowGroup] | None) – Re-use caller’s one-shot discovery; if None, runs discover_all_flowgroups().

  • max_workers (int | None) – Process-pool size. Falls back to self.max_workers, then to _auto_max_workers() (~80% of detected CPU count, honoring cgroup limits on Linux).

  • on_pipeline_complete (Callable[[PipelineValidationOutcome], None] | None) – Optional callback fired once per pipeline (main thread, completion order).

Returns:

List of PipelineValidationOutcome, one per input pipeline, in input order.

Return type:

List[PipelineValidationOutcome]
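For a CI gate, the (errors, warnings) tuple returned by validate_pipeline_by_field() is the simplest return shape documented here. The fields of PipelineValidationOutcome are not rendered on this page. Here is a minimal sketch with a hypothetical helper name. It discovers flowgroups once, validates each pipeline against that shared list, and maps the result to a process exit code.

```python
import sys
from pathlib import Path
from typing import Any, Optional, Sequence


def validate_for_ci(
    project_root: Path,
    pipelines: Sequence[str],
    env: str,
    orchestrator: Optional[Any] = None,
) -> int:
    """Return 1 if any pipeline reports validation errors, else 0."""
    if orchestrator is None:
        from lhp.core.orchestrator import ActionOrchestrator  # Beta API
        orchestrator = ActionOrchestrator(project_root)
    # Discover once and reuse the result instead of rescanning per pipeline.
    all_flowgroups = orchestrator.discover_all_flowgroups()
    failed = []
    for pipeline in pipelines:
        errors, warning_msgs = orchestrator.validate_pipeline_by_field(
            pipeline, env, pre_discovered_all_flowgroups=all_flowgroups
        )
        for message in warning_msgs:
            print(f"warning [{pipeline}] {message}", file=sys.stderr)
        for message in errors:
            print(f"error   [{pipeline}] {message}", file=sys.stderr)
        if errors:
            failed.append(pipeline)
    return 1 if failed else 0
```

A loop over the shim gives up the flat-pool parallelism of validate_pipelines_by_fields() in exchange for a documented result shape. Consider the plural method once you know which outcome fields your LHP version provides.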

YAML parsing: lhp.parsers.yaml_parser

class lhp.parsers.yaml_parser.YAMLParser

Bases: object

Parse and validate YAML configuration files.

parse_file(file_path)

Parse a single YAML file.

Parameters:

file_path (Path)

Return type:

Dict[str, Any]

parse_flowgroups_from_file(file_path)

Parse one or more FlowGroups from a YAML file.

Supports both multi-document syntax (---) and flowgroups array syntax.

Parameters:

file_path (Path) – Path to YAML file containing one or more flowgroups

Returns:

List of FlowGroup objects

Raises:

ValueError – For duplicate flowgroup names, mixed syntax, or parsing errors

Return type:

List[FlowGroup]

parse_flowgroup(file_path)

Parse a FlowGroup YAML file.

Note: This method only supports single-flowgroup files. If the file contains multiple flowgroups (via --- separator or flowgroups array), use parse_flowgroups_from_file() instead.

Parameters:

file_path (Path)

Return type:

FlowGroup

parse_template_raw(file_path)

Parse a Template YAML file with raw actions (no Action object creation).

This is used during template loading to avoid validation of template syntax like {{ table_properties }}. Actions will be validated later during rendering when actual parameter values are available.

Parameters:

file_path (Path)

Return type:

Template

parse_preset(file_path)

Parse a Preset YAML file.

Parameters:

file_path (Path)

Return type:

Preset

discover_presets(presets_dir)

Discover all Preset files.

Parameters:

presets_dir (Path)

Return type:

List[Preset]
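Both YAMLParser and CachingYAMLParser expose parse_flowgroups_from_file(path), so a loader can accept either one. Here is a minimal sketch. `load_flowgroups`, the default suffixes, and the recursive directory walk are assumptions. Point root at a directory that holds only flowgroup files, because preset and template files go through other entry points (parse_preset(), parse_template_raw()).

```python
from pathlib import Path
from typing import Any, List, Optional, Tuple


def load_flowgroups(
    root: Path,
    parser: Optional[Any] = None,
    suffixes: Tuple[str, ...] = (".yaml", ".yml"),
) -> List[Any]:
    """Parse every YAML file under `root` into flowgroups (hypothetical helper)."""
    if parser is None:
        from lhp.parsers.yaml_parser import CachingYAMLParser  # Beta API
        parser = CachingYAMLParser()
    flowgroups: List[Any] = []
    # Sorted for deterministic ordering across platforms and runs.
    for path in sorted(p for p in root.rglob("*") if p.is_file() and p.suffix in suffixes):
        # One file may hold several flowgroups (--- documents or a flowgroups array).
        flowgroups.extend(parser.parse_flowgroups_from_file(path))
    return flowgroups
```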

class lhp.parsers.yaml_parser.CachingYAMLParser(base_parser=None, max_cache_size=500)

Bases: object

Thread-safe caching wrapper for YAMLParser.

Uses file path + modification time as cache key to automatically invalidate cache when files change.

parse_flowgroups_from_file(path)

Parse flowgroups with caching based on file mtime.

Parameters:

path (Path)

Return type:

List[FlowGroup]

load_documents_all(path, error_context=None)

Load all YAML documents from a file with mtime-keyed caching.

Hit/miss counters are shared with the flowgroup sub-cache; get_cache_stats() reports per-sub-cache sizes separately.

Parameters:
  • path (Path)

  • error_context (str | None)

Return type:

List[Dict[str, Any]]

clear_cache()

Clear all cached entries across both sub-caches.

Return type:

None

get_cache_stats()

Get cache performance statistics.

Hit/miss counters are aggregated across both sub-caches (flowgroup parsing and raw-document loading). Sub-cache sizes are reported separately for visibility.

Returns:

Dictionary with cache hits, misses, hit rate, and per-sub-cache sizes

Return type:

Dict[str, Any]
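CachingYAMLParser already provides this behaviour. If you are building a similar cache over your own files, the sketch below illustrates the invalidation strategy its docstring describes. It keys on file path plus modification time, caps the size, and guards shared state with a lock. `MtimeCache` is hypothetical, and the least-recently-used eviction is an assumption; the docstrings above do not say how entries are evicted.

```python
import os
import threading
from collections import OrderedDict
from pathlib import Path
from typing import Any, Callable, Tuple


class MtimeCache:
    """Thread-safe cache whose key changes whenever a file's mtime changes."""

    def __init__(self, loader: Callable[[Path], Any], max_size: int = 500) -> None:
        self._loader = loader
        self._max_size = max_size
        self._entries: "OrderedDict[Tuple[str, int], Any]" = OrderedDict()
        self._lock = threading.Lock()
        self.hits = 0
        self.misses = 0

    def get(self, path: Path) -> Any:
        key = (str(Path(path).resolve()), os.stat(path).st_mtime_ns)
        with self._lock:
            if key in self._entries:
                self._entries.move_to_end(key)  # mark as most recently used
                self.hits += 1
                return self._entries[key]
            self.misses += 1
        # Load outside the lock so slow I/O does not block other threads;
        # two threads may occasionally load the same file twice.
        value = self._loader(path)
        with self._lock:
            self._entries[key] = value
            self._entries.move_to_end(key)
            while len(self._entries) > self._max_size:
                self._entries.popitem(last=False)  # evict least recently used
        return value
```

A changed file produces a new key. The stale entry is not deleted; it ages out through eviction.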

Presets: lhp.presets.preset_manager

Preset management with hierarchical inheritance.

class lhp.presets.preset_manager.PresetManager(presets_dir)

Bases: object

Manages preset loading and inheritance resolution.

Parameters:

presets_dir (Path)

resolve_preset_chain(preset_names)

Resolve a chain of presets with inheritance.

Parameters:

preset_names (List[str])

Return type:

Dict[str, Any]

get_preset(preset_name)

Get a preset by name.

Parameters:

preset_name (str)

Return type:

Preset | None

list_presets()

List all available preset names.

Return type:

List[str]

get_operational_metadata_selection(preset_names)

Get operational metadata selection from resolved preset chain.

Parameters:

preset_names (List[str]) – List of preset names to resolve

Returns:

Operational metadata selection (bool, list, or None)

Return type:

bool | List[str] | None

validate_operational_metadata_references(preset_names, available_columns)

Validate that operational metadata references in presets are valid.

Parameters:
  • preset_names (List[str]) – List of preset names to validate

  • available_columns (set) – Set of available column names from project config

Returns:

List of validation errors (empty if valid)

Return type:

List[str]
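resolve_preset_chain() returns a single merged dict. The sketch below shows one plausible reading of "inheritance". It walks each preset's extends ancestors root-first and deep-merges their defaults, then merges the requested presets left to right. The dict-based preset shape mirrors the extends and defaults fields of lhp.models.config.Preset. The merge rules (nested dicts merged, other values replaced, later presets winning) are assumptions, not PresetManager's documented behaviour.

```python
from typing import Any, Dict, Mapping, Sequence, Tuple


def deep_merge(base: Mapping[str, Any], override: Mapping[str, Any]) -> Dict[str, Any]:
    """Recursively merge dicts; `override` wins and non-dict values are replaced."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, Mapping) and isinstance(merged.get(key), Mapping):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


def resolve_preset(
    name: str,
    presets: Mapping[str, Mapping[str, Any]],
    _seen: Tuple[str, ...] = (),
) -> Dict[str, Any]:
    """Resolve one preset's defaults, applying its `extends` ancestors first."""
    if name in _seen:
        raise ValueError("circular preset inheritance: " + " -> ".join(_seen + (name,)))
    preset = presets[name]
    parent = preset.get("extends")
    inherited = resolve_preset(parent, presets, _seen + (name,)) if parent else {}
    return deep_merge(inherited, preset.get("defaults") or {})


def resolve_chain(names: Sequence[str], presets: Mapping[str, Mapping[str, Any]]) -> Dict[str, Any]:
    """Merge several presets left to right (assumed: later presets override earlier ones)."""
    result: Dict[str, Any] = {}
    for name in names:
        result = deep_merge(result, resolve_preset(name, presets))
    return result
```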

Asset Bundle integration: lhp.bundle.manager

Bundle manager for LHP Databricks Asset Bundle integration.

This module provides the main BundleManager class that coordinates bundle resource operations including resource file synchronization and management.

class lhp.bundle.manager.BundleManager(project_root, pipeline_config_path=None, project_config=None)

Bases: object

Manages Databricks Asset Bundle resource files.

Wipe-and-regenerate contract: the CLI fully wipes resources/lhp/ before sync, and BundleManager then writes one resource file per pipeline directory found under generated/<env>/. databricks.yml is never read or mutated. Catalog and schema must come from pipeline_config.yaml (per-pipeline or via the top-level project_defaults block).

Parameters:
  • project_root (Path | str)

  • pipeline_config_path (str | None)

  • project_config (Any | None)

sync_resources_with_generated_files(output_dir, env)

Write one bundle resource file per current pipeline directory.

Wipe-and-regenerate contract: callers must clear resources/lhp/ before invoking this method. BundleManager only writes the resource files for the pipelines that exist under output_dir — it does not preserve, back up, or delete any pre-existing files.

Parameters:
  • output_dir (Path) – Directory containing generated Python files

  • env (str) – Environment name for template processing

Returns:

Number of resource files written

Raises:

BundleResourceError – If synchronization fails

Return type:

int

ensure_resources_directory()

Create resources/lhp directory if it doesn’t exist.

get_pipeline_directories(output_dir)

Get list of pipeline directories in the output directory.

Parameters:

output_dir (Path) – Directory to scan for pipeline directories

Returns:

List of pipeline directory paths in sorted order

Raises:

BundleResourceError – If directory access fails

Return type:

List[Path]

get_resource_file_path(pipeline_name)

Return the path where this pipeline’s resource file is written.

Under the wipe-and-regenerate contract, resources/lhp/ is empty when sync starts and BundleManager always writes the canonical <pipeline>.pipeline.yml filename, so no existence probing is needed.

Parameters:

pipeline_name (str)

Return type:

Path

generate_resource_file_content(pipeline_name, output_dir, env)

Generate content for a bundle resource file using Jinja2 template.

Applies LHP token substitution to ALL fields in pipeline_config.yaml, enabling environment-specific configuration for node types, policies, emails, and all other pipeline settings. Catalog and schema MUST be defined in pipeline_config.yaml (either per-pipeline or via the top-level project_defaults block); they are never read from databricks.yml.

Catalog/schema validation is performed upstream by bundle.preflight.validate_catalog_schema before any wipes occur. The guard here is a defense-in-depth assertion that fires only if a non-CLI caller invokes this method without running preflight first.

Parameters:
  • pipeline_name (str) – Name of the pipeline

  • output_dir (Path) – Output directory

  • env (str) – Environment name for token resolution (REQUIRED)

Returns:

YAML content for the resource file with fully substituted pipeline config

Raises:

LHPConfigError – LHP-GEN-001 if preflight was bypassed and catalog/schema is still missing or empty at the bundle-write phase.

Return type:

str
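Callers outside the CLI have to honour the wipe-and-regenerate contract themselves. Here is a minimal sketch, assuming resources/lhp/ and generated/<env>/ live directly under the project root. `sync_bundle_resources` is hypothetical, and the manager is injectable for testing.

```python
import shutil
from pathlib import Path
from typing import Any, Optional


def sync_bundle_resources(project_root: Path, env: str, manager: Optional[Any] = None) -> int:
    """Wipe resources/lhp/, then write one resource file per generated pipeline."""
    if manager is None:
        from lhp.bundle.manager import BundleManager  # Beta API
        manager = BundleManager(project_root)
    # BundleManager never deletes files itself; clearing stale resources is the caller's job.
    shutil.rmtree(project_root / "resources" / "lhp", ignore_errors=True)
    manager.ensure_resources_directory()
    return manager.sync_resources_with_generated_files(project_root / "generated" / env, env)
```

Unlike the CLI, this sketch does not run bundle.preflight.validate_catalog_schema, whose signature is not documented here. As a result, a missing catalog or schema only surfaces as LHP-GEN-001 after resources/lhp/ has already been wiped.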

Configuration models: lhp.models.config

Pydantic models that mirror the on-disk YAML schema. Import these to type your own loaders or to construct configurations programmatically.

class lhp.models.config.ActionType(*values)

Bases: str, Enum

class lhp.models.config.TestActionType(*values)

Bases: str, Enum

Types of test actions available.

class lhp.models.config.ViolationAction(*values)

Bases: str, Enum

Actions to take when test expectations are violated.

class lhp.models.config.LoadSourceType(*values)

Bases: str, Enum

class lhp.models.config.TransformType(*values)

Bases: str, Enum

class lhp.models.config.DQMode(*values)

Bases: str, Enum

Data quality enforcement modes.

class lhp.models.config.WriteTargetType(*values)

Bases: str, Enum

class lhp.models.config.MetadataColumnConfig(*, expression, description=None, applies_to=['streaming_table', 'materialized_view'], additional_imports=None, enabled=True)

Bases: BaseModel

Configuration for a single metadata column.

class lhp.models.config.MetadataPresetConfig(*, columns, description=None)

Bases: BaseModel

Configuration for a metadata column preset.

class lhp.models.config.OperationalMetadataSelection(*, enabled=True, preset=None, columns=None, include_columns=None, exclude_columns=None)

Bases: BaseModel

Operational metadata selection configuration (used in flowgroups/actions/presets).

class lhp.models.config.QuarantineConfig(*, dlq_table, source_table)

Bases: BaseModel

Configuration for quarantine mode in data quality transforms.

Parameters:
  • dlq_table (str)

  • source_table (str)

class lhp.models.config.ProjectOperationalMetadataConfig(*, columns, presets=None, defaults=None)

Bases: BaseModel

Project-level operational metadata configuration (definitions only).

class lhp.models.config.EventLogConfig(*, enabled=True, catalog=None, schema=None, name_prefix='', name_suffix='')

Bases: BaseModel

Project-level event log configuration for pipeline resource generation.

Parameters:
  • enabled (bool)

  • catalog (str | None)

  • schema (str | None)

  • name_prefix (str)

  • name_suffix (str)

class lhp.models.config.MonitoringMaterializedViewConfig(*, name, sql=None, sql_path=None)

Bases: BaseModel

Configuration for a single monitoring materialized view.

Parameters:
  • name (str)

  • sql (str | None)

  • sql_path (str | None)

class lhp.models.config.MonitoringConfig(*, enabled=True, pipeline_name=None, catalog=None, schema=None, streaming_table='all_pipelines_event_log', checkpoint_path='', job_config_path=None, max_concurrent_streams=10, materialized_views=None, enable_job_monitoring=False)

Bases: BaseModel

Project-level monitoring pipeline configuration.

Generates two artifacts:

  1. A standalone notebook that runs N independent streaming queries (one per pipeline event log) appending into a user-created Delta table.

  2. A DLT pipeline with materialized views only, reading from that Delta table.

A Databricks Workflow job chains: notebook_task (union) → pipeline_task (MVs).

class lhp.models.config.TestReportingConfig(*, module_path, function_name, config_file=None)

Bases: BaseModel

Configuration for test result reporting to external systems.

Parameters:
  • module_path (str)

  • function_name (str)

  • config_file (str | None)

class lhp.models.config.ProjectConfig(*, name, version='1.0', description=None, author=None, created_date=None, include=None, blueprint_include=None, instance_include=None, operational_metadata=None, event_log=None, monitoring=None, required_lhp_version=None, test_reporting=None)

Bases: BaseModel

Project-level configuration loaded from lhp.yaml.

class lhp.models.config.WriteTarget(*, type, catalog=None, schema=None, database=None, table=None, create_table=True, comment=None, table_properties=None, partition_columns=None, cluster_columns=None, spark_conf=None, table_schema=None, row_filter=None, temporary=False, path=None, refresh_schedule=None, sql=None, sql_path=None, sink_type=None, sink_name=None, bootstrap_servers=None, topic=None, module_path=None, custom_sink_class=None, batch_handler=None, options=None)

Bases: BaseModel

Write target configuration for streaming tables, materialized views, and sinks.

Parameters:
  • type (WriteTargetType)

  • catalog (str | None)

  • schema (str | None)

  • database (str | None)

  • table (str | None)

  • create_table (bool)

  • comment (str | None)

  • table_properties (Dict[str, Any] | None)

  • partition_columns (List[str] | None)

  • cluster_columns (List[str] | None)

  • spark_conf (Dict[str, Any] | None)

  • table_schema (str | None)

  • row_filter (str | None)

  • temporary (bool)

  • path (str | None)

  • refresh_schedule (str | None)

  • sql (str | None)

  • sql_path (str | None)

  • sink_type (str | None)

  • sink_name (str | None)

  • bootstrap_servers (str | None)

  • topic (str | None)

  • module_path (str | None)

  • custom_sink_class (str | None)

  • batch_handler (str | None)

  • options (Dict[str, Any] | None)

class lhp.models.config.Action(*, name, type, source=None, target=None, description=None, readMode=None, write_target=None, transform_type=None, sql=None, sql_path=None, operational_metadata=None, expectations_file=None, mode=None, quarantine=None, schema_inline=None, schema_file=None, enforcement=None, module_path=None, function_name=None, parameters=None, custom_datasource_class=None, once=None, test_type=None, on_violation=None, tolerance=None, columns=None, filter=None, reference=None, source_columns=None, reference_columns=None, required_columns=None, column=None, min_value=None, max_value=None, lookup_table=None, lookup_columns=None, lookup_result_columns=None, expectations=None, test_id=None)

Bases: BaseModel

property resolved_test_target: str

Canonical target name for test actions: explicit target or tmp_test_{name}.

model_post_init(_Action__context)

Post-initialization processing - normalize all path fields for cross-platform compatibility.

Parameters:

_Action__context (Any)

Return type:

None

class lhp.models.config.FlowGroup(*, pipeline, flowgroup, job_name=None, variables=None, presets=[], use_template=None, template_parameters=None, actions=[], operational_metadata=None)

Bases: BaseModel

class lhp.models.config.FlowGroupContext(flowgroup, source_yaml, synthetic=False, auxiliary_files=<factory>)

Bases: object

Envelope carrying per-flowgroup provenance across the worker boundary.

class lhp.models.config.Template(*, name, version='1.0', description=None, presets=[], parameters=[], actions=[])

Bases: BaseModel

has_raw_actions()

Check if template contains raw action dictionaries (not validated Action objects).

Return type:

bool

get_actions_as_dicts()

Get actions as dictionaries, converting from Action objects if needed.

Return type:

List[Dict[str, Any]]

model_post_init(context, /)

This function is meant to behave like a BaseModel method to initialize private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self (BaseModel) – The BaseModel instance.

  • context (Any) – The context.

Return type:

None

class lhp.models.config.Preset(*, name, version='1.0', extends=None, description=None, defaults=None)

Bases: BaseModel

class lhp.models.config.BlueprintParameter(*, name, required=False, default=None, description=None)

Bases: BaseModel

A declared parameter on a blueprint, with optional default and required flag.

Parameters:
  • name (str)

  • required (bool)

  • default (Any | None)

  • description (str | None)

class lhp.models.config.BlueprintFlowgroupSpec(*, pipeline, flowgroup, job_name=None, variables=None, presets=[], use_template=None, template_parameters=None, actions=[], operational_metadata=None)

Bases: BaseModel

A flowgroup template inside a blueprint.

Same shape as FlowGroup, but pipeline and flowgroup are templates that contain %{var} placeholders resolved at expansion time against instance parameter values.
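Instance expansion binds the blueprint's declared parameters and then fills in the %{var} placeholders. Here is a minimal sketch of those two steps. Parameters are plain dicts mirroring BlueprintParameter (name, required, default). Several details are assumptions rather than LHP's resolver: the placeholder grammar (word characters only), treating a None default as "no default", and the error types raised.

```python
import re
from typing import Any, Dict, List, Mapping

PLACEHOLDER = re.compile(r"%\{(\w+)\}")


def bind_parameters(declared: List[Mapping[str, Any]], supplied: Mapping[str, Any]) -> Dict[str, Any]:
    """Combine instance values with declared defaults, enforcing `required`."""
    bound: Dict[str, Any] = {}
    for param in declared:
        name = param["name"]
        if name in supplied:
            bound[name] = supplied[name]
        elif param.get("required", False):
            raise ValueError(f"missing required blueprint parameter: {name}")
        elif param.get("default") is not None:
            bound[name] = param["default"]
    return bound


def expand(template: str, params: Mapping[str, Any]) -> str:
    """Replace every %{name} in `template`; unknown names are an error."""
    def substitute(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in params:
            raise KeyError(f"unresolved placeholder %{{{name}}}")
        return str(params[name])

    return PLACEHOLDER.sub(substitute, template)
```

For example, a spec whose flowgroup is "%{layer}_%{source}" expands to "bronze_orders" for an instance that supplies source: orders and relies on a declared default of bronze for layer.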

class lhp.models.config.Blueprint(*, name, version='1.0', description=None, parameters=[], flowgroups)

Bases: BaseModel

A reusable collection of flowgroups instantiated once per BlueprintInstance.

The fields that distinguish a blueprint from a regular flowgroup file are parameters and flowgroups (the list of BlueprintFlowgroupSpec entries). looks_like_blueprint() keys on the presence of both fields together with the absence of actions.
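
The detection rule described above fits in a few lines. This is a hypothetical re-implementation for illustration only. It assumes the rule checks for the presence of keys on the parsed YAML mapping, not whether the values are non-empty.

```python
from typing import Any, Mapping


def looks_like_blueprint_sketch(doc: Any) -> bool:
    """True for a mapping with parameters and flowgroups keys and no actions key."""
    if not isinstance(doc, Mapping):
        return False
    return "parameters" in doc and "flowgroups" in doc and "actions" not in doc


blueprint_doc = {
    "name": "ingest",
    "parameters": [{"name": "entity"}],
    "flowgroups": [{"pipeline": "%{domain}_bronze", "flowgroup": "%{entity}_ingest"}],
}
flowgroup_doc = {"pipeline": "bronze", "flowgroup": "orders", "actions": []}
print(looks_like_blueprint_sketch(blueprint_doc), looks_like_blueprint_sketch(flowgroup_doc))  # True False
```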

class lhp.models.config.BlueprintInstance(*, use_blueprint=None, parameters=None, overrides=None, blueprint=None)

Bases: BaseModel

An instance of a blueprint with concrete parameter values.

Two input shapes are accepted:

  • New (preferred): use_blueprint: references the blueprint and a nested parameters: block holds parameter values; an optional overrides: block is reserved for future use. This shape mirrors the use_template: / template_parameters: pattern operators already know.

  • Legacy (deprecated, to be removed in version 0.9): blueprint: plus flat top-level parameter keys. A deprecation warning is emitted once per file when this form is encountered.

A model_validator(mode='before') is the single normalization point: it converts both shapes into the canonical (use_blueprint, parameters) form so that all downstream code reads from one place. Mixing the two forms in the same file raises LHP-CFG-061.

property blueprint_name: str

Normalized blueprint name (works for both input shapes).

parameter_values()

Return the parameter values supplied in this instance file.

Return type:

Dict[str, Any]
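
The following is a minimal sketch of the normalization described for BlueprintInstance. It works on plain dictionaries rather than acting as a pydantic model_validator. What counts as "mixing" is an assumption here: a legacy `blueprint:` key alongside `use_blueprint:` or `parameters:`. The sketch raises a plain `ValueError` carrying the code instead of whatever exception LHP raises. It also warns on every call rather than once per file.

```python
import warnings
from typing import Any, Dict, Tuple


def normalize_instance(raw: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Collapse either instance shape into (blueprint_name, parameter_values).

    Hypothetical sketch of the 'before' normalization; not LHP's code.
    """
    if "blueprint" in raw and ("use_blueprint" in raw or "parameters" in raw):
        # Assumed definition of mixing the two forms.
        raise ValueError("LHP-CFG-061: legacy and new instance forms mixed in one file")
    if "use_blueprint" in raw:
        # New shape: values live under the nested parameters: block.
        return raw["use_blueprint"], dict(raw.get("parameters") or {})
    if "blueprint" in raw:
        # Legacy shape: every other top-level key is a parameter value.
        warnings.warn("blueprint: with flat parameters is deprecated; use use_blueprint:",
                      DeprecationWarning, stacklevel=2)
        return raw["blueprint"], {k: v for k, v in raw.items() if k != "blueprint"}
    raise ValueError("instance does not name a blueprint")


new_form = {"use_blueprint": "ingest", "parameters": {"entity": "orders"}}
legacy_form = {"blueprint": "ingest", "entity": "orders"}
print(normalize_instance(new_form))  # ('ingest', {'entity': 'orders'})
with warnings.catch_warnings():
    warnings.simplefilter("ignore", DeprecationWarning)
    print(normalize_instance(legacy_form))  # ('ingest', {'entity': 'orders'})
```

Both shapes collapse to the same tuple. After normalization, nothing downstream needs to know which shape the file used. blueprint_name and parameter_values() expose this view on the real model.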

Errors and exceptions: lhp.utils.error_formatter

Catch LHPError for any LHP-originated failure. The concrete subclasses disambiguate configuration, validation, and I/O failures and carry an error code that maps to entries in the error reference.

Error formatter for user-friendly error messages.

class lhp.utils.error_formatter.ErrorCategory(*values)

Bases: Enum

Error categories with prefixes.

exception lhp.utils.error_formatter.LHPError(category, code_number, title, details, suggestions=None, example=None, context=None, doc_link=None)

Bases: Exception

User-friendly error with formatting support.

Base class for all LHP-specific exceptions. Subclasses use dual inheritance (e.g. LHPValidationError(LHPError, ValueError)) so that existing except ValueError handlers still catch them.

classmethod from_worker_exception(*, pipeline_name, error_type, error_message, error_traceback)

Reconstruct a worker-side exception as an LHPError on the main thread.

Used by the parallel pipeline executor to surface worker failures through the project’s unified error type without losing the worker’s original exception type or chained traceback. The full traceback is preserved on worker_traceback so log handlers and CI surfaces can capture it; worker_error_type carries the original exception class name for structured error classification.

Parameters:
  • pipeline_name (str)

  • error_type (str)

  • error_message (str)

  • error_traceback (str)

Return type:

LHPError
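
On the worker side, the four keyword arguments are plain strings. Plain strings are cheap to send back to the main process. Below is a hypothetical helper that builds them from a caught exception using the standard `traceback` module. Only the keyword names come from the signature above.

```python
import traceback
from typing import Dict


def capture_worker_failure(pipeline_name: str, exc: BaseException) -> Dict[str, str]:
    """Flatten an exception into picklable strings (hypothetical helper)."""
    return {
        "pipeline_name": pipeline_name,
        "error_type": type(exc).__name__,
        "error_message": str(exc),
        "error_traceback": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    }


try:
    raise KeyError("missing column: order_id")
except KeyError as exc:
    payload = capture_worker_failure("bronze", exc)

print(payload["error_type"])  # KeyError
# Main thread, with LHP installed:
#   LHPError.from_worker_exception(**payload)
```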

exception lhp.utils.error_formatter.LHPValidationError(category, code_number, title, details, suggestions=None, example=None, context=None, doc_link=None)

Bases: LHPError, ValueError

LHPError subclass that is also a ValueError.

Use when replacing a bare ValueError so that existing except ValueError handlers continue to catch it.

exception lhp.utils.error_formatter.LHPConfigError(category, code_number, title, details, suggestions=None, example=None, context=None, doc_link=None)

Bases: LHPError, ValueError

LHPError subclass for configuration errors.

Also a ValueError for backward compatibility with existing except ValueError handlers.

exception lhp.utils.error_formatter.LHPFileError(category, code_number, title, details, suggestions=None, example=None, context=None, doc_link=None)

Bases: LHPError, FileNotFoundError

LHPError subclass that is also a FileNotFoundError.

Use when replacing a bare FileNotFoundError so that existing except FileNotFoundError handlers continue to catch it.
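
The dual-inheritance pattern keeps older `except ValueError` and `except FileNotFoundError` handlers working. The sketch below reproduces it with stand-in classes so it runs without LHP installed. The real constructors take `category`, `code_number`, `title`, `details`, and more. The `load` function is hypothetical.

```python
class LHPErrorSketch(Exception):
    """Stand-in for LHPError."""


class LHPConfigErrorSketch(LHPErrorSketch, ValueError):
    """Stand-in for LHPConfigError: also a ValueError."""


class LHPFileErrorSketch(LHPErrorSketch, FileNotFoundError):
    """Stand-in for LHPFileError: also a FileNotFoundError."""


def load(kind: str) -> None:
    # Hypothetical loader that fails in two different ways.
    if kind == "config":
        raise LHPConfigErrorSketch("bad preset reference")
    raise LHPFileErrorSketch("substitutions/dev.yaml not found")


caught = []
for kind in ("config", "file"):
    try:
        load(kind)
    except ValueError:  # a legacy handler still catches the config error
        caught.append("ValueError")
    except FileNotFoundError:  # and another still catches the file error
        caught.append("FileNotFoundError")
print(caught)  # ['ValueError', 'FileNotFoundError']

# New code can catch the shared base instead of the builtin types.
try:
    load("file")
except LHPErrorSketch as err:
    caught_base = type(err).__name__
print(caught_base)  # LHPFileErrorSketch
```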

Substitution (Experimental): lhp.utils.substitution

Token, local variable, and secret substitution. Substitutions are applied in this order: %{local_var}, then {{ template_param }}, then ${env_token}, then ${secret:scope/key}. The bare-braces {token} form is deprecated.

Enhanced token and secret substitution for LakehousePlumber.
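
A toy version of the four passes shows why the order matters. A value inserted by an earlier pass can contain a token that a later pass then resolves. Several details are assumptions of this sketch, not LHP's implementation: the regular expressions, leaving unknown tokens in place, and collecting secrets without replacing them. The sketch also ignores the deprecated bare {token} form.

```python
import re
from typing import Dict, List, Tuple

LOCAL = re.compile(r"%\{(\w+)\}")                       # %{local_var}
TEMPLATE = re.compile(r"\{\{\s*(\w+)\s*\}\}")           # {{ template_param }}
ENV = re.compile(r"\$\{(\w+)\}")                        # ${env_token}; ':' is not \w, so secrets never match
SECRET = re.compile(r"\$\{secret:([^/}]+)/([^}]+)\}")   # ${secret:scope/key}


def substitute(text: str, local: Dict[str, str], params: Dict[str, str],
               env: Dict[str, str]) -> Tuple[str, List[Tuple[str, str]]]:
    """Run the passes in order; unknown names are left untouched."""

    def lookup(table: Dict[str, str]):
        return lambda m: table.get(m.group(1), m.group(0))

    text = LOCAL.sub(lookup(local), text)
    text = TEMPLATE.sub(lookup(params), text)
    text = ENV.sub(lookup(env), text)
    # Secrets are only collected here, as (scope, key) pairs.
    return text, SECRET.findall(text)


out, secrets = substitute(
    "%{target} ${secret:jdbc/password}",
    local={"target": "${catalog}.bronze.{{ entity }}"},
    params={"entity": "orders"},
    env={"catalog": "main"},
)
print(out)      # main.bronze.orders ${secret:jdbc/password}
print(secrets)  # [('jdbc', 'password')]
```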

class lhp.utils.substitution.EnhancedSubstitutionManager(substitution_file=None, env='dev', skip_validation=False)

Bases: object

Enhanced substitution manager with YAML and secret support.

Parameters:
  • substitution_file (Path)

  • env (str)

  • skip_validation (bool)

substitute_yaml(data)

Recursively substitute tokens and collect secret references.

Parameters:

data (Dict[str, Any])

Return type:

Dict[str, Any]
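
substitute_yaml() walks nested YAML data. Here is a minimal sketch of that kind of walk, limited to `${name}` tokens and `${secret:scope/key}` references for brevity. It returns a new structure instead of mutating the input and gathers secret references into a set as it goes. The function is hypothetical.

```python
import re
from typing import Any, Dict, Set, Tuple

TOKEN = re.compile(r"\$\{(\w+)\}")                      # ${name}
SECRET = re.compile(r"\$\{secret:([^/}]+)/([^}]+)\}")  # ${secret:scope/key}


def walk(node: Any, tokens: Dict[str, Any], secrets: Set[Tuple[str, str]]) -> Any:
    """Return a substituted copy of node, adding (scope, key) pairs to secrets."""
    if isinstance(node, dict):
        return {k: walk(v, tokens, secrets) for k, v in node.items()}
    if isinstance(node, list):
        return [walk(v, tokens, secrets) for v in node]
    if isinstance(node, str):
        secrets.update(SECRET.findall(node))
        return TOKEN.sub(lambda m: str(tokens.get(m.group(1), m.group(0))), node)
    return node  # numbers, booleans, and None pass through unchanged


found: Set[Tuple[str, str]] = set()
config = {
    "source": {"path": "/Volumes/${catalog}/raw/orders", "batch_size": 500},
    "options": ["user=${secret:db/user}", "password=${secret:db/pass}"],
}
result = walk(config, {"catalog": "main"}, found)
print(result["source"]["path"])  # /Volumes/main/raw/orders
print(sorted(found))             # [('db', 'pass'), ('db', 'user')]
```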

validate_no_unresolved_tokens(data, path='config')

Detect unresolved tokens after substitution.

Scans the configuration for any remaining {token} patterns that were not resolved during substitution, which indicates missing values in the substitutions file.

Parameters:
  • data (Any) – Configuration data to validate (dict, list, str, or other)

  • path (str) – Current path in config tree for error reporting

Returns:

List of error messages describing unresolved tokens with their locations

Return type:

List[str]

Examples

>>> mgr = EnhancedSubstitutionManager()
>>> mgr.mappings = {"bucket": "landing"}
>>> data = {"path": "s3://landing/{missing}/data"}
>>> errors = mgr.validate_no_unresolved_tokens(data)
>>> print(errors[0])
Unresolved token '{missing}' found at config.path. Check substitutions/dev.yaml

class lhp.utils.substitution.SecretReference(scope, key)

Bases: object

Represents a secret reference with scope and key.

to_dbutils_call()

Generate a dbutils.secrets.get() call as a Python expression.

Uses single-quoted scope and key arguments so that the call is safe to embed inside double-quoted string literals (for example, inside JDBC URL templates) without breaking the outer quote nesting. This matches the format that the legacy post-pass SecretCodeGenerator emitted before inline emission replaced it.

Return type:

str
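
The single quotes matter when the call is embedded in generated code. The sketch below places the generated call inside a double-quoted f-string, as a generated JDBC URL might. The stand-in class and the keyword-argument form of the call are assumptions. LHP's exact output string is not shown on this page.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SecretRefSketch:
    """Hypothetical stand-in for SecretReference."""

    scope: str
    key: str

    def to_dbutils_call(self) -> str:
        # Single quotes inside, so the expression can sit in a "..." literal.
        # Assumption: keyword-argument form; LHP's exact output may differ.
        return f"dbutils.secrets.get(scope='{self.scope}', key='{self.key}')"


ref = SecretRefSketch("jdbc", "password")
line = f'url = f"jdbc:postgresql://db:5432/sales?password={{{ref.to_dbutils_call()}}}"'
print(line)
# url = f"jdbc:postgresql://db:5432/sales?password={dbutils.secrets.get(scope='jdbc', key='password')}"
```

Had the call used double quotes, the generated line would close the outer literal early on Python versions before 3.12.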

Templates (Experimental): lhp.core.template_engine

Template engine for LakehousePlumber YAML templates.

class lhp.core.template_engine.TemplateEngine(templates_dir=None)

Bases: object

Engine for handling YAML templates with parameter expansion.

Parameters:

templates_dir (Path)

get_template(template_name)

Get a template by name.

Parameters:

template_name (str) – Name of the template

Returns:

Template model or None if not found

Return type:

Template | None

render_template(template_name, parameters)

Render a template with the given parameters and return the expanded actions.

Parameters:
  • template_name (str) – Name of the template to render

  • parameters (Dict[str, Any]) – Parameters to apply to the template

Returns:

List of actions with parameters expanded

Return type:

List[Action]

list_templates()

List all available template names.

Return type:

List[str]
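
To illustrate where list_templates() might get its names, here is a hypothetical directory scan. It treats each `*.yaml` or `*.yml` file directly under a templates directory as one template named after its file. That naming rule is an assumption. LHP may instead read the `name:` field inside each file.

```python
import tempfile
from pathlib import Path
from typing import List


def list_template_names(templates_dir: Path) -> List[str]:
    """Sorted stems of YAML files directly under templates_dir (hypothetical)."""
    if not templates_dir.is_dir():
        return []
    return sorted(
        p.stem for p in templates_dir.iterdir()
        if p.is_file() and p.suffix in {".yaml", ".yml"}
    )


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ("csv_ingestion.yaml", "scd2_merge.yml", "README.md"):
        (root / name).write_text("")
    names = list_template_names(root)
print(names)  # ['csv_ingestion', 'scd2_merge']
```

With LHP installed, the documented entry point is TemplateEngine(templates_dir=...).list_templates().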

get_template_info(template_name)

Get information about a template including parameters.

Parameters:

template_name (str)

Return type:

Dict[str, Any]

Standalone validation (Experimental): lhp.core.validator

Configuration validator for LakehousePlumber.

class lhp.core.validator.ConfigValidator(project_root=None, project_config=None)

Bases: object

Validate LakehousePlumber configurations.

validate_flowgroup(flowgroup)

Validate a flowgroup and its actions.

Parameters:

flowgroup (FlowGroup) – FlowGroup to validate

Returns:

List of validation errors (strings or LHPError objects)

Return type:

List[str | LHPError]

validate_action(action, index)

Validate action types and required fields.

Parameters:
  • action (Action) – Action to validate

  • index (int) – Action index in the flowgroup

Returns:

List of validation errors (strings or LHPError objects)

Return type:

List[str | LHPError]

validate_action_references(actions)

Validate that all action references are valid.

Parameters:

actions (List[Action])

Return type:

List[str]

validate_table_creation_rules(flowgroups)

Validate table creation rules across the entire pipeline.

Delegates to TableCreationValidator for the actual validation logic.

Parameters:

flowgroups (List[FlowGroup]) – List of all flowgroups in the pipeline

Returns:

List of validation error messages

Return type:

List[str]

validate_cdc_fanin_compatibility(flowgroups)

Validate compatibility across CDC actions sharing a target.

Delegates to CdcFanInCompatibilityValidator. Mismatches on shared fields surface as LHPConfigError (raised from the delegate); mode-mixing between CDC and non-CDC contributors returns plain error strings in the result list.

Parameters:

flowgroups (List[FlowGroup]) – List of all flowgroups in the pipeline

Returns:

List of validation error messages

Return type:

List[str]
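
The mode-mixing half of this check amounts to grouping contributors by target. The sketch assumes a hypothetical `(target, action_name, is_cdc)` tuple per contributing action. Like the documented method, it returns plain strings. The shared-field checks that raise LHPConfigError are left out.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple


def mode_mixing_errors(contributors: Iterable[Tuple[str, str, bool]]) -> List[str]:
    """Flag targets written by both CDC and non-CDC actions (hypothetical shape)."""
    modes: Dict[str, Set[bool]] = defaultdict(set)
    names: Dict[str, List[str]] = defaultdict(list)
    for target, action, is_cdc in contributors:
        modes[target].add(is_cdc)
        names[target].append(action)
    return [
        f"{target}: mixes CDC and non-CDC contributors ({', '.join(names[target])})"
        for target in sorted(modes)
        if modes[target] == {True, False}
    ]


errors = mode_mixing_errors([
    ("silver.customers", "cdc_from_crm", True),
    ("silver.customers", "append_from_web", False),
    ("silver.orders", "cdc_from_erp", True),
])
print(errors)
# ['silver.customers: mixes CDC and non-CDC contributors (cdc_from_crm, append_from_web)']
```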

validate_duplicate_pipeline_flowgroup(flowgroups)

Validate that there are no duplicate pipeline+flowgroup combinations.

Parameters:

flowgroups (List[FlowGroup]) – List of all flowgroups to validate

Returns:

List of validation error messages

Return type:

List[str]
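
Detecting duplicate (pipeline, flowgroup) pairs is a counting problem. Here is a minimal sketch with `collections.Counter`. It uses plain tuples in place of FlowGroup objects and a message format invented for the example.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def duplicate_pairs(flowgroups: Iterable[Tuple[str, str]]) -> List[str]:
    """Report every (pipeline, flowgroup) pair that appears more than once."""
    counts = Counter(flowgroups)
    return [
        f"Duplicate flowgroup '{fg}' in pipeline '{pl}' ({n} definitions)"
        for (pl, fg), n in sorted(counts.items())
        if n > 1
    ]


pairs = [("bronze", "orders"), ("bronze", "customers"), ("bronze", "orders"), ("silver", "orders")]
print(duplicate_pairs(pairs))
# ["Duplicate flowgroup 'orders' in pipeline 'bronze' (2 definitions)"]
```

The same flowgroup name in two different pipelines is not a duplicate, because the pair is the key.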

Version (Experimental): lhp.utils.version

Version utilities for LakehousePlumber.

lhp.utils.version.get_version()

Get the package version dynamically from package metadata.

Returns:

Package version string

Return type:

str
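
Reading a version "from package metadata" is usually done with `importlib.metadata`. The sketch assumes the distribution is named `lakehouse-plumber`; confirm the name with `pip show` in your environment. When the distribution is absent, the sketch returns "unknown" instead of raising.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name: str = "lakehouse-plumber") -> str:
    """Read a distribution's version from installed metadata (name is assumed)."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return "unknown"


print(installed_version())  # prints the installed version, or "unknown" if absent
```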

Notes on usage

  • All public classes log through logging.getLogger("lhp.<module>"). Configure the lhp logger to control LHP output independently of your application’s logging.

  • LHP raises LHPError (or a subclass) for every recoverable failure. Unexpected exceptions indicate a bug; file an issue with the traceback.

  • Pydantic v2 powers lhp.models.config. Treat the models as immutable once constructed; use model_copy(update=...) for derived instances.

  • ActionOrchestrator is not thread-safe. Construct one instance per pipeline run. CachingYAMLParser is thread-safe and shared internally.
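
Every LHP logger sits under the `lhp` namespace, so one configuration call on the parent covers them all. Here is a minimal sketch using only the standard library. The level, the format, and stopping propagation to the root logger are illustrative choices. `lhp.core.validator` is just an example child name following the `lhp.<module>` pattern.

```python
import logging
import sys

# Send LHP records (loggers "lhp" and "lhp.<module>") to stderr at WARNING and above,
# without changing the root logger that your application configures.
lhp_logger = logging.getLogger("lhp")
lhp_logger.setLevel(logging.WARNING)
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(logging.Formatter("%(name)s %(levelname)s: %(message)s"))
lhp_logger.addHandler(handler)
lhp_logger.propagate = False  # keep LHP records out of the root handlers

# A child logger with no level of its own inherits the parent's effective level.
child = logging.getLogger("lhp.core.validator")
print(child.getEffectiveLevel() == logging.WARNING)  # True
```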

See also

  • CLI Reference — command-line reference. Most users should drive LHP through the CLI rather than the Python API.

  • Architecture — design rationale, service decomposition, and the reasoning behind the orchestrator’s split into discovery, processing, generation, and validation services.

  • Migrate from DLT to LHP — how to move an existing Delta Live Tables project to LHP. Uses the CLI by default and the Python API for advanced cases.