# SPDX-FileCopyrightText: 2025 - 2026 Rahil Miten Doshi, Friedrich-Alexander-Universität Erlangen-Nürnberg
# SPDX-FileCopyrightText: 2026 Matthias Markl, Friedrich-Alexander-Universität Erlangen-Nürnberg
# SPDX-License-Identifier: BSD-3-Clause
import logging
from typing import Dict, List, Union
import numpy as np
import sympy as sp
from materforge.algorithms.interpolation import ensure_ascending_order
from materforge.algorithms.regression_processor import RegressionProcessor
from materforge.data.constants import ProcessingConstants
from materforge.parsing.config.yaml_keys import (
CONSTANT_KEY, LINEAR_KEY, BOUNDS_KEY, PRE_KEY, YAML_PLACEHOLDER
)
from materforge.parsing.utils.utilities import ensure_sympy_compatible
logger = logging.getLogger(__name__)
[docs]
class PiecewiseBuilder:
    """Centralised piecewise function creation with different build strategies.

    All methods are static; the class only groups the build strategies
    (plain linear interpolation and regression-based) behind one entry point,
    :meth:`build_from_data`.
    """

    @staticmethod
    def build_from_data(dep_array: np.ndarray, prop_array: np.ndarray,
                        dependency: sp.Symbol, config: Dict, prop_name: str) -> sp.Piecewise:
        """Builds a piecewise function from raw dependency-value arrays.

        Handles ascending/descending input, optional pre-regression, and
        symbol substitution when the caller uses a non-placeholder symbol.

        Args:
            dep_array: Dependency axis data.
            prop_array: Corresponding property values.
            dependency: SymPy symbol for the resulting expression.
            config: Property configuration dict (bounds, regression).
            prop_name: Property name used in log and error messages.

        Returns:
            Symbolic piecewise function expressed in terms of dependency.

        Raises:
            ValueError: On null/empty/mismatched arrays, or when any step of
                the build fails (the original exception is chained as the cause).
        """
        logger.info("Building piecewise function for property: %s", prop_name)

        # Input validation happens outside the try block so these messages
        # reach the caller without the generic "Failed building" prefix.
        if dep_array is None or prop_array is None:
            raise ValueError(f"dep_array and prop_array cannot be None for '{prop_name}'")
        if len(dep_array) != len(prop_array):
            raise ValueError(f"Array length mismatch for '{prop_name}': "
                             f"dep_array={len(dep_array)}, prop_array={len(prop_array)}")
        # Explicit length test: truthiness of a multi-element numpy array is ambiguous.
        if len(dep_array) == 0:
            raise ValueError(f"Empty data arrays provided for '{prop_name}'")

        try:
            dep_array, prop_array = ensure_ascending_order(dep_array, prop_array)
            lower_bound_type, upper_bound_type = config[BOUNDS_KEY]
            has_regression, simplify_type, degree, segments = RegressionProcessor.process_regression_params(
                config, prop_name, len(dep_array))
            if has_regression:
                logger.info("Regression enabled for %r: type=%s, degree=%d, segments=%d",
                            prop_name, simplify_type, degree, segments)

            # The expression is always built against the placeholder symbol and
            # only afterwards rewritten in terms of the caller's symbol.
            if has_regression and simplify_type == PRE_KEY:
                pw_result = PiecewiseBuilder._build_with_regression(
                    dep_array, prop_array, YAML_PLACEHOLDER,
                    lower_bound_type, upper_bound_type, degree, segments)
            else:
                pw_result = PiecewiseBuilder._build_without_regression(
                    dep_array, prop_array, YAML_PLACEHOLDER,
                    lower_bound_type, upper_bound_type)

            if dependency != YAML_PLACEHOLDER:
                logger.debug("Substituting %s -> %s for property %r",
                             YAML_PLACEHOLDER, dependency, prop_name)
                pw_result = pw_result.subs(YAML_PLACEHOLDER, dependency)

            logger.info("Successfully built piecewise function for property: %s", prop_name)
            return pw_result
        except Exception as e:
            # Single error type for callers; the root cause stays attached via "from e".
            raise ValueError(f"Failed building piecewise from data for '{prop_name}': {e}") from e

    @staticmethod
    def _line_through(dependency, start, end, anchor):
        """Returns the line through two (x, y) points, written relative to anchor.

        Args:
            dependency: SymPy symbol used as the independent variable.
            start: (x, y) pair of the left point defining the slope.
            end: (x, y) pair of the right point defining the slope.
            anchor: (x, y) pair the expression is anchored at.

        Returns:
            ``anchor_y + slope * (dependency - anchor_x)``.
        """
        slope = (end[1] - start[1]) / (end[0] - start[0])
        return anchor[1] + slope * (dependency - anchor[0])

    @staticmethod
    def _build_without_regression(dep_array: np.ndarray, prop_array: np.ndarray,
                                  dependency: sp.Symbol, lower: str, upper: str) -> sp.Piecewise:
        """Builds a linear interpolation piecewise function from data arrays.

        Below the first and above the last data point the function is either
        held constant (bound type equal to ``CONSTANT_KEY``) or extrapolated
        along the adjacent segment (any other bound type). With a single data
        point there is no segment to extrapolate, so both sides are constant.

        Args:
            dep_array: Dependency data points (must be sorted ascending).
            prop_array: Property values.
            dependency: SymPy symbol.
            lower: Lower boundary type.
            upper: Upper boundary type.

        Returns:
            Linear interpolation piecewise function.
        """
        logger.debug("Building linear interpolation piecewise: %d points, bounds=(%s, %s)",
                     len(dep_array), lower, upper)
        points = list(zip((ensure_sympy_compatible(x) for x in dep_array),
                          (ensure_sympy_compatible(y) for y in prop_array)))
        first, last = points[0], points[-1]
        can_extrapolate = len(points) > 1
        line = PiecewiseBuilder._line_through

        # Lower boundary: everything strictly below the first data point.
        if lower == CONSTANT_KEY or not can_extrapolate:
            lower_expr = first[1]
        else:
            lower_expr = line(dependency, points[0], points[1], first)
        pieces = [(lower_expr, dependency < first[0])]

        # Interior segments: half-open intervals [x_i, x_{i+1}) anchored at the left point.
        for left, right in zip(points, points[1:]):
            pieces.append((line(dependency, left, right, left),
                           sp.And(dependency >= left[0], dependency < right[0])))

        # Upper boundary: the last data point itself and everything above it.
        if upper == CONSTANT_KEY or not can_extrapolate:
            upper_expr = last[1]
        else:
            upper_expr = line(dependency, points[-2], points[-1], last)
        pieces.append((upper_expr, dependency >= last[0]))

        return sp.Piecewise(*pieces)

    @staticmethod
    def _build_with_regression(dep_array: np.ndarray, prop_array: np.ndarray,
                               dependency: sp.Symbol, lower: str, upper: str,
                               degree: int, segments: int) -> sp.Piecewise:
        """Builds a regression-based piecewise function.

        Delegates to RegressionProcessor, providing a unified build interface.
        The call always passes ``ProcessingConstants.DEFAULT_REGRESSION_SEED``
        as the seed.

        Args:
            dep_array: Dependency data points.
            prop_array: Property values.
            dependency: SymPy symbol.
            lower: Lower boundary type.
            upper: Upper boundary type.
            degree: Polynomial degree for regression.
            segments: Number of piecewise segments.

        Returns:
            Regression-based piecewise function.
        """
        logger.info("Building regression piecewise: degree=%d, segments=%d", degree, segments)
        return RegressionProcessor.process_regression(
            dep_array, prop_array, dependency, lower, upper, degree, segments,
            seed=ProcessingConstants.DEFAULT_REGRESSION_SEED)