Source code for lhp.bundle.manager

"""
Bundle manager for LHP Databricks Asset Bundle integration.

This module provides the main BundleManager class that coordinates bundle
resource operations including resource file synchronization and management.
"""

import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from ..core.services.monitoring_pipeline_builder import (
    resolve_monitoring_pipeline_name,
)
from ..utils.error_formatter import ErrorCategory, LHPConfigError, LHPError
from ..utils.performance_timer import perf_timer, record_count
from ..utils.template_renderer import TemplateRenderer
from .exceptions import BundleResourceError

logger = logging.getLogger(__name__)


# Top-level pipeline_config keys that pipeline_resource.yml.j2 renders explicitly.
# Anything NOT in this set is passed through as-is via the `toyaml` filter, so
# users can use new Databricks Pipelines API fields (run_as, …) without waiting
# for LHP to explicitly support them.
#
# Kept here (not inside BundleManager) so templates and tests can reference it
# via a single source of truth.
EXPLICITLY_RENDERED_PIPELINE_CONFIG_KEYS = frozenset(
    {
        "catalog",
        "schema",
        "serverless",
        "clusters",
        "configuration",
        "continuous",
        "photon",
        "edition",
        "channel",
        "notifications",
        "tags",
        "event_log",
        "environment",
        "permissions",
    }
)


