import logging
import threading
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
from ..models.config import ActionType, FlowGroup, Preset, Template
from ..utils.error_formatter import (
ErrorCategory,
ErrorFormatter,
LHPConfigError,
LHPError,
LHPValidationError,
)
from ..utils.yaml_loader import load_yaml_documents_all, load_yaml_file
# Value type parameter for CachingYAMLParser._cached_load, letting one
# cache-shell method serve sub-caches that hold different value types
# (parsed flowgroup lists and raw YAML document lists).
_CacheValueT = TypeVar("_CacheValueT")
class YAMLParser:
    """Parse and validate YAML configuration files.

    Loads flowgroup, template and preset YAML files into their config models,
    wrapping low-level loader failures in structured ``LHPError`` subclasses
    where possible.
    """

    # Document-level keys that each entry of a ``flowgroups:`` array inherits
    # from its enclosing document, unless the entry sets the key itself.
    _INHERITABLE_FIELDS: Tuple[str, ...] = (
        "pipeline",
        "use_template",
        "presets",
        "operational_metadata",
        "job_name",
    )

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def parse_file(self, file_path: Path) -> Dict[str, Any]:
        """Parse a single YAML file into a dict.

        Args:
            file_path: Path of the YAML file to read.

        Returns:
            The parsed content, or ``{}`` when the loaded content is falsy
            (e.g. an empty file).

        Raises:
            LHPError: Re-raised unchanged when the loader already raised one.
            LHPConfigError: IO category, code ``004``. Raised when the loader
                reports "File not found" via ValueError, or fails with any
                exception that is not a ValueError. The original error is
                chained as ``__cause__``.
            ValueError: Any other ValueError (e.g. YAML errors) propagates
                unchanged.
        """
        self.logger.debug(f"Parsing YAML file: {file_path}")
        try:
            content = load_yaml_file(file_path, error_context=f"YAML file {file_path}")
        except LHPError:
            # Checked first so already-structured errors are never re-wrapped.
            raise
        except ValueError as e:
            if "File not found" not in str(e):
                raise  # Re-raise ValueError as-is for YAML errors
            raise LHPConfigError(
                category=ErrorCategory.IO,
                code_number="004",
                title="Error reading YAML file",
                details=f"Error reading {file_path}: {e}",
                suggestions=[
                    "Check the file path is correct",
                    "Ensure the file exists and is readable",
                ],
                context={"file": str(file_path)},
            ) from e
        except Exception as e:
            raise LHPConfigError(
                category=ErrorCategory.IO,
                code_number="004",
                title="Error reading YAML file",
                details=f"Error reading {file_path}: {e}",
                suggestions=[
                    "Check the file path is correct",
                    "Ensure the file exists and is readable",
                    "Verify the YAML syntax is correct",
                ],
                context={"file": str(file_path)},
            ) from e
        return content or {}

    def _validate_action_types(self, doc: Dict[str, Any], file_path: Path) -> None:
        """Pre-check ``type:`` on every action before Pydantic constructs FlowGroup.

        Pydantic's own validation rejects unknown action types, but it produces
        a generic ValidationError. Checking here lets us emit a friendlier
        LHP-ACT-001 with a did-you-mean suggestion, sourced from
        ``ErrorFormatter.unknown_type_with_suggestion``.

        Args:
            doc: Mapping for one flowgroup. This is either a regular-syntax
                document or one entry of a ``flowgroups:`` array after
                inheritance.
            file_path: Source file path, added to the resulting error's
                context for user diagnostics.

        Raises:
            LHPConfigError: When an action's ``type:`` value is not a member
                of :class:`ActionType`.
        """
        valid_values = [t.value for t in ActionType]
        actions = doc.get("actions") or []
        if not isinstance(actions, list):
            return  # Let Pydantic emit the structural error for non-list actions
        for action in actions:
            if not isinstance(action, dict):
                continue
            action_type = action.get("type")
            if action_type is None:
                continue  # Missing type: Pydantic enforces required field
            if action_type in valid_values:
                continue
            error = ErrorFormatter.unknown_type_with_suggestion(
                value_type="action type",
                provided_value=str(action_type),
                valid_values=valid_values,
                # Indented so the example is valid YAML, with `type` nested
                # inside the list item alongside `name`.
                example_usage=(
                    "actions:\n"
                    "  - name: my_action\n"
                    "    type: load # Valid types: load, transform, write, test"
                ),
            )
            error.context["file"] = str(file_path)
            action_name = action.get("name")
            if action_name:
                error.context["action"] = action_name
            raise error

    @staticmethod
    def _register_flowgroup_name(
        fg_name: Any, seen_names: set, file_path: Path, copy_paste_hint: str
    ) -> None:
        """Record ``fg_name`` in ``seen_names``, raising if it was already seen.

        Falsy names (missing/empty) are not recorded. Pydantic reports those
        when the FlowGroup is built.

        Raises:
            LHPValidationError: Code ``013``, for a duplicate flowgroup name.
        """
        if fg_name in seen_names:
            raise LHPValidationError(
                category=ErrorCategory.VALIDATION,
                code_number="013",
                title=f"Duplicate flowgroup name '{fg_name}'",
                details=f"Duplicate flowgroup name '{fg_name}' in file {file_path}. Each flowgroup must have a unique name.",
                suggestions=[
                    f"Rename one of the '{fg_name}' flowgroups to a unique name",
                    copy_paste_hint,
                ],
                context={"file": str(file_path), "flowgroup": fg_name},
            )
        if fg_name:
            seen_names.add(fg_name)

    def parse_flowgroups_from_file(self, file_path: Path) -> List[FlowGroup]:
        """Parse one or more FlowGroups from a YAML file.

        Supports both multi-document syntax (``---``) and ``flowgroups:`` array
        syntax, but not a mix of the two in one file. Array entries inherit
        the document-level keys listed in ``_INHERITABLE_FIELDS`` unless they
        set them themselves. The loaded YAML dicts are not modified.

        Args:
            file_path: Path to a YAML file containing one or more flowgroups.

        Returns:
            List of FlowGroup objects. Returns ``[]`` when a document looks like
            a blueprint instance; such files are left for BlueprintDiscoverer.

        Raises:
            LHPConfigError: The file has no documents, a document looks like a
                blueprint definition, or an action has an unknown ``type:``.
            LHPValidationError: Duplicate flowgroup names (``013``) or mixed
                syntax (``014``).
            ValueError: Propagated from ``load_yaml_documents_all`` on load
                failures. Pydantic's ValidationError propagates from
                FlowGroup construction.
        """
        from .blueprint_parser import BlueprintParser

        documents = load_yaml_documents_all(
            file_path, error_context=f"flowgroup file {file_path}"
        )

        if not documents:
            raise LHPConfigError(
                category=ErrorCategory.CONFIG,
                code_number="005",
                title="Empty flowgroup file",
                details=f"No content found in {file_path}",
                suggestions=[
                    "Ensure the file contains valid YAML content",
                    "Check that the file is not empty",
                    "Verify the file has a 'flowgroup' key at the top level",
                ],
                context={"file": str(file_path)},
            )

        is_multi_doc = len(documents) > 1
        self.logger.debug(
            f"Loaded {len(documents)} document(s) from {file_path}"
            f"{' (multi-document)' if is_multi_doc else ''}"
        )

        flowgroups: List[FlowGroup] = []
        seen_flowgroup_names: set = set()
        uses_array_syntax = False
        uses_regular_syntax = False

        for doc in documents:
            # Defensive guard: catch a blueprint *definition* accidentally
            # placed under `include:` (pipelines/) instead of
            # `blueprint_include:`. Without it, the array-syntax path below
            # would try to build a FlowGroup from a blueprint spec and fail
            # with an opaque error about missing `actions:` / unresolved %{var}.
            if BlueprintParser.looks_like_blueprint(doc):
                raise LHPConfigError(
                    category=ErrorCategory.CONFIG,
                    code_number="040",
                    title="Blueprint file in flowgroup directory",
                    details=(
                        f"{file_path} appears to be a blueprint (has 'parameters' "
                        "and 'flowgroups' but no 'actions'). Blueprint files must "
                        "live under blueprints/ (or your configured "
                        "blueprint_include patterns), not in pipelines/."
                    ),
                    suggestions=[
                        f"Move {file_path} to blueprints/",
                        "Or adjust 'include:' / 'blueprint_include:' patterns "
                        "in lhp.yaml",
                    ],
                    context={"file": str(file_path)},
                )

            # Routing: instance files (use_blueprint, or legacy blueprint+flat)
            # may live alongside flowgroups under pipelines/. Skip them here so
            # the BlueprintDiscoverer can pick them up via instance_include.
            # NOTE(review): this returns [] for the whole file, discarding any
            # flowgroups already parsed from earlier documents.
            if BlueprintParser.looks_like_instance(doc):
                self.logger.debug(
                    f"Skipping instance file {file_path} during flowgroup parse "
                    "(routed to BlueprintDiscoverer)"
                )
                return []

            if "flowgroups" in doc:
                uses_array_syntax = True
                # Document-level values that array entries may inherit.
                inherited = {
                    field: doc[field]
                    for field in self._INHERITABLE_FIELDS
                    if field in doc
                }
                for fg_config in doc["flowgroups"]:
                    # Merged copy: keys set on the entry win over inherited
                    # ones, and the loaded YAML dict is left untouched.
                    fg_data = {**inherited, **fg_config}
                    self._register_flowgroup_name(
                        fg_data.get("flowgroup"),
                        seen_flowgroup_names,
                        file_path,
                        "Check for copy-paste errors in the flowgroups array",
                    )
                    # Friendlier LHP-ACT-001 than Pydantic's generic error.
                    self._validate_action_types(fg_data, file_path)
                    # LHPError subclasses and Pydantic's ValidationError raised
                    # during construction already carry enough context; they
                    # propagate unwrapped.
                    flowgroups.append(FlowGroup(**fg_data))
            else:
                # Regular syntax: one flowgroup per document.
                uses_regular_syntax = True
                self._register_flowgroup_name(
                    doc.get("flowgroup"),
                    seen_flowgroup_names,
                    file_path,
                    "Check for copy-paste errors in the multi-document YAML",
                )
                self._validate_action_types(doc, file_path)
                flowgroups.append(FlowGroup(**doc))

        if uses_array_syntax and uses_regular_syntax:
            raise LHPValidationError(
                category=ErrorCategory.VALIDATION,
                code_number="014",
                title="Mixed flowgroup syntax",
                details=f"Mixed syntax detected in {file_path}: cannot use both multi-document (---) and flowgroups array syntax in the same file.",
                suggestions=[
                    "Use multi-document syntax (--- separators) OR flowgroups array syntax, not both",
                    "Multi-document: separate flowgroups with '---' on its own line",
                    "Array syntax: use 'flowgroups:' key with a list of flowgroups",
                ],
                context={"file": str(file_path)},
            )

        self.logger.debug(
            f"Parsed {len(flowgroups)} flowgroup(s) from {file_path}"
            f" (syntax: {'array' if uses_array_syntax else 'regular'})"
        )
        return flowgroups

    def parse_flowgroup(self, file_path: Path) -> FlowGroup:
        """Parse a FlowGroup YAML file that holds exactly one flowgroup.

        Note: This method only supports single-flowgroup files. If the file
        contains multiple flowgroups (via ``---`` separators or a
        ``flowgroups:`` array), use parse_flowgroups_from_file() instead.

        Raises:
            LHPValidationError: Code ``015``, when the file has several
                documents or uses array syntax.
        """
        try:
            documents = load_yaml_documents_all(file_path)
        except ValueError as e:
            # If the multi-document load fails, fall back to the plain
            # single-file path (and its error mapping).
            self.logger.debug(
                f"Multi-document load failed for {file_path}, falling back to single-document parse: {e}"
            )
            return FlowGroup(**self.parse_file(file_path))

        if len(documents) > 1:
            raise LHPValidationError(
                category=ErrorCategory.VALIDATION,
                code_number="015",
                title="Multiple documents in single-flowgroup parse",
                details=f"File {file_path} contains multiple flowgroups (multiple documents). Use parse_flowgroups_from_file() instead.",
                suggestions=[
                    "Use parse_flowgroups_from_file() for multi-document YAML files",
                    "Split into separate files if single-flowgroup parsing is needed",
                ],
                context={"file": str(file_path)},
            )

        if documents and "flowgroups" in documents[0]:
            raise LHPValidationError(
                category=ErrorCategory.VALIDATION,
                code_number="015",
                title="Array syntax in single-flowgroup parse",
                details=f"File {file_path} contains multiple flowgroups (array syntax). Use parse_flowgroups_from_file() instead.",
                suggestions=[
                    "Use parse_flowgroups_from_file() for array-syntax YAML files",
                    "Split into separate files if single-flowgroup parsing is needed",
                ],
                context={"file": str(file_path)},
            )

        # Single flowgroup: parse via parse_file (the file is read a second time).
        return FlowGroup(**self.parse_file(file_path))

    def parse_template_raw(self, file_path: Path) -> Template:
        """Parse a Template YAML file with raw actions (no Action object creation).

        Used during template loading to avoid validating template syntax such
        as ``{{ table_properties }}``. Actions are validated later, during
        rendering, when actual parameter values are available.
        """
        content = self.parse_file(file_path)
        raw_actions = content.pop("actions", [])
        template = Template(**content, actions=raw_actions)
        template._raw_actions = True  # Flag set after construction
        return template

    def parse_preset(self, file_path: Path) -> Preset:
        """Parse a Preset YAML file."""
        content = self.parse_file(file_path)
        return Preset(**content)

    def discover_presets(self, presets_dir: Path) -> List[Preset]:
        """Discover and parse every ``*.yaml`` Preset file directly in ``presets_dir``.

        Files are visited in sorted path order, so the result does not depend
        on filesystem listing order. A file that fails to parse is logged as a
        warning and skipped (best-effort discovery).
        """
        self.logger.debug(f"Discovering presets in {presets_dir}")
        presets = []
        for yaml_file in sorted(presets_dir.glob("*.yaml")):
            if not yaml_file.is_file():
                continue
            try:
                presets.append(self.parse_preset(yaml_file))
            except Exception as e:
                self.logger.warning(f"Could not parse preset {yaml_file}: {e}")
        return presets
