Source code for lhp.core.validator
"""Configuration validator for LakehousePlumber."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, List
from ..models.config import Action, ActionType, FlowGroup
from ..utils.error_formatter import LHPError
from .action_registry import ActionRegistry
from .config_field_validator import ConfigFieldValidator
from .dependency_resolver import DependencyResolver
from .validators.base_validator import ValidationError
logger = logging.getLogger(__name__)
from .validators import (
CdcFanInCompatibilityValidator,
LoadActionValidator,
TableCreationValidator,
TestActionValidator,
TransformActionValidator,
WriteActionValidator,
)
if TYPE_CHECKING:
pass
[docs]
class ConfigValidator:
    """Validate LakehousePlumber configurations.

    Wires together an action registry, a dependency resolver, a field
    validator and one validator per action type, and exposes flowgroup-level
    and pipeline-level checks that report problems as lists of errors.
    """

    def __init__(self, project_root=None, project_config=None):
        """Create the validator and the collaborators it delegates to.

        Args:
            project_root: Stored on the instance and forwarded to the
                transform action validator.
            project_config: Stored on the instance and forwarded to the
                transform action validator.
        """
        self.logger = logging.getLogger(__name__)
        self.project_root = project_root
        self.project_config = project_config
        self.action_registry = ActionRegistry()
        self.dependency_resolver = DependencyResolver()
        self.field_validator = ConfigFieldValidator()

        # Initialize action validators
        self.load_validator = LoadActionValidator(
            self.action_registry, self.field_validator
        )
        self.transform_validator = TransformActionValidator(
            self.action_registry,
            self.field_validator,
            self.project_root,
            self.project_config,
        )
        self.write_validator = WriteActionValidator(
            self.action_registry, self.field_validator, self.logger
        )
        self.test_validator = TestActionValidator(
            self.action_registry, self.field_validator
        )
        self.table_creation_validator = TableCreationValidator()
        self.cdc_fanin_validator = CdcFanInCompatibilityValidator()

    def validate_flowgroup(self, flowgroup: FlowGroup) -> List[ValidationError]:
        """Validate a flowgroup and every action it contains.

        Checks the required top-level fields, validates each action, flags
        duplicate action names and duplicate targets, and then runs the
        dependency resolver over the actions. A template used without
        parameters only produces a log warning, not an error.

        Args:
            flowgroup: FlowGroup to validate

        Returns:
            List of validation errors (strings or LHPError objects)

        Raises:
            LHPError: Propagated from ``validate_action`` when field
                validation of an action raises it.
        """
        errors = []

        # Validate basic fields
        if not flowgroup.pipeline:
            errors.append("FlowGroup must have a 'pipeline' name")
        if not flowgroup.flowgroup:
            errors.append("FlowGroup must have a 'flowgroup' name")
        if not flowgroup.actions:
            errors.append("FlowGroup must have at least one action")

        # A missing action list (None) has already been reported above;
        # fall back to an empty list so iteration cannot raise TypeError.
        actions = flowgroup.actions or []

        # Validate each action
        action_names = set()
        target_names = set()
        for i, action in enumerate(actions):
            errors.extend(self.validate_action(action, i))

            # Check for duplicate action names
            if action.name in action_names:
                errors.append(f"Duplicate action name: '{action.name}'")
            action_names.add(action.name)

            # Check for duplicate target names (actions without a target
            # are ignored)
            if action.target:
                if action.target in target_names:
                    errors.append(
                        f"Duplicate target name: '{action.target}' in action '{action.name}'"
                    )
                target_names.add(action.target)

        # Validate dependencies
        if actions:
            try:
                dependency_errors = self.dependency_resolver.validate_relationships(
                    actions
                )
                errors.extend(dependency_errors)
            except LHPError as e:
                # Keep the structured error object so callers can format it.
                logger.debug(f"Dependency validation error: {e.title}")
                errors.append(e)
            except Exception as e:
                # Any other failure is reported as a plain message instead
                # of aborting the whole validation run.
                logger.debug(f"Dependency validation error: {e}")
                errors.append(str(e))

        # Validate template usage
        if flowgroup.use_template and not flowgroup.template_parameters:
            self.logger.warning(
                f"FlowGroup uses template '{flowgroup.use_template}' but no parameters provided"
            )

        return errors

    def validate_action(self, action: Action, index: int) -> List[ValidationError]:
        """Validate a single action's required fields and type-specific rules.

        Args:
            action: Action to validate
            index: Action index in the flowgroup

        Returns:
            List of validation errors (strings or LHPError objects)

        Raises:
            LHPError: Re-raised unchanged when the field validator raises it.
        """
        errors = []
        prefix = f"Action[{index}] '{action.name}'"

        # Basic validation
        if not action.name:
            errors.append(f"Action[{index}]: Missing 'name' field")
            return errors  # Can't continue without name

        if not action.type:
            errors.append(f"{prefix}: Missing 'type' field")
            return errors  # Can't continue without type

        # Strict field validation - validate action-level fields
        try:
            action_dict = action.model_dump()
            self.field_validator.validate_action_fields(action_dict, action.name)
        except LHPError:
            # Re-raise LHPError as-is (it's already well-formatted)
            raise
        except Exception as e:
            errors.append(str(e))
            return errors

        # Type-specific validation using action validators
        if action.type == ActionType.LOAD:
            errors.extend(self.load_validator.validate(action, prefix))
        elif action.type == ActionType.TRANSFORM:
            errors.extend(self.transform_validator.validate(action, prefix))
        elif action.type == ActionType.WRITE:
            errors.extend(self.write_validator.validate(action, prefix))
        elif action.type == ActionType.TEST:
            errors.extend(self.test_validator.validate(action, prefix))
        else:
            errors.append(f"{prefix}: Unknown action type '{action.type}'")

        return errors

    def validate_action_references(self, actions: List[Action]) -> List[str]:
        """Validate that every referenced ``v_`` view is produced by an action.

        Only sources whose name starts with ``v_`` are checked; each must
        match the ``target`` of some action in ``actions``. All other
        sources (for example dotted names such as ``bronze.customers``) are
        not checked.

        Args:
            actions: Actions whose sources and targets are cross-checked

        Returns:
            List of validation error messages
        """
        errors = []

        # Build set of all available views/targets
        available_views = {action.target for action in actions if action.target}

        # Check all references
        for action in actions:
            for source in self._extract_all_sources(action):
                if source.startswith("v_") and source not in available_views:
                    errors.append(
                        f"Action '{action.name}' references view '{source}' which is not defined"
                    )

        return errors

    def _extract_all_sources(self, action: Action) -> List[str]:
        """Extract all string source references from an action.

        ``action.source`` may be a string, a list, or a dict. For a dict the
        ``view``, ``source``, ``views`` and ``sources`` keys are inspected.
        Non-string entries inside lists are skipped so that the result
        really is a list of strings and callers can use string methods on
        every item.

        Args:
            action: Action whose ``source`` attribute is inspected

        Returns:
            List of source names, in the order they were found
        """
        source = action.source
        if isinstance(source, str):
            return [source]
        if isinstance(source, list):
            return [item for item in source if isinstance(item, str)]

        sources = []
        if isinstance(source, dict):
            # Check various fields that might contain source references
            for field in ["view", "source", "views", "sources"]:
                value = source.get(field)
                if isinstance(value, str):
                    sources.append(value)
                elif isinstance(value, list):
                    sources.extend(
                        item for item in value if isinstance(item, str)
                    )
        return sources

    def validate_table_creation_rules(self, flowgroups: List[FlowGroup]) -> List[str]:
        """Validate table creation rules across the entire pipeline.

        Delegates to TableCreationValidator for the actual validation logic.

        Args:
            flowgroups: List of all flowgroups in the pipeline

        Returns:
            List of validation error messages
        """
        return self.table_creation_validator.validate(flowgroups)

    def validate_cdc_fanin_compatibility(
        self, flowgroups: List[FlowGroup]
    ) -> List[str]:
        """Validate compatibility across CDC actions sharing a target.

        Delegates to CdcFanInCompatibilityValidator. Mismatches on shared
        fields surface as ``LHPConfigError`` (raised from the delegate);
        mode-mixing between CDC and non-CDC contributors returns plain error
        strings in the result list.

        Args:
            flowgroups: List of all flowgroups in the pipeline

        Returns:
            List of validation error messages
        """
        return self.cdc_fanin_validator.validate(flowgroups)

    def validate_duplicate_pipeline_flowgroup(
        self, flowgroups: List[FlowGroup]
    ) -> List[str]:
        """Validate that there are no duplicate pipeline+flowgroup combinations.

        Args:
            flowgroups: List of all flowgroups to validate

        Returns:
            List of validation error messages, one per repeated occurrence
        """
        errors = []
        seen_combinations = set()

        for flowgroup in flowgroups:
            # Key on the (pipeline, flowgroup) pair rather than a joined
            # string: joining with "." would make pipeline "a.b" +
            # flowgroup "c" collide with pipeline "a" + flowgroup "b.c".
            pair = (flowgroup.pipeline, flowgroup.flowgroup)

            if pair in seen_combinations:
                combination_key = f"{flowgroup.pipeline}.{flowgroup.flowgroup}"
                errors.append(
                    f"Duplicate pipeline+flowgroup combination: '{combination_key}'. "
                    f"Each pipeline+flowgroup combination must be unique across all YAML files."
                )
            else:
                seen_combinations.add(pair)

        return errors