from __future__ import annotations
import warnings
from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Literal
import numpy as np
from ..grids.sampling._validation import _validate_vertex_array
from ..grids.sampling._vertices import _resolve_xyz_vertices
from .builders.cornerpoint import _build_zcorn_from_zones
from ..errors.property import MissingEclipseKeywordError
from ..errors.grid import UnsupportedGridAttributeError
from ..eclipse.grids.write import GRDECLWriter
from ..eclipse.grids.read import GRDECLReader
from .pillars import PillarGrid
from ..models.zone import Zone
from ..models.boundary import BoundaryPolygon
from ..models.wells import VerticalWell, _validate_well_sequence
from ..grids.properties import GridProperties, GridProperty
from .._validation import _validate_finite_float
@dataclass(frozen=True)
class GridAttributeSpec:
"""Specification for a computed grid attribute.
Parameters
----------
name : str
Canonical attribute name exposed to users.
description : str
Human-readable description of the attribute semantics.
resolver : Callable[[CornerPointGrid], np.ndarray]
Callable that computes the attribute array from a grid instance.
aliases : tuple[str, ...], default ()
Additional accepted names that resolve to the same attribute.
Notes
-----
Instances of this class are immutable and used to build lookup tables for
attribute-name and alias resolution.
"""
name: str
description: str
resolver: Callable[[CornerPointGrid], np.ndarray]
aliases: tuple[str, ...] = ()
GRID_ATTRIBUTES = (
GridAttributeSpec("x", "cell-center x coordinate", lambda g: g.cell_centers[..., 0]),
GridAttributeSpec("y", "cell-center y coordinate", lambda g: g.cell_centers[..., 1]),
GridAttributeSpec(
"depth",
"cell-center depth (positive downward)",
lambda g: g.cell_centers[..., 2],
aliases=("z",),
),
GridAttributeSpec("top", "top depth of cell", lambda g: g._cell_top_depth()),
GridAttributeSpec("bottom", "bottom depth of cell", lambda g: g._cell_bottom_depth()),
GridAttributeSpec("thickness", "cell thickness", lambda g: g._cell_thickness()),
GridAttributeSpec("active", "1 for active cells, 0 for inactive", lambda g: g.active.astype(int)),
)
GRID_ATTRIBUTE_BY_NAME = {spec.name: spec for spec in GRID_ATTRIBUTES}
GRID_ATTRIBUTE_BY_ALIAS = {
alias: spec
for spec in GRID_ATTRIBUTES
for alias in (spec.name, *spec.aliases)
}
RESERVED_GRID_PROPERTY_NAMES = frozenset(GRID_ATTRIBUTE_BY_ALIAS)
[docs]
@dataclass
class CornerPointGrid:
"""3D corner-point grid with explicit corner coordinates.
Parameters
----------
pillars : PillarGrid
Lateral pillar geometry defining i–j topology.
zcorn : ndarray
Corner depths shaped ``(2*nk, 2*nj, 2*ni)`` in Eclipse order.
active : ndarray or None, optional
Boolean mask ``(nk, nj, ni)``; ``None`` marks all cells active.
zone_index : ndarray or None, optional
Zone id per cell, same shape as the grid. ``0`` denotes gap.
zone_names : dict[int, str], optional
Mapping of zone id to zone name.
Notes
-----
Properties are managed via :class:`GridProperties` and are not stored in
``zcorn``. Use :meth:`from_zones` to build grids from stratigraphic zones.
"""
pillars: PillarGrid
zcorn: np.ndarray # Shape (2*nk, 2*nj, 2*ni)
active: np.ndarray | None = None # Shape (nk, nj, ni), boolean, or None for all active
zone_index: np.ndarray | None = None # shape (nk, nj, ni), int
zone_names: dict[int, str] = field(default_factory=dict)
_properties: dict[str, GridProperty] = field(default_factory=dict, repr=False)
_zone_name_to_id: dict[str, int] = field(default_factory=dict, init=False, repr=False)
[docs]
def __post_init__(self) -> None:
"""Validate grid arrays and initialize zone lookup metadata.
Raises
------
ValueError
If ``zcorn``, ``active``, or attached property shapes do not match
the grid dimensions, or when a property belongs to a different grid.
TypeError
If an attached entry in ``_properties`` is not a
:class:`GridProperty` instance.
Notes
-----
When ``active`` is ``None``, an all-active broadcast view is created for
memory efficiency.
"""
nj, ni = self.pillars.cell_shape
nk = self.zcorn.shape[0]//2
expected_corner_shape = (2*nk, 2*nj, 2*ni)
expected_cell_shape = (nk, nj, ni)
expected_active_shape = expected_cell_shape
if self.zcorn.shape != expected_corner_shape:
raise ValueError(
f"zcorn shape {self.zcorn.shape} != expected {expected_corner_shape}"
)
# Initialize or validate active array
if self.active is None:
# Use broadcast for memory-efficient all-active representation
self.active = np.broadcast_to(True, expected_active_shape)
else:
self.active = np.asarray(self.active, dtype=bool)
if self.active.shape != expected_active_shape:
raise ValueError(
f"active shape {self.active.shape} != expected {expected_active_shape}"
)
# Validate properties
for name, prop in self._properties.items():
if not isinstance(prop, GridProperty):
raise TypeError(
f"Property '{name}' must be a GridProperty, got {type(prop)}."
)
if prop.values.shape != expected_cell_shape:
raise ValueError(
f"Property '{name}' shape {prop.values.shape} != expected {expected_cell_shape}"
)
if prop.grid is not self:
raise ValueError(
f"Property '{name}' belongs to a different grid instance."
)
self.set_zones(zone_index=self.zone_index, zone_names=self.zone_names)
def _zone_mask(self, zone: str | Zone) -> np.ndarray:
"""Build a boolean mask for cells that belong to a selected zone.
Parameters
----------
zone : str or Zone
Zone name or zone object to select.
Returns
-------
np.ndarray
Boolean mask with shape ``(nk, nj, ni)`` where ``True`` marks cells
assigned to the requested zone.
Raises
------
ValueError
If no zone definition is available on the grid.
"""
if self.zone_index is None:
raise ValueError("Grid has no zones defined.")
zone_name = self._normalize_zone_name(zone)
zone_id = self._get_zone_id_from_name(zone_name)
return self.zone_index == zone_id
[docs]
def summary(self) -> str:
"""Create a human-readable summary of grid dimensions and metadata.
Returns
-------
str
Multi-line summary including shape, active/inactive counts,
registered property names, and zone names.
Examples
--------
>>> text = grid.summary()
>>> "Grid Summary" in text
True
"""
nk, nj, ni = self.shape
total_cells = self.n_cells
active = self.n_active
inactive = self.n_inactive
active_pct = 100 * active / total_cells if total_cells else 0.0
inactive_pct = 100.0 - active_pct
prop_names = list(self._properties.keys())
zone_names = list(self.zone_names.values()) if self.zone_names else []
def fmt(items: list[str], max_items: int = 4) -> str:
if not items:
return "Not Defined"
if len(items) <= max_items:
return ", ".join(items)
return f"{', '.join(items[:max_items])}, … (+{len(items) - max_items} more)"
lines = [
"Grid Summary",
"════════════",
f"Shape : {nk}×{nj}×{ni} (k×j×i)",
f"Cells : {total_cells}",
f"Active : {active} ({active_pct:.2f}%)",
f"Inactive : {inactive} ({inactive_pct:.2f}%)",
f"Properties : {fmt(prop_names)}",
f"Zones : {fmt(zone_names)}",
]
return "\n".join(lines)
@staticmethod
def _normalize_zone_name(zone: str | Zone) -> str:
"""Normalize a zone selector into a zone-name string.
Parameters
----------
zone : str or Zone
Zone selector provided by caller.
Returns
-------
str
Zone name to be used in internal lookups.
Raises
------
TypeError
If ``zone`` is neither a string nor a :class:`Zone`.
"""
if isinstance(zone, Zone):
return zone.name
if not isinstance(zone, str):
raise TypeError("`zone` must be str, or `Zone` instance.")
return zone
def _get_zone_id_from_name(self, name: str) -> int:
"""Resolve a zone name to its integer zone identifier.
Parameters
----------
name : str
Name of the zone.
Returns
-------
int
Integer identifier associated with the zone name.
Raises
------
ValueError
If the provided zone name is not present in the grid lookup table.
"""
if name not in self._zone_name_to_id:
raise ValueError(f"Zone '{name}' not found in grid zones.")
return self._zone_name_to_id[name]
[docs]
def set_zones(
self,
zone_index: np.ndarray | None,
zone_names: dict[int, str] | None,
) -> None:
"""Assign zone membership arrays and zone-name metadata.
Parameters
----------
zone_index : np.ndarray
Integer array of shape ``(nk, nj, ni)`` assigning each cell to a
zone. Values use this convention:
- ``0`` means gap, undefined, or unassigned.
- ``> 0`` means a valid zone id.
zone_names : dict[int, str], optional
Mapping from zone id to zone name.
If not provided, names will be auto-generated as "Zone {id}".
Raises
------
ValueError
If shapes mismatch, invalid ids are present, or names are inconsistent.
TypeError
If ``zone_index`` cannot be converted to an integer ndarray.
"""
# ----------------------------
# 1. Validate zone_index
# ----------------------------
if zone_index is None:
if not zone_names:
zone_names = {}
else:
raise ValueError("`zone_names` provided without `zone_index`.")
else:
try:
zone_index = np.asarray(zone_index, dtype=np.int32)
except Exception as e:
raise TypeError(f"Failed to convert `zone_index` to numpy array: {e}") from e
if zone_index.shape != self.shape:
raise ValueError(
f"`zone_index` shape {zone_index.shape} != expected {self.shape}"
)
if np.any(zone_index < 0):
raise ValueError("`zone_index` cannot contain negative values.")
# Extract used zone IDs (exclude 0 = gap)
unique_ids = np.unique(zone_index)
unique_ids = unique_ids[unique_ids != 0]
# ----------------------------
# 2. Handle zone_names
# ----------------------------
if not zone_names:
zone_names = {int(i): f"Zone {int(i)}" for i in unique_ids}
else:
# Ensure dict[int, str]
if not isinstance(zone_names, dict):
raise ValueError("`zone_names` must be a dict[int, str].")
if 0 in zone_names:
raise ValueError("Zone ID 0 is reserved for gaps and cannot be named.")
# Convert keys to int
zone_names = {int(k): str(v) for k, v in zone_names.items()}
# ---- 2a. Check duplicate names ----
names = list(zone_names.values())
if len(names) != len(set(names)):
duplicates = {n for n in names if names.count(n) > 1}
raise ValueError(f"Duplicate zone names detected: {duplicates}")
# ---- 2b. Check all used IDs have names except 0 (gap) ----
missing = set(unique_ids) - set(zone_names.keys())
missing.discard(0) # Allow 0 (gap) to be unnamed
if missing:
raise ValueError(
f"`zone_names` missing definitions for zone ids: {sorted(missing)}"
)
# ---- 2c. Optional: warn about unused names ----
unused = set(zone_names.keys()) - set(unique_ids)
unused.discard(0) # Allow 0 (gap) to be unused
if unused:
warnings.warn(
f"Unused zone ids in `zone_names`: {sorted(unused)}",
UserWarning,
)
# ----------------------------
# 3. Assign
# ----------------------------
self.zone_index = zone_index
self.zone_names = zone_names
self._zone_name_to_id = {v: k for k, v in zone_names.items()}
# ----------------------------
# Dimensions
# ----------------------------
@property
def properties(self) -> GridProperties:
"""Return the property manager bound to this grid.
Returns
-------
GridProperties
Dictionary-like facade used to access and validate cell properties.
"""
return GridProperties(self)
@property
def ni(self) -> int:
"""Get the number of cells in the i direction.
Returns
-------
int
Cell count along the i axis.
"""
return self.pillars.ni
@property
def nj(self) -> int:
"""Get the number of cells in the j direction.
Returns
-------
int
Cell count along the j axis.
"""
return self.pillars.nj
@property
def nk(self) -> int:
"""Get the number of layers in the k direction.
Returns
-------
int
Layer count inferred from ``zcorn``.
"""
return self.zcorn.shape[0] // 2
@property
def shape(self) -> tuple[int, int, int]:
"""Get grid dimensions ordered as ``(nk, nj, ni)``.
Returns
-------
tuple[int, int, int]
Layer, row, and column cell counts.
"""
return (self.nk, self.nj, self.ni)
@property
def n_cells(self) -> int:
"""Get the total number of cells in the grid.
Returns
-------
int
Product of ``nk * nj * ni``.
"""
return self.nk * self.nj * self.ni
@property
def n_active(self) -> int:
"""Get the number of active cells.
Returns
-------
int
Count of ``True`` values in ``active``.
"""
return int(np.nansum(self.active))
@property
def n_inactive(self) -> int:
"""Get the number of inactive cells.
Returns
-------
int
Difference between total and active cell counts.
"""
return self.n_cells - self.n_active
@property
def cell_centers(self) -> np.ndarray:
"""Cell centers computed as the mean of 8 corners.
Returns
-------
ndarray
Coordinates shaped ``(nk, nj, ni, 3)``.
"""
corners = self._compute_cell_corners() # Shape (nk, nj, ni, 8, 3)
cell_centers = np.mean(corners, axis=3) # Average over 8 corners
return cell_centers
@property
def cell_volumes(self) -> np.ndarray:
"""Cell volumes computed via hexahedral decomposition.
Returns
-------
ndarray
Volumes shaped ``(nk, nj, ni)``.
"""
corners = self._compute_cell_corners() # Shape (nk, nj, ni, 8, 3)
# Decompose each hexahedron into 6 tetrahedra using diagonal 0-7
# Tetrahedron volume formula: |det([v1-v0, v2-v0, v3-v0])| / 6
# Corner ordering (from _compute_cell_corners):
# 0-3: Top face (i,j), (i+1,j), (i,j+1), (i+1,j+1)
# 4-7: Bottom face (i,j), (i+1,j), (i,j+1), (i+1,j+1)
def tet_volume(
v0: np.ndarray,
v1: np.ndarray,
v2: np.ndarray,
v3: np.ndarray,
) -> np.ndarray:
"""Compute signed tetrahedron volumes for vectorized vertices.
Parameters
----------
v0 : np.ndarray
First tetrahedron vertex array.
v1 : np.ndarray
Second tetrahedron vertex array.
v2 : np.ndarray
Third tetrahedron vertex array.
v3 : np.ndarray
Fourth tetrahedron vertex array.
Returns
-------
np.ndarray
Signed tetrahedron volumes with broadcast-compatible shape.
"""
# v1-v0, v2-v0, v3-v0 form the edge vectors
# Volume = det([v1-v0, v2-v0, v3-v0]) / 6
a = v1 - v0
b = v2 - v0
c = v3 - v0
# Compute determinant using scalar triple product: a · (b × c)
cross = np.cross(b, c)
det = np.sum(a * cross, axis=-1)
return det / 6.0
# Extract corners for each tetrahedron
c0, c1, c2, c3 = corners[..., 0, :], corners[..., 1, :], corners[..., 2, :], corners[..., 3, :]
c4, c5, c6, c7 = corners[..., 4, :], corners[..., 5, :], corners[..., 6, :], corners[..., 7, :]
# Decompose into 6 tetrahedra using diagonal 0-7
vol = np.zeros(corners.shape[:3]) # Shape (nk, nj, ni)
vol += tet_volume(c0, c1, c3, c7)
vol += tet_volume(c0, c1, c5, c7)
vol += tet_volume(c0, c4, c5, c7)
vol += tet_volume(c0, c2, c3, c7)
vol += tet_volume(c0, c2, c6, c7)
vol += tet_volume(c0, c4, c6, c7)
# Take absolute value to handle orientation
volumes = np.abs(vol)
return volumes
[docs]
@classmethod
def from_grdecl(
cls, path: str | Path,
*,
use_actnum: bool = True,
properties: Sequence[str] | None = None,
) -> CornerPointGrid:
"""Load a grid from a GRDECL file.
Parameters
----------
path : str or Path
Path to GRDECL file containing COORD/ZCORN (and optional ACTNUM).
use_actnum : bool, default True
When True, read ACTNUM to set active cells; otherwise all active.
Returns
-------
CornerPointGrid
Grid populated from GRDECL arrays.
"""
if properties is not None:
try:
properties = tuple(properties)
except Exception as e:
raise TypeError(f"`properties` must be a sequence of property names.") from e
data = GRDECLReader().read(path, use_actnum=use_actnum, properties=properties) # returns raw arrays/spec, not a grid
coord = data.coord
zcorn = data.zcorn
actnum = data.actnum
# For properties, we need to convert raw arrays into GridProperty instances
pillars = PillarGrid.from_eclipse_coord(coord)
grid = cls(pillars=pillars, zcorn=zcorn, active=actnum)
for kw, val in data.properties.items():
if kw in RESERVED_GRID_PROPERTY_NAMES:
raise UnsupportedGridAttributeError(
f"Cannot load property '{kw}' from GRDECL because it conflicts with a reserved grid attribute name."
)
prop = grid.properties.create(
name=kw,
eclipse_keyword=kw
)
prop.from_array(val)
return grid
[docs]
def to_grdecl(
self,
path: str | Path,
*,
properties: Sequence[str] | None = None,
include_actnum: bool = True,
) -> None:
"""Write grid geometry and selected properties to a GRDECL file.
Parameters
----------
path : str or Path
Output file path.
properties : Sequence[str] or None, optional
Property names to export. When ``None``, all attached properties are
written.
include_actnum : bool, default True
Whether to export ``ACTNUM`` from the grid active mask.
Raises
------
TypeError
If ``properties`` is not a valid sequence of names.
MissingEclipseKeywordError
If an included property has no Eclipse keyword configured.
"""
coord = self.pillars.to_eclipse_coord()
zcorn = self.zcorn
actnum = self.active.astype(int) if include_actnum else None
writer = GRDECLWriter()
if properties is None:
properties = list(self._properties.keys())
else:
try:
properties = tuple(properties)
except Exception as e:
raise TypeError(f"`properties` must be a sequence of property names.") from e
_properties = {}
if properties:
for prop_name in properties:
prop = self.properties[prop_name]
eclipse_keyword = prop.eclipse_keyword
if eclipse_keyword is None:
raise MissingEclipseKeywordError(property_name=prop_name)
_properties[eclipse_keyword] = prop.values
return writer.write(
path=path,
coord=coord,
zcorn=zcorn,
actnum=actnum,
properties=_properties,
)
[docs]
def show(
self,
show_inactive: bool = False,
scalars: str | None = None,
color: Any = 'tan',
cmap: str | None = 'turbo',
title: str | None = None,
z_scale: float = 1.0,
wells: Sequence[VerticalWell] | VerticalWell | None = None,
**kwargs: Any,
) -> None:
"""Render the grid in 3D PyVista viewer.
Parameters
----------
show_inactive : bool, default False
Whether to display inactive cells.
scalars : str or None, optional
Property name to color by; if ``None`` uses solid color.
color : Any, default 'tan'
Solid color when ``scalars`` is not provided.
cmap : str or None, default 'turbo'
Colormap applied when ``scalars`` is provided.
title : str or None, optional
Figure title.
z_scale : float, default 1.0
Scale factor for the z-axis.
wells : VerticalWell or Sequence[VerticalWell] or None, optional
Well(s) to plot on top of the grid. Can be a single VerticalWell or a sequence of them. If ``None``, no wells are plotted.
**kwargs
Forwarded to viewer ``add_grid``.
"""
from ..viewers.viewer3d.pyvista.theme import PyVista3DViewerTheme
from ..viewers.viewer3d.pyvista.viewer import PyVista3DViewer
if not np.isfinite(z_scale) or z_scale <= 0:
raise ValueError("z_scale must be a positive finite value.")
theme = PyVista3DViewerTheme(scale=(1.0, 1.0, float(z_scale)))
viewer = PyVista3DViewer(theme=theme)
scalars = self._resolve_source(scalars) if scalars is not None else None
color = None if scalars is not None else color
viewer.add_grid(grid=self, show_inactive=show_inactive, color=color, scalars=scalars, cmap=cmap, **kwargs)
if wells is not None:
viewer.add_wells(_validate_well_sequence(wells))
viewer.show(title=title)
[docs]
@classmethod
def from_rectilinear(
cls,
x: np.ndarray,
y: np.ndarray,
z: np.ndarray
) -> CornerPointGrid:
"""Create a corner-point grid from rectilinear coordinate vertices.
Parameters
----------
x : np.ndarray
Monotonic x-vertex coordinates with shape ``(ni + 1,)``.
y : np.ndarray
Monotonic y-vertex coordinates with shape ``(nj + 1,)``.
z : np.ndarray
Layer-interface depths with shape ``(nk + 1,)``.
Returns
-------
CornerPointGrid
Grid with vertical pillars and broadcasted layer surfaces.
"""
z = _validate_vertex_array(z, "z")
pillars = PillarGrid.from_rectilinear(x=x, y=y, top=z[0], base=z[-1])
# Create simple ZCORN with flat layers
nj, ni = pillars.cell_shape
nk = len(z) - 1
# Naive loop version (for clarity)
# zcorn = np.zeros((2*nk, 2*nj, 2*ni))
# for k in range(nk):
# zcorn[2*k:2*k+2, :, :] = z[k:k+2][:, np.newaxis, np.newaxis]
# Fully vectorized version (optimized)
z_planes = np.empty(2 * nk, dtype=z.dtype)
z_planes[0::2] = z[:-1]
z_planes[1::2] = z[1:]
zcorn = np.broadcast_to(
z_planes[:, None, None],
(2 * nk, 2 * nj, 2 * ni),
).copy()
return cls(pillars=pillars, zcorn=zcorn)
[docs]
@classmethod
def from_regular(
cls,
*,
xlim: tuple[float, float] | None = None,
ylim: tuple[float, float] | None = None,
zlim: tuple[float, float] | None = None,
ni: int | None = None,
nj: int | None = None,
nk: int | None = None,
dx: float | None = None,
dy: float | None = None,
dz: float | None = None,
) -> CornerPointGrid:
"""Build a rectilinear corner-point grid from limits or spacings.
Parameters
----------
xlim : tuple[float, float] or None, optional
Minimum and maximum x bounds.
ylim : tuple[float, float] or None, optional
Minimum and maximum y bounds.
zlim : tuple[float, float] or None, optional
Minimum and maximum z bounds.
ni : int or None, optional
Number of cells in i direction.
nj : int or None, optional
Number of cells in j direction.
nk : int or None, optional
Number of cells in k direction.
dx : float or None, optional
Cell size in i direction.
dy : float or None, optional
Cell size in j direction.
dz : float or None, optional
Cell size in k direction.
Returns
-------
CornerPointGrid
Corner-point grid resolved from the provided geometric constraints.
"""
x, y, z = _resolve_xyz_vertices(
xlim=xlim, ylim=ylim, zlim=zlim, ni=ni, nj=nj, nk=nk, dx=dx, dy=dy, dz=dz
)
return cls.from_rectilinear(x=x, y=y, z=z)
[docs]
@classmethod
def from_zones(cls, *, pillars: PillarGrid, zones: Sequence[Zone]) -> CornerPointGrid:
"""Create CornerPointGrid from zones with gap detection.
Parameters
----------
pillars : PillarGrid
Lateral pillar geometry
zones : Sequence[Zone]
Zones in stratigraphic order (top to bottom)
Returns
-------
CornerPointGrid
Grid with gap-filling cells marked as inactive (ACTNUM=0)
"""
zcorn, actnum, zone_index, zone_names = _build_zcorn_from_zones(pillars=pillars, zones=zones)
return cls(pillars=pillars, zcorn=zcorn, active=actnum, zone_index=zone_index, zone_names=zone_names)
[docs]
def apply_boundary(self, boundary: BoundaryPolygon) -> CornerPointGrid:
"""Mask grid activity using a boundary polygon.
Cells whose centers fall outside the polygon become inactive (ACTNUM=0).
Parameters
----------
boundary : BoundaryPolygon
XY boundary polygon used to clip the grid.
Returns
-------
CornerPointGrid
Updated grid with activity masked by the boundary.
Notes
-----
Updates ``active`` in place; cached property masks may become stale.
"""
# Get cell centers (nk, nj, ni, 3)
centers = self.cell_centers
# Extract XY coordinates and reshape to (n_cells, 2)
nk, nj, ni = self.shape
xy = centers[..., :2].reshape(-1, 2) # Shape: (nk*nj*ni, 2)
# Check which cells are inside the boundary
inside_mask = boundary.contains(xy) # Shape: (nk*nj*ni,)
# Reshape back to grid dimensions
inside_mask = inside_mask.reshape(nk, nj, ni)
# Update active array: cells outside boundary become inactive
# Need to create a writable copy if active is a broadcast array
if not self.active.flags.writeable:
self.active = np.array(self.active, dtype=bool)
self.active &= inside_mask
return self
def _surface_cell_at_xy(self, x: float, y: float, surface: Literal["top", "bottom"]) -> tuple[int, int] | None:
"""Find the (i, j) column index whose surface quad contains point (x, y).
Uses a cross-product sign test on all cells simultaneously — no loops.
Parameters
----------
x, y : float
Query point in grid XY coordinates.
surface : Literal["top", "bottom"]
The surface to check.
Returns
-------
tuple[int, int] or None
Zero-based ``(i, j)`` indices of the containing column, or ``None`` if outside the grid.
"""
if surface == "top":
pt = self.pillars.pillar_top # (nj+1, ni+1, 3)
elif surface == "bottom":
pt = self.pillars.pillar_bottom # (nj+1, ni+1, 3)
else:
raise ValueError("`surface` must be 'top' or 'bottom'.")
# Each cell (j, i) is bounded by 4 pillars at corners:
# (j, i ) = "00" (j, i+1) = "10"
# (j+1, i+1) = "11" (j+1, i ) = "01"
#
# Slicing off-by-one gives us (nj, ni) arrays — one value per cell.
x00 = pt[:-1, :-1, 0]; y00 = pt[:-1, :-1, 1]
x10 = pt[:-1, 1:, 0]; y10 = pt[:-1, 1:, 1]
x11 = pt[ 1:, 1:, 0]; y11 = pt[ 1:, 1:, 1]
x01 = pt[ 1:, :-1, 0]; y01 = pt[ 1:, :-1, 1]
# ----------------------------------------------------------------
# Cross product sign test (how it works)
# ----------------------------------------------------------------
# For two points A and B defining a directed edge, the 2D cross product
# of (B - A) and (P - A) tells you which *side* of that edge point P is on:
#
# cross = (B.x - A.x) * (P.y - A.y) - (B.y - A.y) * (P.x - A.x)
#
# cross > 0 → P is to the LEFT of edge A→B
# cross < 0 → P is to the RIGHT of edge A→B
# cross = 0 → P is exactly ON edge A→B
#
# If we walk around the quad in order 00→10→11→01→(back to 00),
# a point INSIDE the quad will be on the SAME side of all 4 edges.
# A point OUTSIDE will be on the wrong side of at least one edge.
#
# Because pillar grids can be wound either CW or CCW depending on
# coordinate system, we accept both "all positive" and "all negative".
# ----------------------------------------------------------------
def _cross(ax, ay, bx, by):
# All inputs are (nj, ni) arrays; x and y are scalars.
# Result is (nj, ni) — one signed area value per cell.
return (bx - ax) * (y - ay) - (by - ay) * (x - ax)
d0 = _cross(x00, y00, x10, y10) # edge 00 → 10 (bottom edge)
d1 = _cross(x10, y10, x11, y11) # edge 10 → 11 (right edge)
d2 = _cross(x11, y11, x01, y01) # edge 11 → 01 (top edge)
d3 = _cross(x01, y01, x00, y00) # edge 01 → 00 (left edge)
inside = (
((d0 >= 0) & (d1 >= 0) & (d2 >= 0) & (d3 >= 0)) | # CCW winding
((d0 <= 0) & (d1 <= 0) & (d2 <= 0) & (d3 <= 0)) # CW winding
) # (nj, ni) bool
hits = np.argwhere(inside)
if hits.size == 0:
return None
return (int(hits[0, 1]), int(hits[0, 0])) # (i, j)
[docs]
def well_indices(self, well: VerticalWell | tuple[float, float]) -> tuple[int, int] | None:
"""Locate the surface grid column indices intersected by a vertical well.
Parameters
----------
well : VerticalWell or tuple[float, float]
Well object providing ``x`` and ``y`` map coordinates, or a tuple of coordinates.
Returns
-------
tuple[int, int] or None
Zero-based ``(i, j)`` indices of the top-surface column containing the well
head location. Returns ``None`` when the well lies outside the
grid footprint.
Notes
-----
This method performs a geometric XY containment lookup on the grid top
surface only. It does not evaluate completion intervals or active-cell
status.
"""
if isinstance(well, VerticalWell):
x, y = well.x, well.y
elif isinstance(well, tuple) and len(well) == 2:
x, y = well
try:
x = _validate_finite_float(x, "well x-coordinate")
y = _validate_finite_float(y, "well y-coordinate")
except (TypeError, ValueError) as e:
raise TypeError("`well` coordinates must be finite numbers.") from e
else:
raise TypeError("`well` must be an instance of VerticalWell or a tuple of coordinates (x, y).")
return self._surface_cell_at_xy(x, y, surface="top")
def _compute_cell_corners(self) -> np.ndarray:
"""Compute 3D coordinates of all 8 corners for each cell.
Returns
-------
ndarray
Corner coordinates shaped ``(nk, nj, ni, 8, 3)`` ordered as:
0-3 top face (k-layer top), 4-7 bottom face, each face ordered
(i, j), (i+1, j), (i, j+1), (i+1, j+1).
"""
nk, nj, ni = self.shape
corners = np.zeros((nk, nj, ni, 8, 3))
# Extract z-coordinates for all 8 corners
# Top face (layer k top)
z_top_00 = self.zcorn[0::2, 0::2, 0::2] # (nk, nj, ni) - pillar (j, i)
z_top_10 = self.zcorn[0::2, 0::2, 1::2] # pillar (j, i+1)
z_top_01 = self.zcorn[0::2, 1::2, 0::2] # pillar (j+1, i)
z_top_11 = self.zcorn[0::2, 1::2, 1::2] # pillar (j+1, i+1)
# Bottom face (layer k bottom)
z_bot_00 = self.zcorn[1::2, 0::2, 0::2] # pillar (j, i)
z_bot_10 = self.zcorn[1::2, 0::2, 1::2] # pillar (j, i+1)
z_bot_01 = self.zcorn[1::2, 1::2, 0::2] # pillar (j+1, i)
z_bot_11 = self.zcorn[1::2, 1::2, 1::2] # pillar (j+1, i+1)
# Get pillar endpoints
pt = self.pillars.pillar_top # Shape (nj+1, ni+1, 3)
pb = self.pillars.pillar_bottom # Shape (nj+1, ni+1, 3)
# Interpolate along pillars for each corner
# For each pillar, compute t = (z - z_top) / (z_bottom - z_top)
# Then position = pillar_top + t * (pillar_bottom - pillar_top)
def interpolate_pillar_at_z(
pillar_top: np.ndarray,
pillar_bottom: np.ndarray,
z: np.ndarray,
) -> np.ndarray:
"""Interpolate pillar xyz positions at target depths.
Parameters
----------
pillar_top : np.ndarray
Pillar top xyz coordinates.
pillar_bottom : np.ndarray
Pillar bottom xyz coordinates.
z : np.ndarray
Target depths for interpolation.
Returns
-------
np.ndarray
Interpolated xyz coordinates with ``z`` enforced exactly.
"""
z_top = pillar_top[..., 2:3] # Keep dimension for broadcasting
z_bot = pillar_bottom[..., 2:3]
# Compute interpolation parameter
dz = z_bot - z_top
# Handle vertical pillars (avoid evaluating invalid divisions)
numerator = z[..., np.newaxis] - z_top
t = np.full_like(numerator, 0.5, dtype=np.float64)
np.divide(numerator, dz, out=t, where=np.abs(dz) > 1e-10)
# Interpolate position
pos = pillar_top + t * (pillar_bottom - pillar_top)
# Override z with exact value
pos[..., 2] = z
return pos
# Top face corners (indices 0-3)
corners[:, :, :, 0, :] = interpolate_pillar_at_z(pt[:-1, :-1], pb[:-1, :-1], z_top_00)
corners[:, :, :, 1, :] = interpolate_pillar_at_z(pt[:-1, 1:], pb[:-1, 1:], z_top_10)
corners[:, :, :, 2, :] = interpolate_pillar_at_z(pt[1:, :-1], pb[1:, :-1], z_top_01)
corners[:, :, :, 3, :] = interpolate_pillar_at_z(pt[1:, 1:], pb[1:, 1:], z_top_11)
# Bottom face corners (indices 4-7)
corners[:, :, :, 4, :] = interpolate_pillar_at_z(pt[:-1, :-1], pb[:-1, :-1], z_bot_00)
corners[:, :, :, 5, :] = interpolate_pillar_at_z(pt[:-1, 1:], pb[:-1, 1:], z_bot_10)
corners[:, :, :, 6, :] = interpolate_pillar_at_z(pt[1:, :-1], pb[1:, :-1], z_bot_01)
corners[:, :, :, 7, :] = interpolate_pillar_at_z(pt[1:, 1:], pb[1:, 1:], z_bot_11)
return corners
def _cell_top_bottom_depths(self) -> tuple[np.ndarray, np.ndarray]:
"""Return top and bottom depth for each cell.
Returns
-------
tuple of ndarray
``(z_top, z_bottom)`` each shaped ``(nk, nj, ni)``.
"""
zcorn = self._compute_cell_corners()[..., 2]
z_top = np.min(zcorn[..., :4], axis=-1)
z_bottom = np.max(zcorn[..., 4:], axis=-1)
return z_top, z_bottom
def _cell_top_depth(self) -> np.ndarray:
"""Get top depth for each cell.
Returns
-------
np.ndarray
Top depths with shape ``(nk, nj, ni)``.
"""
z_top, _ = self._cell_top_bottom_depths()
return z_top
def _cell_bottom_depth(self) -> np.ndarray:
"""Get bottom depth for each cell.
Returns
-------
np.ndarray
Bottom depths with shape ``(nk, nj, ni)``.
"""
_, z_bottom = self._cell_top_bottom_depths()
return z_bottom
def _cell_thickness(self) -> np.ndarray:
"""Compute geometric thickness for each cell.
Returns
-------
np.ndarray
Absolute difference between bottom and top depth per cell.
"""
z_top, z_bottom = self._cell_top_bottom_depths()
return np.abs(z_bottom - z_top)
def _target_mask(
self,
zone: str | Zone | None = None,
include_inactive: bool = False,
) -> np.ndarray:
"""Boolean mask for selecting cells.
Parameters
----------
zone : str or Zone or None, optional
Restrict mask to a specific zone.
include_inactive : bool, default False
When False, inactive cells are excluded.
Returns
-------
ndarray
Boolean mask shaped ``(nk, nj, ni)``.
"""
mask = np.ones(self.shape, dtype=bool)
if not include_inactive:
mask &= np.asarray(self.active, dtype=bool)
if zone is not None:
mask &= self._zone_mask(zone)
return mask
def _resolve_source(
self,
source: str | "GridProperty",
) -> np.ndarray:
"""Resolve a source into a full-grid ndarray.
Parameters
----------
source : str or GridProperty
Built-in geometry attribute name or property reference.
Returns
-------
ndarray
Array of shape ``(nk, nj, ni)``.
Raises
------
ValueError
If a GridProperty belongs to a different grid.
TypeError
If ``source`` is not a string or GridProperty.
UnsupportedGridAttributeError
If a named attribute is not available.
"""
if isinstance(source, str):
if source in self._properties:
prop = self._properties[source]
return prop.values
else:
return self._get_attribute(source)
if isinstance(source, GridProperty):
if source.grid is not self:
raise ValueError(
"Source GridProperty must belong to the same grid."
)
return source.values
raise TypeError(
"`source` entries must be either str or GridProperty."
)
def _get_attribute(self, name: str) -> np.ndarray:
"""Resolve and compute a built-in grid attribute by name or alias.
Parameters
----------
name : str
Attribute name or alias such as ``"depth"`` or ``"z"``.
Returns
-------
np.ndarray
Computed attribute array shaped like the grid.
Raises
------
UnsupportedGridAttributeError
If the attribute name is not supported.
"""
try:
spec = GRID_ATTRIBUTE_BY_ALIAS[name]
except KeyError as e:
raise UnsupportedGridAttributeError(attribute_name=name, supported_names=GRID_ATTRIBUTE_BY_ALIAS.keys()) from e
return spec.resolver(self)
[docs]
def __repr__(self) -> str:
"""Return a compact debug representation of the grid instance.
Returns
-------
str
Representation including shape, active-cell count, and properties.
"""
nk, nj, ni = self.shape
return (
f"{self.__class__.__name__}(shape=({nk}, {nj}, {ni}), "
f"n_active={self.n_active}/{self.n_cells}, "
f"properties={list(self._properties.keys())})"
)
# # ----------------------------
# # Cell geometry
# # ----------------------------
# # TEST
# def zcorn_to_cell_corner_z8(zcorn_3d: np.ndarray) -> np.ndarray:
# """
# Convert ZCORN shaped (2*nk, 2*nj, 2*ni) to (nk, nj, ni, 8) with ordering:
# 0: TOP (i, j)
# 1: TOP (i+1, j)
# 2: TOP (i, j+1)
# 3: TOP (i+1, j+1)
# 4: BOTTOM (i, j)
# 5: BOTTOM (i+1, j)
# 6: BOTTOM (i, j+1)
# 7: BOTTOM (i+1, j+1)
# """
# if zcorn_3d.ndim != 3:
# raise ValueError(f"Expected 3D array (2*nk,2*nj,2*ni), got ndim={zcorn_3d.ndim}")
# nk2, nj2, ni2 = zcorn_3d.shape
# if (nk2 % 2) or (nj2 % 2) or (ni2 % 2):
# raise ValueError(f"All dimensions must be even, got {zcorn_3d.shape}")
# nk, nj, ni = nk2 // 2, nj2 // 2, ni2 // 2
# out = np.empty((nk, nj, ni, 8), dtype=zcorn_3d.dtype)
# z = zcorn_3d # alias
# # Top (k-top = 0::2), Bottom (k-bot = 1::2)
# kt = z[0::2]
# kb = z[1::2]
# # ---- TOP face ----
# out[..., 0] = kt[:, 0::2, 0::2] # (i, j)
# out[..., 1] = kt[:, 0::2, 1::2] # (i+1, j)
# out[..., 2] = kt[:, 1::2, 0::2] # (i, j+1)
# out[..., 3] = kt[:, 1::2, 1::2] # (i+1, j+1)
# # ---- BOTTOM face ----
# out[..., 4] = kb[:, 0::2, 0::2]
# out[..., 5] = kb[:, 0::2, 1::2]
# out[..., 6] = kb[:, 1::2, 0::2]
# out[..., 7] = kb[:, 1::2, 1::2]
# return out
# def get_cell_corners(self, k: int, j: int, i: int) -> np.ndarray:
# """Get 8 corner coordinates of cell (k, j, i).
# Returns corners in standard hexahedron ordering:
# - Corners 0-3: Bottom face (counter-clockwise from top view)
# - Corners 4-7: Top face (counter-clockwise from top view)
# Args:
# k, j, i: Cell indices
# Returns:
# np.ndarray: Shape (8, 3) corner coordinates
# """
# corners = np.zeros((8, 3))
# # Bottom face (dk=0)
# for idx, (di, dj) in enumerate([(0,0), (1,0), (1,1), (0,1)]):
# z = self.zcorn[k, j, i, di, dj, 0]
# corners[idx] = self.pillars.interpolate_at_z(j+dj, i+di, z)
# # Top face (dk=1)
# for idx, (di, dj) in enumerate([(0,0), (1,0), (1,1), (0,1)]):
# z = self.zcorn[k, j, i, di, dj, 1]
# corners[4+idx] = self.pillars.interpolate_at_z(j+dj, i+di, z)
# return corners
# @property
# def cell_centers(self) -> np.ndarray:
# """Cell centers (average of 8 corners).
# Returns:
# np.ndarray: Shape (nk, nj, ni, 3)
# """
# if self._cell_centers is None:
# centers = np.zeros((self.nk, self.nj, self.ni, 3))
# for k in range(self.nk):
# for j in range(self.nj):
# for i in range(self.ni):
# corners = self.get_cell_corners(k, j, i)
# centers[k, j, i] = np.mean(corners, axis=0)
# self._cell_centers = centers
# return self._cell_centers
# @property
# def cell_volumes(self) -> np.ndarray:
# """Cell volumes.
# Returns:
# np.ndarray: Shape (nk, nj, ni)
# """
# if self._cell_volumes is None:
# volumes = np.zeros((self.nk, self.nj, self.ni))
# for k in range(self.nk):
# for j in range(self.nj):
# for i in range(self.ni):
# corners = self.get_cell_corners(k, j, i)
# volumes[k, j, i] = self._hexahedron_volume(corners)
# self._cell_volumes = volumes
# return self._cell_volumes
# @staticmethod
# def _hexahedron_volume(corners: np.ndarray) -> float:
# """Compute hexahedron volume via tetrahedral decomposition."""
# center = np.mean(corners, axis=0)
# # 6 faces
# faces = [
# [0, 1, 2, 3], # Bottom
# [4, 5, 6, 7], # Top
# [0, 1, 5, 4], # Front
# [2, 3, 7, 6], # Back
# [0, 3, 7, 4], # Left
# [1, 2, 6, 5], # Right
# ]
# volume = 0.0
# for face in faces:
# # Split quad into 2 triangles
# for tri in [[0, 1, 2], [0, 2, 3]]:
# v0 = corners[face[tri[0]]]
# v1 = corners[face[tri[1]]]
# v2 = corners[face[tri[2]]]
# mat = np.column_stack([v1 - v0, v2 - v0, center - v0])
# volume += abs(np.linalg.det(mat)) / 6.0
# return volume
# # ----------------------------
# # Properties
# # ----------------------------
# def add_property(self, name: str, values: np.ndarray):
# """Add cell property.
# Args:
# name: Property name
# values: Property values, shape (nk, nj, ni)
# """
# if values.shape != self.shape:
# raise ValueError(f"Property shape {values.shape} != grid shape {self.shape}")
# self.properties[name] = values
# def get_property(self, name: str) -> Optional[np.ndarray]:
# """Get cell property by name."""
# return self.properties.get(name)
# # ----------------------------
# # Active cells
# # ----------------------------
# def get_active_indices(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
# """Get (k, j, i) indices of active cells."""
# return np.where(self.active)
# def get_active_centers(self) -> np.ndarray:
# """Get cell centers for active cells only.
# Returns:
# np.ndarray: Shape (n_active, 3)
# """
# return self.cell_centers[self.active]
# def get_active_volumes(self) -> np.ndarray:
# """Get volumes for active cells only.
# Returns:
# np.ndarray: Shape (n_active,)
# """
# return self.cell_volumes[self.active]
# def get_active_property(self, name: str) -> Optional[np.ndarray]:
# """Get property for active cells only."""
# prop = self.get_property(name)
# if prop is None:
# return None
# return prop[self.active]
# # ----------------------------
# # Constructors
# # ----------------------------
# @classmethod
# def from_layers(
# cls,
# pillars: PillarGrid,
# layer_z: np.ndarray, # Shape (nk+1, nj, ni) - z at layer interfaces
# active: Optional[np.ndarray] = None,
# properties: Optional[Dict[str, np.ndarray]] = None,
# ) -> 'CornerPointGrid':
# """Create grid from layer interface depths.
# Args:
# pillars: PillarGrid defining lateral geometry
# layer_z: Z-coordinates at layer interfaces, shape (nk+1, nj, ni)
# active: Boolean mask (default: all active)
# properties: Cell properties dict
# Returns:
# CornerPointGrid instance
# """
# ni, nj = pillars.cell_shape
# nk = layer_z.shape[0] - 1
# # Build zcorn from layer interfaces
# zcorn = np.zeros((nk, nj, ni, 2, 2, 2))
# for k in range(nk):
# for j in range(nj):
# for i in range(ni):
# # Bottom (dk=0) - all 4 corners same z
# zcorn[k, j, i, :, :, 0] = layer_z[k, j, i]
# # Top (dk=1) - all 4 corners same z
# zcorn[k, j, i, :, :, 1] = layer_z[k+1, j, i]
# # Keep active as None if not provided (memory efficient)
# if properties is None:
# properties = {}
# return cls(pillars, zcorn, active, properties)
# @classmethod
# def from_rectilinear(
# cls,
# x: np.ndarray, # Shape (ni+1,)
# y: np.ndarray, # Shape (nj+1,)
# z: np.ndarray, # Shape (nk+1,)
# active: Optional[np.ndarray] = None,
# properties: Optional[Dict[str, np.ndarray]] = None,
# ) -> 'CornerPointGrid':
# """Create rectilinear corner-point grid.
# Args:
# x: X-coordinates, shape (ni+1,)
# y: Y-coordinates, shape (nj+1,)
# z: Z-coordinates, shape (nk+1,)
# active: Boolean mask (default: all active)
# properties: Cell properties
# Returns:
# CornerPointGrid instance
# """
# # Create vertical pillars
# pillars = PillarGrid.from_rectilinear(x, y, z[0], z[-1])
# ni, nj = pillars.cell_shape
# nk = len(z) - 1
# # Uniform layer depths
# layer_z = np.zeros((nk + 1, nj, ni))
# for k in range(nk + 1):
# layer_z[k, :, :] = z[k]
# return cls.from_layers(pillars, layer_z, active, properties)