Source code for petres.interpolators.spatial.kriging

from __future__ import annotations

from abc import abstractmethod
from collections.abc import Callable, Iterable, Sequence
from typing import Any, Literal

import numpy as np

from ..base import BaseInterpolator


from pykrige.ok import OrdinaryKriging
from pykrige.ok3d import OrdinaryKriging3D
from pykrige.uk import UniversalKriging
from pykrige.uk3d import UniversalKriging3D



VariogramModel = Literal[
    "linear",
    "power",
    "gaussian",
    "spherical",
    "exponential",
    "hole-effect",
    "custom",
]

Backend = Literal["vectorized", "loop", "C"]


class BasePyKrigeInterpolator(BaseInterpolator):
    """Provide a common PyKrige-backed point interpolation API.

    This base class adapts PyKrige model classes to the ``BaseInterpolator``
    contract. It handles dependency checks, common model configuration, and
    shared point-execution logic for both ordinary and universal kriging.

    Parameters
    ----------
    variogram_model : {"linear", "power", "gaussian", "spherical", "exponential", "hole-effect", "custom"}, default="linear"
        Variogram model name forwarded to the PyKrige backend model.
    variogram_parameters : dict[str, Any] or Sequence[float] or None, default=None
        Explicit variogram parameters. If ``None``, PyKrige estimates the
        parameters from input data.
    variogram_function : callable or None, default=None
        Custom variogram function used when ``variogram_model="custom"``.
    nlags : int, default=6
        Number of lag bins used for variogram estimation. Must be at least 1.
    weight : bool, default=False
        Whether semivariances are weighted during variogram fitting.
    verbose : bool, default=False
        Whether backend model construction and solve steps emit logs.
    enable_plotting : bool, default=False
        Whether PyKrige variogram fitting plots are enabled.
    exact_values : bool, default=True
        Whether interpolation reproduces training values exactly.
    pseudo_inv : bool, default=False
        Whether to use a pseudo-inverse while solving the kriging system.
    pseudo_inv_type : {"pinv", "pinvh"}, default="pinv"
        Pseudo-inverse algorithm used when ``pseudo_inv=True``.
    backend : {"vectorized", "loop", "C"}, default="vectorized"
        PyKrige execution backend for prediction calls.

    Notes
    -----
    Only point prediction mode is supported. Grid-style execution is
    intentionally omitted to keep the API aligned with point-based workflows.
    """

    allowed_dims = (2, 3)

    def __init__(
        self,
        variogram_model: VariogramModel = "linear",
        variogram_parameters: dict[str, Any] | Sequence[float] | None = None,
        variogram_function: Callable[..., Any] | None = None,
        nlags: int = 6,
        weight: bool = False,
        verbose: bool = False,
        enable_plotting: bool = False,
        exact_values: bool = True,
        pseudo_inv: bool = False,
        pseudo_inv_type: str = "pinv",
        backend: Backend = "vectorized",
    ) -> None:
        """Initialize shared kriging options.

        Raises
        ------
        ValueError
            If ``nlags < 1``, ``pseudo_inv_type`` is unsupported, or ``backend``
            is not one of the accepted values.
        """
        super().__init__()

        if nlags < 1:
            raise ValueError(f"`nlags` must be >= 1. Got {nlags}.")
        if pseudo_inv_type not in {"pinv", "pinvh"}:
            raise ValueError(
                f"`pseudo_inv_type` must be 'pinv' or 'pinvh'. Got {pseudo_inv_type!r}."
            )
        if backend not in {"vectorized", "loop", "C"}:
            raise ValueError(
                f"`backend` must be 'vectorized', 'loop', or 'C'. Got {backend!r}."
            )

        self.variogram_model = variogram_model
        self.variogram_parameters = variogram_parameters
        self.variogram_function = variogram_function
        self.nlags = int(nlags)
        self.weight = bool(weight)
        self.verbose = bool(verbose)
        self.enable_plotting = bool(enable_plotting)
        self.exact_values = bool(exact_values)
        self.pseudo_inv = bool(pseudo_inv)
        self.pseudo_inv_type = pseudo_inv_type
        self.backend = backend

        self._model: Any = None
        self._fit_coordinates: np.ndarray | None = None
        self._fit_values: np.ndarray | None = None

    def _fit_impl(self, coordinates: np.ndarray, values: np.ndarray) -> None:
        """Fit the wrapped PyKrige model.

        Parameters
        ----------
        coordinates : numpy.ndarray
            Training coordinates with shape ``(n_samples, n_dims)``.
        values : numpy.ndarray
            Training target values with shape ``(n_samples,)``.
        """
        self._fit_coordinates = np.asarray(coordinates, dtype=float)
        self._fit_values = np.asarray(values, dtype=float)
        self._model = self._build_model(self._fit_coordinates, self._fit_values)

    def _predict_impl(self, coordinates: np.ndarray) -> np.ndarray:
        """Predict values for query coordinates.

        Parameters
        ----------
        coordinates : numpy.ndarray
            Query coordinates with shape ``(n_queries, n_dims)``.

        Returns
        -------
        numpy.ndarray
            Predicted values with shape ``(n_queries,)``.
        """
        predictions, _ = self._execute_points(coordinates)
        return predictions

    def predict_variance(self, coordinates: np.ndarray, **execute_kwargs: Any) -> np.ndarray:
        """Predict kriging variance for query coordinates.

        Parameters
        ----------
        coordinates : numpy.ndarray
            Query coordinates with shape ``(n_queries, n_dims)``.
        **execute_kwargs : Any
            Additional keyword arguments forwarded to PyKrige ``execute``.

        Returns
        -------
        numpy.ndarray
            Kriging variance values with shape ``(n_queries,)``.
        """
        self._check_fitted()
        coordinates = self._validate_predict_inputs(coordinates)
        _, variance = self._execute_points(coordinates, **execute_kwargs)
        return variance

    def predict_with_variance(
        self,
        coordinates: np.ndarray,
        **execute_kwargs: Any,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Predict values and kriging variance for query coordinates.

        Parameters
        ----------
        coordinates : numpy.ndarray
            Query coordinates with shape ``(n_queries, n_dims)``.
        **execute_kwargs : Any
            Additional keyword arguments forwarded to PyKrige ``execute``.

        Returns
        -------
        tuple[numpy.ndarray, numpy.ndarray]
            Two arrays ``(prediction, variance)``, each shaped ``(n_queries,)``.
        """
        self._check_fitted()
        coordinates = self._validate_predict_inputs(coordinates)
        return self._execute_points(coordinates, **execute_kwargs)

    def _execute_points(
        self,
        coordinates: np.ndarray,
        **execute_kwargs: Any,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Execute point-based kriging on the backend model.

        Parameters
        ----------
        coordinates : numpy.ndarray
            Query coordinates with shape ``(n_queries, n_dims)``.
        **execute_kwargs : Any
            Additional keyword arguments forwarded to PyKrige ``execute``.

        Returns
        -------
        tuple[numpy.ndarray, numpy.ndarray]
            Prediction and variance arrays with shape ``(n_queries,)``.

        Raises
        ------
        RuntimeError
            If the interpolator has no backend model or an unsupported fitted
            dimensionality.
        """
        if coordinates.shape[0] == 0:
            empty = np.empty((0,), dtype=float)
            return empty, empty

        if self._model is None:
            raise RuntimeError("Interpolator missing fitted PyKrige model (internal error).")

        if self.dim_ == 2:
            x = coordinates[:, 0]
            y = coordinates[:, 1]
            pred, var = self._model.execute(
                "points",
                x,
                y,
                backend=self.backend,
                **execute_kwargs,
            )
        elif self.dim_ == 3:
            x = coordinates[:, 0]
            y = coordinates[:, 1]
            z = coordinates[:, 2]
            pred, var = self._model.execute(
                "points",
                x,
                y,
                z,
                backend=self.backend,
                **execute_kwargs,
            )
        else:  # pragma: no cover
            raise RuntimeError(f"Unsupported fitted dim_: {self.dim_}")

        return np.asarray(pred, dtype=float).ravel(), np.asarray(var, dtype=float).ravel()

    @abstractmethod
    def _build_model(self, coordinates: np.ndarray, values: np.ndarray) -> Any:
        """Build and return a fitted backend kriging model.

        Parameters
        ----------
        coordinates : numpy.ndarray
            Training coordinates with shape ``(n_samples, n_dims)``.
        values : numpy.ndarray
            Training target values with shape ``(n_samples,)``.

        Returns
        -------
        Any
            Backend-specific fitted kriging model instance.
        """
        ...

    @staticmethod
    def _normalize_3d_scaling(value: float | tuple[float, float]) -> tuple[float, float]:
        """Normalize 3D anisotropy scaling input to ``(scaling_y, scaling_z)``.

        Parameters
        ----------
        value : float or tuple[float, float]
            User-provided 3D anisotropy scaling.

        Returns
        -------
        tuple[float, float]
            Scaling factors for y and z axes.

        Raises
        ------
        ValueError
            If tuple input does not have length 2.
        """
        if np.isscalar(value):
            v = float(value)
            return v, v

        if len(value) != 2:
            raise ValueError(
                "`anisotropy_scaling` for 3D must be a float or a tuple of length 2 "
                "(scaling_y, scaling_z)."
            )

        return float(value[0]), float(value[1])

    @staticmethod
    def _normalize_3d_angles(
        value: float | tuple[float, float, float],
    ) -> tuple[float, float, float]:
        """Normalize 3D anisotropy angle input to ``(angle_x, angle_y, angle_z)``.

        Parameters
        ----------
        value : float or tuple[float, float, float]
            User-provided 3D anisotropy rotation angles.

        Returns
        -------
        tuple[float, float, float]
            Rotation angles around x, y, and z axes.

        Raises
        ------
        ValueError
            If tuple input does not have length 3.
        """
        if np.isscalar(value):
            v = float(value)
            return v, v, v

        if len(value) != 3:
            raise ValueError(
                "`anisotropy_angle` for 3D must be a float or a tuple of length 3 "
                "(angle_x, angle_y, angle_z)."
            )

        return float(value[0]), float(value[1]), float(value[2])


[docs] class OrdinaryKrigingInterpolator(BasePyKrigeInterpolator): """Interpolate scalar values using ordinary kriging in 2D or 3D. This wrapper selects ``pykrige.ok.OrdinaryKriging`` for 2D inputs and ``pykrige.ok3d.OrdinaryKriging3D`` for 3D inputs. Parameters ---------- variogram_model : {"linear", "power", "gaussian", "spherical", "exponential", "hole-effect", "custom"}, default="linear" Variogram model name forwarded to the selected PyKrige class. variogram_parameters : dict[str, Any] or Sequence[float] or None, default=None Variogram parameters. If ``None``, PyKrige infers them. variogram_function : callable or None, default=None Custom variogram function used only for ``variogram_model="custom"``. nlags : int, default=6 Number of lag bins for variogram fitting. weight : bool, default=False Whether semivariances are weighted in variogram fitting. verbose : bool, default=False Whether PyKrige emits logs. enable_plotting : bool, default=False Whether PyKrige plots variogram fits. exact_values : bool, default=True Whether interpolation reproduces training values exactly. pseudo_inv : bool, default=False Whether to use pseudo-inverse for solving the kriging system. pseudo_inv_type : {"pinv", "pinvh"}, default="pinv" Pseudo-inverse implementation name. backend : {"vectorized", "loop", "C"}, default="vectorized" Execution backend used by PyKrige ``execute``. anisotropy_scaling : float or tuple[float, float], default=1.0 2D uses a single scalar. 3D accepts one scalar or ``(scaling_y, scaling_z)``. anisotropy_angle : float or tuple[float, float, float], default=0.0 2D uses a single scalar. 3D accepts one scalar or ``(angle_x, angle_y, angle_z)``. coordinates_type : {"euclidean", "geographic"}, default="euclidean" Coordinate interpretation for 2D ordinary kriging. """
[docs] def __init__( self, variogram_model: VariogramModel = "linear", variogram_parameters: dict[str, Any] | Sequence[float] | None = None, variogram_function: Callable[..., Any] | None = None, nlags: int = 6, weight: bool = False, verbose: bool = False, enable_plotting: bool = False, exact_values: bool = True, pseudo_inv: bool = False, pseudo_inv_type: str = "pinv", backend: Backend = "vectorized", anisotropy_scaling: float | tuple[float, float] = 1.0, anisotropy_angle: float | tuple[float, float, float] = 0.0, coordinates_type: Literal["euclidean", "geographic"] = "euclidean", ) -> None: """Initialize an ordinary kriging interpolator. Raises ------ ValueError If ``coordinates_type`` is invalid. """ super().__init__( variogram_model=variogram_model, variogram_parameters=variogram_parameters, variogram_function=variogram_function, nlags=nlags, weight=weight, verbose=verbose, enable_plotting=enable_plotting, exact_values=exact_values, pseudo_inv=pseudo_inv, pseudo_inv_type=pseudo_inv_type, backend=backend, ) if coordinates_type not in {"euclidean", "geographic"}: raise ValueError( f"`coordinates_type` must be 'euclidean' or 'geographic'. " f"Got {coordinates_type!r}." ) self.anisotropy_scaling = anisotropy_scaling self.anisotropy_angle = anisotropy_angle self.coordinates_type = coordinates_type
def _build_model(self, coordinates: np.ndarray, values: np.ndarray) -> Any: """Build the ordinary kriging backend model. Parameters ---------- coordinates : numpy.ndarray Training coordinates with shape ``(n_samples, 2)`` or ``(n_samples, 3)``. values : numpy.ndarray Training values with shape ``(n_samples,)``. Returns ------- Any A fitted ``OrdinaryKriging`` or ``OrdinaryKriging3D`` instance. Raises ------ RuntimeError If input dimensionality is not 2 or 3. """ if coordinates.shape[1] == 2: x = coordinates[:, 0] y = coordinates[:, 1] return OrdinaryKriging( x=x, y=y, z=values, variogram_model=self.variogram_model, variogram_parameters=self.variogram_parameters, variogram_function=self.variogram_function, nlags=self.nlags, weight=self.weight, anisotropy_scaling=float(self.anisotropy_scaling), anisotropy_angle=float(self.anisotropy_angle), verbose=self.verbose, enable_plotting=self.enable_plotting, exact_values=self.exact_values, pseudo_inv=self.pseudo_inv, pseudo_inv_type=self.pseudo_inv_type, coordinates_type=self.coordinates_type, ) if coordinates.shape[1] == 3: x = coordinates[:, 0] y = coordinates[:, 1] z = coordinates[:, 2] scaling = self._normalize_3d_scaling(self.anisotropy_scaling) angles = self._normalize_3d_angles(self.anisotropy_angle) return OrdinaryKriging3D( x=x, y=y, z=z, val=values, variogram_model=self.variogram_model, variogram_parameters=self.variogram_parameters, variogram_function=self.variogram_function, nlags=self.nlags, weight=self.weight, anisotropy_scaling_y=scaling[0], anisotropy_scaling_z=scaling[1], anisotropy_angle_x=angles[0], anisotropy_angle_y=angles[1], anisotropy_angle_z=angles[2], verbose=self.verbose, enable_plotting=self.enable_plotting, exact_values=self.exact_values, pseudo_inv=self.pseudo_inv, pseudo_inv_type=self.pseudo_inv_type, ) raise RuntimeError(f"Unsupported dimension: {coordinates.shape[1]}")
[docs] class UniversalKrigingInterpolator(BasePyKrigeInterpolator): """Interpolate scalar values using universal kriging in 2D or 3D. Supports drift terms (regional, specified, functional) consistent with PyKrige's UniversalKriging/UniversalKriging3D implementations. For ``specified`` drift, supply drift arrays to ``predict``/``predict_with_variance`` via ``specified_drift_arrays``. Parameters ---------- variogram_model : {"linear", "power", "gaussian", "spherical", "exponential", "hole-effect", "custom"}, default="linear" Variogram model name forwarded to the selected PyKrige class. variogram_parameters : dict[str, Any] or Sequence[float] or None, default=None Variogram parameters. If ``None``, PyKrige infers them. variogram_function : callable or None, default=None Custom variogram function used only for ``variogram_model="custom"``. nlags : int, default=6 Number of lag bins for variogram fitting. weight : bool, default=False Whether semivariances are weighted in variogram fitting. verbose : bool, default=False Whether PyKrige emits logs. enable_plotting : bool, default=False Whether PyKrige plots variogram fits. exact_values : bool, default=True Whether interpolation reproduces training values exactly. pseudo_inv : bool, default=False Whether to use pseudo-inverse for solving the kriging system. pseudo_inv_type : {"pinv", "pinvh"}, default="pinv" Pseudo-inverse implementation name. backend : {"vectorized", "loop", "C"}, default="vectorized" Execution backend used by PyKrige ``execute``. ``"C"`` is rejected. anisotropy_scaling : float or tuple[float, float], default=1.0 2D uses a single scalar. 3D accepts one scalar or ``(scaling_y, scaling_z)``. anisotropy_angle : float or tuple[float, float, float], default=0.0 2D uses a single scalar. 3D accepts one scalar or ``(angle_x, angle_y, angle_z)``. drift_terms : Iterable[str] or None, default=None Drift terms enabled in universal kriging. point_drift : Any or None, default=None Point-log drift data for 2D universal kriging. external_drift : numpy.ndarray or None, default=None External drift raster for 2D universal kriging. external_drift_x : numpy.ndarray or None, default=None X-axis coordinates for ``external_drift``. external_drift_y : numpy.ndarray or None, default=None Y-axis coordinates for ``external_drift``. specified_drift : Sequence[numpy.ndarray] or None, default=None Per-sample drift arrays used when ``"specified"`` drift is active. functional_drift : Sequence[callable] or None, default=None Callable drift functions used when ``"functional"`` drift is active. """
[docs] def __init__( self, variogram_model: VariogramModel = "linear", variogram_parameters: dict[str, Any] | Sequence[float] | None = None, variogram_function: Callable[..., Any] | None = None, nlags: int = 6, weight: bool = False, verbose: bool = False, enable_plotting: bool = False, exact_values: bool = True, pseudo_inv: bool = False, pseudo_inv_type: str = "pinv", backend: Backend = "vectorized", anisotropy_scaling: float | tuple[float, float] = 1.0, anisotropy_angle: float | tuple[float, float, float] = 0.0, drift_terms: Iterable[str] | None = None, point_drift: Any | None = None, external_drift: np.ndarray | None = None, external_drift_x: np.ndarray | None = None, external_drift_y: np.ndarray | None = None, specified_drift: Sequence[np.ndarray] | None = None, functional_drift: Sequence[Callable[..., Any]] | None = None, ) -> None: """Initialize a universal kriging interpolator. Raises ------ ValueError If ``backend="C"`` is requested. """ super().__init__( variogram_model=variogram_model, variogram_parameters=variogram_parameters, variogram_function=variogram_function, nlags=nlags, weight=weight, verbose=verbose, enable_plotting=enable_plotting, exact_values=exact_values, pseudo_inv=pseudo_inv, pseudo_inv_type=pseudo_inv_type, backend=backend, ) if backend == "C": raise ValueError( "`backend='C'` is not supported for UniversalKriging in PyKrige. " "Use 'vectorized' or 'loop'." ) self.anisotropy_scaling = anisotropy_scaling self.anisotropy_angle = anisotropy_angle self.drift_terms = None if drift_terms is None else list(drift_terms) self.point_drift = point_drift self.external_drift = external_drift self.external_drift_x = external_drift_x self.external_drift_y = external_drift_y self.specified_drift = specified_drift self.functional_drift = functional_drift
def _fit_impl(self, coordinates: np.ndarray, values: np.ndarray) -> None: """Fit the universal kriging model with drift validation. Parameters ---------- coordinates : numpy.ndarray Training coordinates with shape ``(n_samples, n_dims)``. values : numpy.ndarray Training values with shape ``(n_samples,)``. """ self._validate_universal_kriging_args(coordinates, values) self._fit_coordinates = np.asarray(coordinates, dtype=float) self._fit_values = np.asarray(values, dtype=float) self._model = self._build_model(self._fit_coordinates, self._fit_values)
[docs] def predict(self, coordinates: np.ndarray, **execute_kwargs: Any) -> np.ndarray: """Predict values at query coordinates. Parameters ---------- coordinates : numpy.ndarray Query coordinates with shape ``(n_queries, n_dims)``. **execute_kwargs : Any Additional keyword arguments forwarded to PyKrige ``execute``. Returns ------- numpy.ndarray Predicted values with shape ``(n_queries,)``. """ self._check_fitted() coordinates = self._validate_predict_inputs(coordinates) predictions, _ = self._execute_points(coordinates, **execute_kwargs) return predictions
[docs] def predict_variance( self, coordinates: np.ndarray, **execute_kwargs: Any, ) -> np.ndarray: """Predict kriging variance at query coordinates. Parameters ---------- coordinates : numpy.ndarray Query coordinates with shape ``(n_queries, n_dims)``. **execute_kwargs : Any Additional keyword arguments forwarded to PyKrige ``execute``. Returns ------- numpy.ndarray Variance values with shape ``(n_queries,)``. """ self._check_fitted() coordinates = self._validate_predict_inputs(coordinates) _, variance = self._execute_points(coordinates, **execute_kwargs) return variance
[docs] def predict_with_variance( self, coordinates: np.ndarray, **execute_kwargs: Any, ) -> tuple[np.ndarray, np.ndarray]: """Predict values and variance at query coordinates. Parameters ---------- coordinates : numpy.ndarray Query coordinates with shape ``(n_queries, n_dims)``. **execute_kwargs : Any Additional keyword arguments forwarded to PyKrige ``execute``. Returns ------- tuple[numpy.ndarray, numpy.ndarray] Two arrays ``(prediction, variance)``, each shaped ``(n_queries,)``. """ self._check_fitted() coordinates = self._validate_predict_inputs(coordinates) return self._execute_points(coordinates, **execute_kwargs)
def _predict_impl(self, coordinates: np.ndarray) -> np.ndarray: """Internal point prediction implementation. Parameters ---------- coordinates : numpy.ndarray Query coordinates with shape ``(n_queries, n_dims)``. Returns ------- numpy.ndarray Predicted values with shape ``(n_queries,)``. """ predictions, _ = self._execute_points(coordinates) return predictions def _build_model(self, coordinates: np.ndarray, values: np.ndarray) -> Any: """Build the universal kriging backend model. Parameters ---------- coordinates : numpy.ndarray Training coordinates with shape ``(n_samples, 2)`` or ``(n_samples, 3)``. values : numpy.ndarray Training values with shape ``(n_samples,)``. Returns ------- Any A fitted ``UniversalKriging`` or ``UniversalKriging3D`` instance. Raises ------ RuntimeError If input dimensionality is not 2 or 3. """ if coordinates.shape[1] == 2: x = coordinates[:, 0] y = coordinates[:, 1] return UniversalKriging( x=x, y=y, z=values, variogram_model=self.variogram_model, variogram_parameters=self.variogram_parameters, variogram_function=self.variogram_function, nlags=self.nlags, weight=self.weight, anisotropy_scaling=float(self.anisotropy_scaling), anisotropy_angle=float(self.anisotropy_angle), drift_terms=self.drift_terms, point_drift=self.point_drift, external_drift=self.external_drift, external_drift_x=self.external_drift_x, external_drift_y=self.external_drift_y, specified_drift=self.specified_drift, functional_drift=self.functional_drift, verbose=self.verbose, enable_plotting=self.enable_plotting, exact_values=self.exact_values, pseudo_inv=self.pseudo_inv, pseudo_inv_type=self.pseudo_inv_type, ) if coordinates.shape[1] == 3: x = coordinates[:, 0] y = coordinates[:, 1] z = coordinates[:, 2] scaling = self._normalize_3d_scaling(self.anisotropy_scaling) angles = self._normalize_3d_angles(self.anisotropy_angle) return UniversalKriging3D( x=x, y=y, z=z, val=values, variogram_model=self.variogram_model, variogram_parameters=self.variogram_parameters, variogram_function=self.variogram_function, nlags=self.nlags, weight=self.weight, anisotropy_scaling_y=scaling[0], anisotropy_scaling_z=scaling[1], anisotropy_angle_x=angles[0], anisotropy_angle_y=angles[1], anisotropy_angle_z=angles[2], drift_terms=self.drift_terms, specified_drift=self.specified_drift, functional_drift=self.functional_drift, verbose=self.verbose, enable_plotting=self.enable_plotting, exact_values=self.exact_values, pseudo_inv=self.pseudo_inv, pseudo_inv_type=self.pseudo_inv_type, ) raise RuntimeError(f"Unsupported dimension: {coordinates.shape[1]}") def _validate_universal_kriging_args( self, coordinates: np.ndarray, values: np.ndarray, ) -> None: """Validate universal-kriging drift-related inputs. Parameters ---------- coordinates : numpy.ndarray Training coordinates with shape ``(n_samples, n_dims)``. values : numpy.ndarray Training values with shape ``(n_samples,)``. """ dim = int(coordinates.shape[1]) n_samples = int(coordinates.shape[0]) self._validate_drift_terms(dim) self._validate_point_drift(dim) self._validate_external_drift(dim) self._validate_specified_drift(n_samples) self._validate_functional_drift(dim) def _validate_drift_terms(self, dim: int) -> None: """Validate requested drift-term names for the current dimensionality. Parameters ---------- dim : int Interpolation dimensionality (2 or 3). """ if self.drift_terms is None: return if not isinstance(self.drift_terms, (list, tuple)): raise TypeError("`drift_terms` must be a list or tuple of strings.") if len(self.drift_terms) == 0: raise ValueError("`drift_terms` cannot be empty when provided.") allowed_2d = {"regional_linear", "point_log", "external_Z", "specified", "functional"} allowed_3d = {"regional_linear", "specified", "functional"} allowed = allowed_2d if dim == 2 else allowed_3d for term in self.drift_terms: if not isinstance(term, str): raise TypeError( f"Each entry in `drift_terms` must be a string. Got {type(term).__name__}." ) if term not in allowed: raise ValueError( f"Unsupported drift term {term!r} for dim={dim}. " f"Allowed terms are {sorted(allowed)}." ) def _validate_point_drift(self, dim: int) -> None: """Validate point-log drift input for 2D universal kriging. Parameters ---------- dim : int Interpolation dimensionality (2 or 3). """ if self.point_drift is None: return if dim != 2: raise ValueError("`point_drift` is supported only for 2D Universal Kriging.") if self.drift_terms is not None and "point_log" not in self.drift_terms: raise ValueError( "`point_drift` was provided, but `drift_terms` does not include 'point_log'." ) arr = np.asarray(self.point_drift, dtype=float) if arr.ndim == 1: if arr.size == 0: raise ValueError("`point_drift` cannot be empty.") if arr.size % 3 != 0: raise ValueError( "`point_drift` 1D form must contain triples [x, y, value, ...]." ) if not np.isfinite(arr).all(): raise ValueError("`point_drift` contains NaN or inf.") return if arr.ndim == 2: if arr.shape[0] == 0: raise ValueError("`point_drift` cannot be empty.") if arr.shape[1] != 3: raise ValueError( "`point_drift` 2D form must have shape (n_points, 3) " "with columns [x, y, value]." ) if not np.isfinite(arr).all(): raise ValueError("`point_drift` contains NaN or inf.") return raise ValueError( "`point_drift` must be either a 1D flat array or a 2D array " "of shape (n_points, 3)." ) def _validate_external_drift(self, dim: int) -> None: """Validate external drift arrays for 2D universal kriging. Parameters ---------- dim : int Interpolation dimensionality (2 or 3). """ has_any_external = any( item is not None for item in (self.external_drift, self.external_drift_x, self.external_drift_y) ) if not has_any_external: return if dim != 2: raise ValueError("External drift is supported only for 2D Universal Kriging.") if self.drift_terms is not None and "external_Z" not in self.drift_terms: raise ValueError( "External drift arrays were provided, but `drift_terms` " "does not include 'external_Z'." ) all_present = ( self.external_drift is not None and self.external_drift_x is not None and self.external_drift_y is not None ) if not all_present: raise ValueError( "External drift requires all of `external_drift`, " "`external_drift_x`, and `external_drift_y`." ) ext = np.asarray(self.external_drift, dtype=float) ext_x = np.asarray(self.external_drift_x, dtype=float) ext_y = np.asarray(self.external_drift_y, dtype=float) if ext.ndim != 2: raise ValueError("`external_drift` must be a 2D array.") if ext_x.ndim != 1: raise ValueError("`external_drift_x` must be a 1D array.") if ext_y.ndim != 1: raise ValueError("`external_drift_y` must be a 1D array.") if ext.shape != (ext_y.size, ext_x.size): raise ValueError( "`external_drift` shape must match " f"(len(external_drift_y), len(external_drift_x)). " f"Got {ext.shape} vs ({ext_y.size}, {ext_x.size})." ) if ext_x.size < 2 or ext_y.size < 2: raise ValueError( "`external_drift_x` and `external_drift_y` must each contain " "at least 2 coordinates." ) if not np.isfinite(ext).all(): raise ValueError("`external_drift` contains NaN or inf.") if not np.isfinite(ext_x).all(): raise ValueError("`external_drift_x` contains NaN or inf.") if not np.isfinite(ext_y).all(): raise ValueError("`external_drift_y` contains NaN or inf.") def _validate_specified_drift(self, n_samples: int) -> None: """Validate per-sample specified drift arrays. Parameters ---------- n_samples : int Number of training samples used for fitting. """ if self.specified_drift is None: return if not isinstance(self.specified_drift, (list, tuple)): raise TypeError("`specified_drift` must be a list or tuple of arrays.") if len(self.specified_drift) == 0: raise ValueError("`specified_drift` cannot be empty when provided.") if self.drift_terms is not None and "specified" not in self.drift_terms: raise ValueError( "`specified_drift` was provided, but `drift_terms` does not include 'specified'." ) for i, arr in enumerate(self.specified_drift): a = np.asarray(arr, dtype=float) if a.ndim != 1: raise ValueError( f"`specified_drift[{i}]` must be 1D with one value per sample. " f"Got shape {a.shape}." ) if a.shape[0] != n_samples: raise ValueError( f"`specified_drift[{i}]` length must match number of fitted samples " f"({n_samples}). Got {a.shape[0]}." ) if not np.isfinite(a).all(): raise ValueError(f"`specified_drift[{i}]` contains NaN or inf.") def _validate_functional_drift(self, dim: int) -> None: """Validate functional drift callables. Parameters ---------- dim : int Interpolation dimensionality (2 or 3). """ if self.functional_drift is None: return if not isinstance(self.functional_drift, (list, tuple)): raise TypeError("`functional_drift` must be a list or tuple of callables.") if len(self.functional_drift) == 0: raise ValueError("`functional_drift` cannot be empty when provided.") if self.drift_terms is not None and "functional" not in self.drift_terms: raise ValueError( "`functional_drift` was provided, but `drift_terms` does not include 'functional'." ) for i, fn in enumerate(self.functional_drift): if not callable(fn): raise TypeError( f"`functional_drift[{i}]` must be callable. " f"Got {type(fn).__name__}." ) try: if dim == 2: out = fn(np.array([0.0]), np.array([0.0])) elif dim == 3: out = fn(np.array([0.0]), np.array([0.0]), np.array([0.0])) else: # pragma: no cover raise RuntimeError(f"Unsupported dim={dim}.") except Exception as e: raise ValueError( f"`functional_drift[{i}]` could not be called with " f"{dim}D coordinate arrays: {e}" ) from e out = np.asarray(out, dtype=float) if out.shape not in {(), (1,)}: raise ValueError( f"`functional_drift[{i}]` must return a scalar or shape-(n,) array. " f"Test call returned shape {out.shape}." ) if not np.isfinite(out).all(): raise ValueError( f"`functional_drift[{i}]` returned NaN or inf in the validation call." )