[docs] class BundleManager: """ Manages Databricks Asset Bundle resource files. Wipe-and-regenerate contract: ``resources/lhp/`` is fully wiped by the CLI before sync, and BundleManager re-renders one resource file per pipeline under ``generated/<env>/``. databricks.yml is never read or mutated. Catalog and schema must come from pipeline_config.yaml (per-pipeline or via the top-level ``project_defaults`` block). """ def __init__( self, project_root: Union[Path, str], pipeline_config_path: Optional[str] = None, project_config: Optional[Any] = None, ): """ Initialize the bundle manager. Args: project_root: Path to the project root directory pipeline_config_path: Optional path to custom pipeline config file (relative to project_root) project_config: Optional ProjectConfig with project-level settings (e.g., event_log) Raises: TypeError: If project_root is None """ if project_root is None: raise LHPConfigError( category=ErrorCategory.CONFIG, code_number="028", title="BundleManager requires a project root", details="project_root cannot be None when initializing BundleManager.", suggestions=[ "Ensure a valid project root path is provided", "Run from within an LHP project directory", ], ) # Convert string to Path if necessary if isinstance(project_root, str): project_root = Path(project_root) self.project_root = project_root self.project_config = project_config self.resources_dir = project_root / "resources" / "lhp" self.logger = logging.getLogger(__name__) self.template_renderer = TemplateRenderer.from_package() from ..core.services.pipeline_config_loader import PipelineConfigLoader self.config_loader = PipelineConfigLoader( self.project_root, pipeline_config_path, monitoring_pipeline_name=self._get_monitoring_pipeline_name(), ) # Cached per env to avoid re-reading + re-expanding substitutions/<env>.yaml # once per pipeline during a sync. Built lazily on first lookup. self._sub_mgr_cache: Dict[str, Optional[Any]] = {} def _get_substitution_manager(self, env: str) -> Optional[Any]: """Return a cached EnhancedSubstitutionManager for env, or None if no file.""" if env in self._sub_mgr_cache: return self._sub_mgr_cache[env] substitution_file = self.project_root / "substitutions" / f"{env}.yaml" if substitution_file.exists(): from ..utils.substitution import EnhancedSubstitutionManager sub_mgr: Optional[Any] = EnhancedSubstitutionManager(substitution_file, env) else: self.logger.debug( f"No substitution file found at {substitution_file}, using raw config" ) sub_mgr = None self._sub_mgr_cache[env] = sub_mgr return sub_mgr
[docs] def sync_resources_with_generated_files( self, output_dir: Path, env: str, ) -> int: """ Write one bundle resource file per current pipeline directory. Wipe-and-regenerate contract: callers must clear ``resources/lhp/`` before invoking this method. BundleManager only writes the resource files for the pipelines that exist under ``output_dir`` — it does not preserve, back up, or delete any pre-existing files. Args: output_dir: Directory containing generated Python files env: Environment name for template processing Returns: Number of resource files written Raises: BundleResourceError: If synchronization fails """ self.logger.info("Syncing bundle resources for environment: %s", env) with perf_timer("bundle_sync_resources"): current_pipeline_dirs = self._setup_sync_environment(output_dir) record_count("pipelines_synced", len(current_pipeline_dirs)) written_count = self._process_current_pipelines(current_pipeline_dirs, env) self._log_sync_summary(written_count) return written_count
def _sync_pipeline_resource( self, pipeline_name: str, pipeline_dir: Path, env: str, ) -> bool: """ Write a single pipeline's resource file unconditionally. Callers are expected to have wiped ``resources/lhp/`` before invoking sync, so this method always (re)creates the resource file for the pipeline. Args: pipeline_name: Name of the pipeline pipeline_dir: Directory containing pipeline Python files env: Environment name Returns: True (a file was written) """ self._create_new_resource_file(pipeline_name, pipeline_dir.parent, env) return True
[docs] def ensure_resources_directory(self): """Create resources/lhp directory if it doesn't exist.""" self._safe_directory_create(self.resources_dir, "LHP resources directory")
[docs] def get_pipeline_directories(self, output_dir: Path) -> List[Path]: """ Get list of pipeline directories in the output directory. Args: output_dir: Directory to scan for pipeline directories Returns: List of pipeline directory paths in sorted order Raises: BundleResourceError: If directory access fails """ # Validate directory access using utility self._safe_directory_access(output_dir, "output directory") try: pipeline_dirs = [] # Sort directories to ensure deterministic processing order across platforms for item in sorted(output_dir.iterdir()): if item.is_dir(): pipeline_dirs.append(item) self.logger.debug("Found pipeline directory: %s", item.name) return pipeline_dirs except (OSError, PermissionError) as e: raise BundleResourceError( f"Error scanning output directory {output_dir}: {e}", e ) from e
[docs] def get_resource_file_path(self, pipeline_name: str) -> Path: """Return the path where this pipeline's resource file is written. Under the wipe-and-regenerate contract, ``resources/lhp/`` is empty when sync starts and BundleManager always writes the canonical ``<pipeline>.pipeline.yml`` filename, so no existence probing is needed. """ return self.resources_dir / f"{pipeline_name}.pipeline.yml"
def _inject_project_event_log( self, pipeline_config: Dict[str, Any], pipeline_name: str ) -> Dict[str, Any]: """Inject project-level event_log into pipeline config if applicable. Injection rules: - No project_config or no event_log → return unchanged - event_log disabled → return unchanged - pipeline_config has event_log: false → delete key, return (opt-out) - pipeline_config has event_log dict → return unchanged (full replace) - Otherwise → inject event_log block from project config Args: pipeline_config: Raw pipeline config dict (pre-substitution) pipeline_name: Name of the pipeline (used for event_log name generation) Returns: Pipeline config dict, potentially with event_log injected """ # No project config or no event_log configured if not self.project_config or not getattr( self.project_config, "event_log", None ): return pipeline_config event_log_cfg = self.project_config.event_log # Project-level event_log is disabled if not event_log_cfg.enabled: return pipeline_config # Check pipeline-level override if "event_log" in pipeline_config: pipeline_event_log = pipeline_config["event_log"] # Explicit opt-out: event_log: false if pipeline_event_log is False: del pipeline_config["event_log"] logger.debug( f"Pipeline '{pipeline_name}' opted out of project-level event_log" ) return pipeline_config # Pipeline has its own event_log dict → full replace, leave unchanged if isinstance(pipeline_event_log, dict): logger.debug( f"Pipeline '{pipeline_name}' has its own event_log config, " f"skipping project-level injection" ) return pipeline_config # Inject project-level event_log event_log_name = ( f"{event_log_cfg.name_prefix}{pipeline_name}{event_log_cfg.name_suffix}" ) pipeline_config["event_log"] = { "name": event_log_name, "catalog": event_log_cfg.catalog, "schema": event_log_cfg.schema_, } logger.debug( f"Injected project-level event_log for pipeline '{pipeline_name}': " f"name={event_log_name}" ) return pipeline_config def _get_monitoring_pipeline_name(self) -> Optional[str]: return resolve_monitoring_pipeline_name(self.project_config)
[docs] def generate_resource_file_content( self, pipeline_name: str, output_dir: Path, env: str ) -> str: """ Generate content for a bundle resource file using Jinja2 template. Applies LHP token substitution to ALL fields in pipeline_config.yaml, enabling environment-specific configuration for node types, policies, emails, and all other pipeline settings. Catalog and schema MUST be defined in pipeline_config.yaml (either per-pipeline or via the top-level ``project_defaults`` block); they are never read from ``databricks.yml``. Catalog/schema validation is performed upstream by ``bundle.preflight.validate_catalog_schema`` before any wipes occur. The guard here is a defense-in-depth assertion that fires only if a non-CLI caller invokes this method without running preflight first. Args: pipeline_name: Name of the pipeline output_dir: Output directory env: Environment name for token resolution (REQUIRED) Returns: YAML content for the resource file with fully substituted pipeline config Raises: LHPConfigError: ``LHP-GEN-001`` if preflight was bypassed and catalog/schema is still missing/empty at the bundle-write phase. """ pipeline_config_raw = self.config_loader.get_pipeline_config(pipeline_name) # Skip event_log injection for the monitoring pipeline (no self-reference) monitoring_name = self._get_monitoring_pipeline_name() if pipeline_name != monitoring_name: pipeline_config_raw = self._inject_project_event_log( pipeline_config_raw, pipeline_name ) sub_mgr = self._get_substitution_manager(env) if sub_mgr is not None: pipeline_config_resolved = sub_mgr.substitute_yaml(pipeline_config_raw) else: pipeline_config_resolved = pipeline_config_raw catalog = pipeline_config_resolved.get("catalog") schema = pipeline_config_resolved.get("schema") if ( not catalog or not schema or not str(catalog).strip() or not str(schema).strip() ): raise LHPConfigError( category=ErrorCategory.GENERAL, code_number="001", title="Internal error: preflight bypassed for bundle resource generation", details=( f"Pipeline '{pipeline_name}' reached the bundle-write phase " f"with missing/empty resolved catalog/schema " f"(catalog={catalog!r}, schema={schema!r}). " f"This indicates preflight validation did not run." ), suggestions=[ "This is a programming bug; preflight should have caught this. " "Verify generate_command.py calls validate_catalog_schema " "before BundleManager.", ], context={"pipeline": pipeline_name, "env": env}, ) self.logger.info( f"Pipeline '{pipeline_name}' using catalog/schema from config: {catalog}.{schema}" ) # Build template context with fully resolved config context = { "pipeline_name": pipeline_name, "pipeline_config": pipeline_config_resolved, # Fully substituted! "catalog": catalog, "schema": schema, # Scoped here (not on TemplateRenderer.env) so the set stays bound # to bundle pipeline rendering and doesn't leak into other templates. "explicitly_rendered_keys": EXPLICITLY_RENDERED_PIPELINE_CONFIG_KEYS, } return self.template_renderer.render_template( "bundle/pipeline_resource.yml.j2", context )
# === UTILITY METHODS === def _safe_directory_create( self, directory: Path, error_context: str = "directory" ) -> None: """ Safely create directory with consistent error handling. Args: directory: Path to directory to create error_context: Context for error messages Raises: BundleResourceError: If directory creation fails """ try: directory.mkdir(parents=True, exist_ok=True) self.logger.debug("Ensured %s exists: %s", error_context, directory) except OSError as e: raise BundleResourceError( f"Failed to create {error_context}: {e}", e ) from e def _safe_directory_access( self, directory: Path, error_context: str = "directory" ) -> None: """ Safely validate directory access with consistent error handling. Args: directory: Path to directory to validate error_context: Context for error messages Raises: BundleResourceError: If directory access fails """ try: if not directory.exists(): raise BundleResourceError( f"{error_context.capitalize()} does not exist: {directory}" ) except (OSError, PermissionError) as e: raise BundleResourceError( f"Cannot access {error_context} {directory}: {e}", e ) from e def _handle_pipeline_error( self, pipeline_name: str, error: Exception, operation: str ) -> BundleResourceError: """ Wrap a per-pipeline failure in a consistent BundleResourceError. The CLI error boundary handles logging; this method only constructs the user-facing exception. """ if isinstance(error, OSError): error_msg = f"File system error for pipeline '{pipeline_name}': {error}" else: error_msg = f"{operation} failed for pipeline '{pipeline_name}': {error}" return BundleResourceError(error_msg, error) # === MAIN WORKFLOW METHODS === def _setup_sync_environment(self, output_dir: Path) -> List[Path]: """Ensure resources/lhp/ exists and return the current pipeline dirs.""" self.ensure_resources_directory() return self.get_pipeline_directories(output_dir) def _process_current_pipelines( self, current_pipeline_dirs: List[Path], env: str, ) -> int: """ Write a resource file for each current pipeline directory. Args: current_pipeline_dirs: List of pipeline directories to process env: Environment name for template processing Returns: Number of resource files written Raises: BundleResourceError: If pipeline processing fails """ written_count = 0 for pipeline_dir in current_pipeline_dirs: pipeline_name = pipeline_dir.name try: if self._sync_pipeline_resource(pipeline_name, pipeline_dir, env): written_count += 1 self.logger.debug("Successfully synced pipeline: %s", pipeline_name) except (LHPError, BundleResourceError): # Structured errors (LHPError, BundleResourceError and its # YAMLProcessingError subclasses) propagate # as-is to the CLI error boundary. raise except Exception as e: raise self._handle_pipeline_error( pipeline_name, e, "Pipeline sync" ) from e return written_count def _log_sync_summary(self, written_count: int) -> None: """ Log wipe-and-regenerate sync results summary. Args: written_count: Number of resource files written """ if written_count > 0: self.logger.info( f"Wrote {written_count} bundle resource file(s) under resources/lhp/" ) else: self.logger.info( "No pipelines found under generated/<env>/ — nothing to write" ) def _create_new_resource_file(self, pipeline_name: str, output_dir: Path, env: str): """ Create new resource file for a pipeline. Args: pipeline_name: Name of the pipeline output_dir: Output directory containing generated Python files env: Environment name for template context """ with perf_timer("_create_new_resource_file", category="bundle_create_resource"): # resources/lhp/ is already created by _setup_sync_environment for # the sync flow; safe to assume it exists here. resource_file = self.get_resource_file_path(pipeline_name) # Generate resource file content from Python files content = self.generate_resource_file_content( pipeline_name, output_dir, env ) try: resource_file.write_text(content, encoding="utf-8") self.logger.info(f"Created new resource file: {resource_file}") except (OSError, PermissionError) as e: raise BundleResourceError( f"Failed to create resource file {resource_file}: {e}", e )