class CachingYAMLParser:
    """Thread-safe caching wrapper for YAMLParser.

    Entries are keyed by ``(resolved file path, st_mtime)``, so a cached value
    stops being returned once the file's modification time changes. Two
    sub-caches are kept: parsed flowgroups and raw YAML documents. Each is
    capped at ``max_cache_size`` entries, and they share one pair of hit/miss
    counters. Attributes not defined here are delegated to the wrapped parser.

    NOTE(review): cached values are returned by reference. A caller that
    mutates a returned list (or its elements) also mutates the cached entry.
    Entries for an older mtime of the same path are not purged eagerly; they
    age out through FIFO eviction.
    """

    def __init__(
        self, base_parser: Optional["YAMLParser"] = None, max_cache_size: int = 500
    ) -> None:
        """Initialize caching parser.

        Args:
            base_parser: Underlying YAMLParser instance. A fresh YAMLParser is
                created when this is None (or otherwise falsy).
            max_cache_size: Maximum number of entries per sub-cache. A value of
                0 or less disables storing results, so every call is a miss.
        """
        self._parser: YAMLParser = base_parser or YAMLParser()
        self._cache: Dict[Tuple[str, float], List[FlowGroup]] = {}
        self._documents_cache: Dict[Tuple[str, float], List[Dict[str, Any]]] = {}
        self._max_cache_size: int = max_cache_size
        # Re-entrant lock guarding both sub-caches and the hit/miss counters.
        self._lock: threading.RLock = threading.RLock()
        self._hits: int = 0
        self._misses: int = 0

    def _cached_load(
        self,
        path: Path,
        cache: Dict[Tuple[str, float], "_CacheValueT"],
        loader: Callable[[], "_CacheValueT"],
        label: str,
    ) -> "_CacheValueT":
        """Return ``loader()``'s result for ``path``, memoized in ``cache``.

        This is the cache shell shared by every sub-cache: an mtime-keyed
        lookup, FIFO eviction at ``max_cache_size``, and hit/miss counters.

        Args:
            path: File whose contents the loader reads.
            cache: The sub-cache dict to consult and fill.
            loader: No-arg fallback, invoked on a cache miss. It is also
                invoked uncached when the file cannot be stat'ed, because no
                key can be computed.
            label: Prefix for the cache-hit debug log line.
        """
        resolved_path: Path = path.resolve()
        try:
            mtime: float = resolved_path.stat().st_mtime
        except OSError as e:
            self._parser.logger.debug(
                f"Could not stat {resolved_path}, skipping cache: {e}"
            )
            return loader()

        cache_key: Tuple[str, float] = (str(resolved_path), mtime)
        with self._lock:
            if cache_key in cache:
                self._hits += 1
                self._parser.logger.debug(
                    f"{label} cache hit for {path} (hits={self._hits})"
                )
                return cache[cache_key]

            self._misses += 1
            # The loader runs with the lock held, so concurrent misses are
            # serialized and a second caller for the same key then gets a hit.
            result: _CacheValueT = loader()
            if self._max_cache_size <= 0:
                return result  # Caching disabled
            # Evict only after a successful load, so a failing loader does
            # not throw away good entries.
            if len(cache) >= self._max_cache_size:
                self._evict_oldest(cache)
            cache[cache_key] = result
            return result

    def _evict_oldest(self, cache: Dict[Any, Any]) -> None:
        """Drop the oldest entries of ``cache`` (FIFO by dict insertion order).

        About 10% of ``max_cache_size`` is dropped at once to amortize
        eviction. At least enough entries are always removed to make room for
        one insert; ``max_cache_size // 10`` alone is 0 for ceilings below 10
        and would let the cache grow without bound. Caller must hold the lock.
        """
        overflow = len(cache) - self._max_cache_size + 1
        batch = max(overflow, self._max_cache_size // 10, 1)
        for key in list(cache)[:batch]:
            del cache[key]

    def parse_flowgroups_from_file(self, path: Path) -> List["FlowGroup"]:
        """Parse flowgroups with caching based on file mtime.

        Delegates to the wrapped parser's ``parse_flowgroups_from_file`` on a
        miss.
        """
        return self._cached_load(
            path,
            self._cache,
            lambda: self._parser.parse_flowgroups_from_file(path),
            label="Flowgroup",
        )

    def load_documents_all(
        self, path: Path, error_context: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Load all YAML documents from a file with mtime-keyed caching.

        Hit/miss counters are shared with the flowgroup sub-cache;
        ``get_cache_stats()`` reports per-sub-cache sizes separately.
        NOTE(review): ``error_context`` is not part of the cache key, so a hit
        returns the result loaded under whatever context the miss used.
        """
        return self._cached_load(
            path,
            self._documents_cache,
            lambda: load_yaml_documents_all(path, error_context=error_context),
            label="Documents",
        )

    def clear_cache(self) -> None:
        """Clear all cached entries across both sub-caches and reset counters."""
        with self._lock:
            self._cache.clear()
            self._documents_cache.clear()
            self._hits = 0
            self._misses = 0

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache performance statistics.

        Hit/miss counters are aggregated across both sub-caches (flowgroup
        parsing and raw-document loading). Sub-cache sizes are reported
        separately for visibility.

        Returns:
            Dictionary with cache hits, misses, total, hit rate (percent,
            rounded to one decimal), and per-sub-cache sizes.
        """
        with self._lock:
            total: int = self._hits + self._misses
            hit_rate: float = (self._hits / total * 100) if total > 0 else 0.0
            return {
                "hits": self._hits,
                "misses": self._misses,
                "total": total,
                "hit_rate_percent": round(hit_rate, 1),
                "cache_size": len(self._cache),
                "documents_cache_size": len(self._documents_cache),
            }

    def __getattr__(self, name: str) -> Any:
        """Delegate attributes not found on this wrapper to the base parser.

        ``__getattr__`` only runs after normal lookup fails. The wrapped parser
        is read from ``__dict__`` directly, because an instance created without
        ``__init__`` (e.g. by ``copy.copy`` or unpickling) has no ``_parser``.
        Looking it up via ``self._parser`` would re-enter this method forever.

        Raises:
            AttributeError: If there is no wrapped parser yet, or the wrapped
                parser lacks ``name``.
        """
        try:
            parser = self.__dict__["_parser"]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None
        return getattr(parser, name)