Source code for petres.grids.cornerpoint

from __future__ import annotations

from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
from typing import Any, Literal
from pathlib import Path
import numpy as np
import warnings


from .._validation import _validate_finite_float, _validate_z_scale
from ..models.wells import VerticalWell, _validate_well_sequence
from ..grids.sampling._validation import _validate_vertex_array
from ..grids.sampling._vertices import _resolve_xyz_vertices
from ..grids.properties import GridProperties, GridProperty
from .builders.cornerpoint import _build_zcorn_from_zones
from ..errors.property import MissingEclipseKeywordError
from ..eclipse.grids.metadata import EclipseGridMetadata
from ..errors.grid import UnsupportedGridAttributeError
from ..config.colors import DEFAULT_CMAP, DEFAULT_COLOR
from ..eclipse.grids.write import GRDECLWriter
from ..models.boundary import BoundaryPolygon
from ..eclipse.grids.read import GRDECLReader
from .pillars import PillarGrid
from ..models.zone import Zone

@dataclass(frozen=True)
class GridAttributeSpec:
    """Specification for a computed grid attribute.

    Parameters
    ----------
    name : str
        Canonical attribute name exposed to users.
    description : str
        Human-readable description of the attribute semantics.
    resolver : Callable[[CornerPointGrid], np.ndarray]
        Callable that computes the attribute array from a grid instance.
    aliases : tuple[str, ...], default ()
        Additional accepted names that resolve to the same attribute.

    Notes
    -----
    Instances of this class are immutable and used to build lookup tables for
    attribute-name and alias resolution.
    """

    name: str
    description: str
    resolver: Callable[[CornerPointGrid], np.ndarray]
    aliases: tuple[str, ...] = ()


GRID_ATTRIBUTES = (
    GridAttributeSpec("x", "cell-center x coordinate", lambda g: g.cell_centers[..., 0]),
    GridAttributeSpec("y", "cell-center y coordinate", lambda g: g.cell_centers[..., 1]),
    GridAttributeSpec(
        "depth",
        "cell-center depth (positive downward)",
        lambda g: g.cell_centers[..., 2],
        aliases=("z",),
    ),
    GridAttributeSpec("top", "top depth of cell", lambda g: g._cell_top_depth()),
    GridAttributeSpec("bottom", "bottom depth of cell", lambda g: g._cell_bottom_depth()),
    GridAttributeSpec("thickness", "cell thickness", lambda g: g._cell_thickness()),
    GridAttributeSpec("active", "1 for active cells, 0 for inactive", lambda g: g.active.astype(int)),
)

GRID_ATTRIBUTE_BY_NAME = {spec.name: spec for spec in GRID_ATTRIBUTES}

GRID_ATTRIBUTE_BY_ALIAS = {
    alias: spec
    for spec in GRID_ATTRIBUTES
    for alias in (spec.name, *spec.aliases)
}

RESERVED_GRID_PROPERTY_NAMES = frozenset(GRID_ATTRIBUTE_BY_ALIAS)


[docs] @dataclass class CornerPointGrid: """3D corner-point grid with explicit corner coordinates. Parameters ---------- pillars : PillarGrid Lateral pillar geometry defining i–j topology. zcorn : ndarray Corner depths shaped ``(2*nk, 2*nj, 2*ni)`` in Eclipse order. active : ndarray or None, optional Boolean mask ``(nk, nj, ni)``; ``None`` marks all cells active. zone_index : ndarray or None, optional Zone id per cell, same shape as the grid. ``0`` denotes gap. zone_names : dict[int, str], optional Mapping of zone id to zone name. Notes ----- Properties are managed via :class:`GridProperties` and are not stored in ``zcorn``. Use :meth:`from_zones` to build grids from stratigraphic zones. """ pillars: PillarGrid zcorn: np.ndarray # Shape (2*nk, 2*nj, 2*ni) active: np.ndarray | None = None # Shape (nk, nj, ni), boolean, or None for all active zone_index: np.ndarray | None = None # shape (nk, nj, ni), int zone_names: dict[int, str] = field(default_factory=dict) _properties: dict[str, GridProperty] = field(default_factory=dict, repr=False) _zone_name_to_id: dict[str, int] = field(default_factory=dict, init=False, repr=False) _eclipse_metadata: EclipseGridMetadata | None = field(default=None, repr=False)
[docs] def __post_init__(self) -> None: """Validate grid arrays and initialize zone lookup metadata. Raises ------ ValueError If ``zcorn``, ``active``, or attached property shapes do not match the grid dimensions, or when a property belongs to a different grid. TypeError If an attached entry in ``_properties`` is not a :class:`GridProperty` instance. Notes ----- When ``active`` is ``None``, an all-active broadcast view is created for memory efficiency. """ nj, ni = self.pillars.cell_shape nk = self.zcorn.shape[0]//2 expected_corner_shape = (2*nk, 2*nj, 2*ni) expected_cell_shape = (nk, nj, ni) expected_active_shape = expected_cell_shape if self.zcorn.shape != expected_corner_shape: raise ValueError( f"zcorn shape {self.zcorn.shape} != expected {expected_corner_shape}" ) # Initialize or validate active array if self.active is None: # Use broadcast for memory-efficient all-active representation self.active = np.broadcast_to(True, expected_active_shape) else: self.active = np.asarray(self.active, dtype=bool) if self.active.shape != expected_active_shape: raise ValueError( f"active shape {self.active.shape} != expected {expected_active_shape}" ) # Validate properties for name, prop in self._properties.items(): if not isinstance(prop, GridProperty): raise TypeError( f"Property '{name}' must be a GridProperty, got {type(prop)}." ) if prop.values.shape != expected_cell_shape: raise ValueError( f"Property '{name}' shape {prop.values.shape} != expected {expected_cell_shape}" ) if prop.grid is not self: raise ValueError( f"Property '{name}' belongs to a different grid instance." ) self.set_zones(zone_index=self.zone_index, zone_names=self.zone_names)
def _zone_mask(self, zone: str | Zone) -> np.ndarray: """Build a boolean mask for cells that belong to a selected zone. Parameters ---------- zone : str or Zone Zone name or zone object to select. Returns ------- np.ndarray Boolean mask with shape ``(nk, nj, ni)`` where ``True`` marks cells assigned to the requested zone. Raises ------ ValueError If no zone definition is available on the grid. """ if self.zone_index is None: raise ValueError("Grid has no zones defined.") zone_name = self._normalize_zone_name(zone) zone_id = self._get_zone_id_from_name(zone_name) return self.zone_index == zone_id
[docs] def summary(self) -> str: """Create a human-readable summary of grid dimensions and metadata. Returns ------- str Multi-line summary including shape, active/inactive counts, registered property names, and zone names. Examples -------- >>> text = grid.summary() >>> "Grid Summary" in text True """ nk, nj, ni = self.shape total_cells = self.n_cells active = self.n_active inactive = self.n_inactive active_pct = 100 * active / total_cells if total_cells else 0.0 inactive_pct = 100.0 - active_pct prop_names = list(self._properties.keys()) zone_names = list(self.zone_names.values()) if self.zone_names else [] def fmt(items: list[str], max_items: int = 4) -> str: if not items: return "Not Defined" if len(items) <= max_items: return ", ".join(items) return f"{', '.join(items[:max_items])}, … (+{len(items) - max_items} more)" lines = [ "Grid Summary", "════════════", f"Shape : {ni}×{nj}×{nk} (i×j×k)", f"Cells : {total_cells}", f"Active : {active} ({active_pct:.2f}%)", f"Inactive : {inactive} ({inactive_pct:.2f}%)", f"Properties : {fmt(prop_names)}", f"Zones : {fmt(zone_names)}", ] return "\n".join(lines)
@staticmethod def _normalize_zone_name(zone: str | Zone) -> str: """Normalize a zone selector into a zone-name string. Parameters ---------- zone : str or Zone Zone selector provided by caller. Returns ------- str Zone name to be used in internal lookups. Raises ------ TypeError If ``zone`` is neither a string nor a :class:`Zone`. """ if isinstance(zone, Zone): return zone.name if not isinstance(zone, str): raise TypeError("`zone` must be str, or `Zone` instance.") return zone def _get_zone_id_from_name(self, name: str) -> int: """Resolve a zone name to its integer zone identifier. Parameters ---------- name : str Name of the zone. Returns ------- int Integer identifier associated with the zone name. Raises ------ ValueError If the provided zone name is not present in the grid lookup table. """ if name not in self._zone_name_to_id: raise ValueError(f"Zone '{name}' not found in grid zones.") return self._zone_name_to_id[name]
[docs] def set_zones( self, zone_index: np.ndarray | None, zone_names: dict[int, str] | None, ) -> None: """Assign zone membership arrays and zone-name metadata. Parameters ---------- zone_index : np.ndarray Integer array of shape ``(nk, nj, ni)`` assigning each cell to a zone. Values use this convention: - ``0`` means gap, undefined, or unassigned. - ``> 0`` means a valid zone id. zone_names : dict[int, str], optional Mapping from zone id to zone name. If not provided, names will be auto-generated as "Zone {id}". Raises ------ ValueError If shapes mismatch, invalid ids are present, or names are inconsistent. TypeError If ``zone_index`` cannot be converted to an integer ndarray. """ # ---------------------------- # 1. Validate zone_index # ---------------------------- if zone_index is None: if not zone_names: zone_names = {} else: raise ValueError("`zone_names` provided without `zone_index`.") else: try: zone_index = np.asarray(zone_index, dtype=np.int32) except Exception as e: raise TypeError(f"Failed to convert `zone_index` to numpy array: {e}") from e if zone_index.shape != self.shape: raise ValueError( f"`zone_index` shape {zone_index.shape} != expected {self.shape}" ) if np.any(zone_index < 0): raise ValueError("`zone_index` cannot contain negative values.") # Extract used zone IDs (exclude 0 = gap) unique_ids = np.unique(zone_index) unique_ids = unique_ids[unique_ids != 0] # ---------------------------- # 2. Handle zone_names # ---------------------------- if not zone_names: zone_names = {int(i): f"Zone {int(i)}" for i in unique_ids} else: # Ensure dict[int, str] if not isinstance(zone_names, dict): raise ValueError("`zone_names` must be a dict[int, str].") if 0 in zone_names: raise ValueError("Zone ID 0 is reserved for gaps and cannot be named.") # Convert keys to int zone_names = {int(k): str(v) for k, v in zone_names.items()} # ---- 2a. Check duplicate names ---- names = list(zone_names.values()) if len(names) != len(set(names)): duplicates = {n for n in names if names.count(n) > 1} raise ValueError(f"Duplicate zone names detected: {duplicates}") # ---- 2b. Check all used IDs have names except 0 (gap) ---- missing = set(unique_ids) - set(zone_names.keys()) missing.discard(0) # Allow 0 (gap) to be unnamed if missing: raise ValueError( f"`zone_names` missing definitions for zone ids: {sorted(missing)}" ) # ---- 2c. Optional: warn about unused names ---- unused = set(zone_names.keys()) - set(unique_ids) unused.discard(0) # Allow 0 (gap) to be unused if unused: warnings.warn( f"Unused zone ids in `zone_names`: {sorted(unused)}", UserWarning, ) # ---------------------------- # 3. Assign # ---------------------------- self.zone_index = zone_index self.zone_names = zone_names self._zone_name_to_id = {v: k for k, v in zone_names.items()}
# ---------------------------- # Dimensions # ---------------------------- @property def properties(self) -> GridProperties: """Return the property manager bound to this grid. Returns ------- GridProperties Dictionary-like facade used to access and validate cell properties. """ return GridProperties(self) @property def ni(self) -> int: """Get the number of cells in the i direction. Returns ------- int Cell count along the i axis. """ return self.pillars.ni @property def nj(self) -> int: """Get the number of cells in the j direction. Returns ------- int Cell count along the j axis. """ return self.pillars.nj @property def nk(self) -> int: """Get the number of layers in the k direction. Returns ------- int Layer count inferred from ``zcorn``. """ return self.zcorn.shape[0] // 2 @property def shape(self) -> tuple[int, int, int]: """Get grid dimensions ordered as ``(nk, nj, ni)``. Returns ------- tuple[int, int, int] Layer, row, and column cell counts. """ return (self.nk, self.nj, self.ni) @property def n_cells(self) -> int: """Get the total number of cells in the grid. Returns ------- int Product of ``nk * nj * ni``. """ return self.nk * self.nj * self.ni @property def n_active(self) -> int: """Get the number of active cells. Returns ------- int Count of ``True`` values in ``active``. """ return int(np.nansum(self.active)) @property def n_inactive(self) -> int: """Get the number of inactive cells. Returns ------- int Difference between total and active cell counts. """ return self.n_cells - self.n_active @property def cell_centers(self) -> np.ndarray: """Cell centers computed as the mean of 8 corners. Returns ------- ndarray Coordinates shaped ``(nk, nj, ni, 3)``. """ corners = self._compute_cell_corners() # Shape (nk, nj, ni, 8, 3) cell_centers = np.mean(corners, axis=3) # Average over 8 corners return cell_centers @property def cell_volumes(self) -> np.ndarray: """Cell volumes computed via hexahedral decomposition. Returns ------- ndarray Volumes shaped ``(nk, nj, ni)``. """ corners = self._compute_cell_corners() # Shape (nk, nj, ni, 8, 3) # Decompose each hexahedron into 6 tetrahedra using diagonal 0-7 # Tetrahedron volume formula: |det([v1-v0, v2-v0, v3-v0])| / 6 # Corner ordering (from _compute_cell_corners): # 0-3: Top face (i,j), (i+1,j), (i,j+1), (i+1,j+1) # 4-7: Bottom face (i,j), (i+1,j), (i,j+1), (i+1,j+1) def tet_volume( v0: np.ndarray, v1: np.ndarray, v2: np.ndarray, v3: np.ndarray, ) -> np.ndarray: """Compute signed tetrahedron volumes for vectorized vertices. Parameters ---------- v0 : np.ndarray First tetrahedron vertex array. v1 : np.ndarray Second tetrahedron vertex array. v2 : np.ndarray Third tetrahedron vertex array. v3 : np.ndarray Fourth tetrahedron vertex array. Returns ------- np.ndarray Signed tetrahedron volumes with broadcast-compatible shape. """ # v1-v0, v2-v0, v3-v0 form the edge vectors # Volume = det([v1-v0, v2-v0, v3-v0]) / 6 a = v1 - v0 b = v2 - v0 c = v3 - v0 # Compute determinant using scalar triple product: a · (b × c) cross = np.cross(b, c) det = np.sum(a * cross, axis=-1) return det / 6.0 # Extract corners for each tetrahedron c0, c1, c2, c3 = corners[..., 0, :], corners[..., 1, :], corners[..., 2, :], corners[..., 3, :] c4, c5, c6, c7 = corners[..., 4, :], corners[..., 5, :], corners[..., 6, :], corners[..., 7, :] # Decompose into 6 tetrahedra using diagonal 0-7 vol = np.zeros(corners.shape[:3]) # Shape (nk, nj, ni) vol += tet_volume(c0, c1, c3, c7) vol += tet_volume(c0, c1, c5, c7) vol += tet_volume(c0, c4, c5, c7) vol += tet_volume(c0, c2, c3, c7) vol += tet_volume(c0, c2, c6, c7) vol += tet_volume(c0, c4, c6, c7) # Take absolute value to handle orientation volumes = np.abs(vol) return volumes
[docs] @classmethod def from_grdecl( cls, path: str | Path, *, use_actnum: bool = True, use_metadata: bool = True, properties: Sequence[str] | None = None, ) -> CornerPointGrid: """Load a grid from a GRDECL file. Parameters ---------- path : str or Path Path to GRDECL file containing COORD/ZCORN (and optional ACTNUM). use_actnum : bool, default True When True, read ACTNUM to set active cells; otherwise all active. use_metadata : bool, default True When True, read metadata keywords from the file. properties : Sequence[str] | None, default None Property names to import. If None, all available properties are imported. Returns ------- CornerPointGrid Grid populated from GRDECL arrays. """ if properties is not None: try: properties = tuple(properties) except Exception as e: raise TypeError(f"`properties` must be a sequence of property names.") from e data = GRDECLReader().read( path, use_actnum=use_actnum, use_metadata=use_metadata, properties=properties ) coord = data.coord zcorn = data.zcorn actnum = data.actnum metadata = data.metadata pillars = PillarGrid.from_eclipse_coord(coord) grid = cls(pillars=pillars, zcorn=zcorn, active=actnum, _eclipse_metadata=metadata) for kw, val in data.properties.items(): if kw in RESERVED_GRID_PROPERTY_NAMES: raise UnsupportedGridAttributeError( f"Cannot load property '{kw}' from GRDECL because it conflicts with a reserved grid attribute name." ) prop = grid.properties.create( name=kw, eclipse_keyword=kw ) prop.from_array(val) return grid
[docs] def to_grdecl( self, path: str | Path, *, properties: Sequence[str] | None = None, include_actnum: bool = True, ) -> None: """Write grid geometry and selected properties to a GRDECL file. Parameters ---------- path : str or Path Output file path. properties : Sequence[str] or None, optional Property names to export. When ``None``, all attached properties are written. include_actnum : bool, default True Whether to export ``ACTNUM`` from the grid active mask. Raises ------ TypeError If ``properties`` is not a valid sequence of names. MissingEclipseKeywordError If an included property has no Eclipse keyword configured. """ coord = self.pillars.to_eclipse_coord() zcorn = self.zcorn actnum = self.active.astype(int) if include_actnum else None writer = GRDECLWriter() if properties is None: properties = list(self._properties.keys()) else: try: properties = tuple(properties) except Exception as e: raise TypeError(f"`properties` must be a sequence of property names.") from e _properties = {} if properties: for prop_name in properties: prop = self.properties[prop_name] eclipse_keyword = prop.eclipse_keyword if eclipse_keyword is None: raise MissingEclipseKeywordError(property_name=prop_name) _properties[eclipse_keyword] = prop.values return writer.write( path=path, coord=coord, zcorn=zcorn, actnum=actnum, properties=_properties, metadata=self._eclipse_metadata, )
[docs] def show( self, show_inactive: bool = False, scalars: str | None = None, color: Any = DEFAULT_COLOR, cmap: str | None = DEFAULT_CMAP, title: str | None = None, z_scale: float = 1.0, wells: Sequence[VerticalWell] | VerticalWell | None = None, **kwargs: Any, ) -> None: """Render the grid in 3D PyVista viewer. Parameters ---------- show_inactive : bool, default False Whether to display inactive cells. scalars : str or None, optional Property name to color by; if ``None`` uses solid color. color : Any, default DEFAULT_COLOR Solid color when ``scalars`` is not provided. Defaults to the project default color in :mod:`petres.config.colors` (`DEFAULT_COLOR`). cmap : str or None, default DEFAULT_CMAP Colormap applied when ``scalars`` is provided. title : str or None, optional Figure title. z_scale : float, default 1.0 Scale factor for the z-axis. wells : VerticalWell or Sequence[VerticalWell] or None, optional Well(s) to plot on top of the grid. Can be a single VerticalWell or a sequence of them. If ``None``, no wells are plotted. **kwargs Forwarded to viewer ``add_grid``. """ from ..viewers.viewer3d.pyvista.theme import PyVista3DViewerTheme from ..viewers.viewer3d.pyvista.viewer import PyVista3DViewer theme = PyVista3DViewerTheme(lighting=False) z_scale = _validate_z_scale(z_scale, name="z_scale") viewer = PyVista3DViewer(theme=theme, z_scale=z_scale) color = None if scalars is not None else color viewer.add_grid( grid=self, show_inactive=show_inactive, color=color, scalars=scalars, cmap=cmap, **kwargs ) if wells is not None: viewer.add_wells(_validate_well_sequence(wells)) viewer.show(title=title)
[docs] @classmethod def from_rectilinear( cls, x: np.ndarray, y: np.ndarray, z: np.ndarray ) -> CornerPointGrid: """Create a corner-point grid from rectilinear coordinate vertices. Parameters ---------- x : np.ndarray Monotonic x-vertex coordinates with shape ``(ni + 1,)``. y : np.ndarray Monotonic y-vertex coordinates with shape ``(nj + 1,)``. z : np.ndarray Layer-interface depths with shape ``(nk + 1,)``. Returns ------- CornerPointGrid Grid with vertical pillars and broadcasted layer surfaces. """ z = _validate_vertex_array(z, "z") pillars = PillarGrid.from_rectilinear(x=x, y=y, top=z[0], base=z[-1]) # Create simple ZCORN with flat layers nj, ni = pillars.cell_shape nk = len(z) - 1 # Naive loop version (for clarity) # zcorn = np.zeros((2*nk, 2*nj, 2*ni)) # for k in range(nk): # zcorn[2*k:2*k+2, :, :] = z[k:k+2][:, np.newaxis, np.newaxis] # Fully vectorized version (optimized) z_planes = np.empty(2 * nk, dtype=z.dtype) z_planes[0::2] = z[:-1] z_planes[1::2] = z[1:] zcorn = np.broadcast_to( z_planes[:, None, None], (2 * nk, 2 * nj, 2 * ni), ).copy() return cls(pillars=pillars, zcorn=zcorn)
[docs] @classmethod def from_regular( cls, *, xlim: tuple[float, float] | None = None, ylim: tuple[float, float] | None = None, zlim: tuple[float, float] | None = None, ni: int | None = None, nj: int | None = None, nk: int | None = None, dx: float | None = None, dy: float | None = None, dz: float | None = None, ) -> CornerPointGrid: """Build a rectilinear corner-point grid from limits or spacings. Parameters ---------- xlim : tuple[float, float] or None, optional Minimum and maximum x bounds. ylim : tuple[float, float] or None, optional Minimum and maximum y bounds. zlim : tuple[float, float] or None, optional Minimum and maximum z bounds. ni : int or None, optional Number of cells in i direction. nj : int or None, optional Number of cells in j direction. nk : int or None, optional Number of cells in k direction. dx : float or None, optional Cell size in i direction. dy : float or None, optional Cell size in j direction. dz : float or None, optional Cell size in k direction. Returns ------- CornerPointGrid Corner-point grid resolved from the provided geometric constraints. """ x, y, z = _resolve_xyz_vertices( xlim=xlim, ylim=ylim, zlim=zlim, ni=ni, nj=nj, nk=nk, dx=dx, dy=dy, dz=dz ) return cls.from_rectilinear(x=x, y=y, z=z)
[docs] @classmethod def from_zones(cls, *, pillars: PillarGrid, zones: Sequence[Zone]) -> CornerPointGrid: """Create CornerPointGrid from zones with gap detection. Parameters ---------- pillars : PillarGrid Lateral pillar geometry zones : Sequence[Zone] Zones in stratigraphic order (top to bottom) Returns ------- CornerPointGrid Grid with gap-filling cells marked as inactive (ACTNUM=0) """ zcorn, actnum, zone_index, zone_names = _build_zcorn_from_zones(pillars=pillars, zones=zones) return cls(pillars=pillars, zcorn=zcorn, active=actnum, zone_index=zone_index, zone_names=zone_names)
[docs] def apply_boundary(self, boundary: BoundaryPolygon) -> CornerPointGrid: """Mask grid activity using a boundary polygon. Cells whose centers fall outside the polygon become inactive (ACTNUM=0). Parameters ---------- boundary : BoundaryPolygon XY boundary polygon used to clip the grid. Returns ------- CornerPointGrid Updated grid with activity masked by the boundary. Notes ----- Updates ``active`` in place; cached property masks may become stale. """ # Get cell centers (nk, nj, ni, 3) centers = self.cell_centers # Extract XY coordinates and reshape to (n_cells, 2) nk, nj, ni = self.shape xy = centers[..., :2].reshape(-1, 2) # Shape: (nk*nj*ni, 2) # Check which cells are inside the boundary inside_mask = boundary.contains(xy) # Shape: (nk*nj*ni,) # Reshape back to grid dimensions inside_mask = inside_mask.reshape(nk, nj, ni) # Update active array: cells outside boundary become inactive # Need to create a writable copy if active is a broadcast array if not self.active.flags.writeable: self.active = np.array(self.active, dtype=bool) self.active &= inside_mask return self
def _surface_cell_at_xy(self, x: float, y: float, surface: Literal["top", "bottom"]) -> tuple[int, int] | None: """Find the (i, j) column index whose surface quad contains point (x, y). Uses a cross-product sign test on all cells simultaneously — no loops. Parameters ---------- x, y : float Query point in grid XY coordinates. surface : Literal["top", "bottom"] The surface to check. Returns ------- tuple[int, int] or None Zero-based ``(i, j)`` indices of the containing column, or ``None`` if outside the grid. """ if surface == "top": pt = self.pillars.pillar_top # (nj+1, ni+1, 3) elif surface == "bottom": pt = self.pillars.pillar_bottom # (nj+1, ni+1, 3) else: raise ValueError("`surface` must be 'top' or 'bottom'.") # Each cell (j, i) is bounded by 4 pillars at corners: # (j, i ) = "00" (j, i+1) = "10" # (j+1, i+1) = "11" (j+1, i ) = "01" # # Slicing off-by-one gives us (nj, ni) arrays — one value per cell. x00 = pt[:-1, :-1, 0]; y00 = pt[:-1, :-1, 1] x10 = pt[:-1, 1:, 0]; y10 = pt[:-1, 1:, 1] x11 = pt[ 1:, 1:, 0]; y11 = pt[ 1:, 1:, 1] x01 = pt[ 1:, :-1, 0]; y01 = pt[ 1:, :-1, 1] # ---------------------------------------------------------------- # Cross product sign test (how it works) # ---------------------------------------------------------------- # For two points A and B defining a directed edge, the 2D cross product # of (B - A) and (P - A) tells you which *side* of that edge point P is on: # # cross = (B.x - A.x) * (P.y - A.y) - (B.y - A.y) * (P.x - A.x) # # cross > 0 → P is to the LEFT of edge A→B # cross < 0 → P is to the RIGHT of edge A→B # cross = 0 → P is exactly ON edge A→B # # If we walk around the quad in order 00→10→11→01→(back to 00), # a point INSIDE the quad will be on the SAME side of all 4 edges. # A point OUTSIDE will be on the wrong side of at least one edge. # # Because pillar grids can be wound either CW or CCW depending on # coordinate system, we accept both "all positive" and "all negative". # ---------------------------------------------------------------- def _cross(ax, ay, bx, by): # All inputs are (nj, ni) arrays; x and y are scalars. # Result is (nj, ni) — one signed area value per cell. return (bx - ax) * (y - ay) - (by - ay) * (x - ax) d0 = _cross(x00, y00, x10, y10) # edge 00 → 10 (bottom edge) d1 = _cross(x10, y10, x11, y11) # edge 10 → 11 (right edge) d2 = _cross(x11, y11, x01, y01) # edge 11 → 01 (top edge) d3 = _cross(x01, y01, x00, y00) # edge 01 → 00 (left edge) inside = ( ((d0 >= 0) & (d1 >= 0) & (d2 >= 0) & (d3 >= 0)) | # CCW winding ((d0 <= 0) & (d1 <= 0) & (d2 <= 0) & (d3 <= 0)) # CW winding ) # (nj, ni) bool hits = np.argwhere(inside) if hits.size == 0: return None return (int(hits[0, 1]), int(hits[0, 0])) # (i, j)
[docs] def well_indices(self, well: VerticalWell | tuple[float, float]) -> tuple[int, int] | None: """Locate the surface grid column indices intersected by a vertical well. Parameters ---------- well : VerticalWell or tuple[float, float] Well object providing ``x`` and ``y`` map coordinates, or a tuple of coordinates. Returns ------- tuple[int, int] or None Zero-based ``(i, j)`` indices of the top-surface column containing the well head location. Returns ``None`` when the well lies outside the grid footprint. Notes ----- This method performs a geometric XY containment lookup on the grid top surface only. It does not evaluate completion intervals or active-cell status. """ if isinstance(well, VerticalWell): x, y = well.x, well.y elif isinstance(well, tuple) and len(well) == 2: x, y = well try: x = _validate_finite_float(x, "well x-coordinate") y = _validate_finite_float(y, "well y-coordinate") except (TypeError, ValueError) as e: raise TypeError("`well` coordinates must be finite numbers.") from e else: raise TypeError("`well` must be an instance of VerticalWell or a tuple of coordinates (x, y).") return self._surface_cell_at_xy(x, y, surface="top")
def _compute_cell_corners(self) -> np.ndarray: """Compute 3D coordinates of all 8 corners for each cell. Returns ------- ndarray Corner coordinates shaped ``(nk, nj, ni, 8, 3)`` ordered as: 0-3 top face (k-layer top), 4-7 bottom face, each face ordered (i, j), (i+1, j), (i, j+1), (i+1, j+1). """ nk, nj, ni = self.shape corners = np.zeros((nk, nj, ni, 8, 3)) # Extract z-coordinates for all 8 corners # Top face (layer k top) z_top_00 = self.zcorn[0::2, 0::2, 0::2] # (nk, nj, ni) - pillar (j, i) z_top_10 = self.zcorn[0::2, 0::2, 1::2] # pillar (j, i+1) z_top_01 = self.zcorn[0::2, 1::2, 0::2] # pillar (j+1, i) z_top_11 = self.zcorn[0::2, 1::2, 1::2] # pillar (j+1, i+1) # Bottom face (layer k bottom) z_bot_00 = self.zcorn[1::2, 0::2, 0::2] # pillar (j, i) z_bot_10 = self.zcorn[1::2, 0::2, 1::2] # pillar (j, i+1) z_bot_01 = self.zcorn[1::2, 1::2, 0::2] # pillar (j+1, i) z_bot_11 = self.zcorn[1::2, 1::2, 1::2] # pillar (j+1, i+1) # Get pillar endpoints pt = self.pillars.pillar_top # Shape (nj+1, ni+1, 3) pb = self.pillars.pillar_bottom # Shape (nj+1, ni+1, 3) # Interpolate along pillars for each corner # For each pillar, compute t = (z - z_top) / (z_bottom - z_top) # Then position = pillar_top + t * (pillar_bottom - pillar_top) def interpolate_pillar_at_z( pillar_top: np.ndarray, pillar_bottom: np.ndarray, z: np.ndarray, ) -> np.ndarray: """Interpolate pillar xyz positions at target depths. Parameters ---------- pillar_top : np.ndarray Pillar top xyz coordinates. pillar_bottom : np.ndarray Pillar bottom xyz coordinates. z : np.ndarray Target depths for interpolation. Returns ------- np.ndarray Interpolated xyz coordinates with ``z`` enforced exactly. """ z_top = pillar_top[..., 2:3] # Keep dimension for broadcasting z_bot = pillar_bottom[..., 2:3] # Compute interpolation parameter dz = z_bot - z_top # Handle vertical pillars (avoid evaluating invalid divisions) numerator = z[..., np.newaxis] - z_top t = np.full_like(numerator, 0.5, dtype=np.float64) np.divide(numerator, dz, out=t, where=np.abs(dz) > 1e-10) # Interpolate position pos = pillar_top + t * (pillar_bottom - pillar_top) # Override z with exact value pos[..., 2] = z return pos # Top face corners (indices 0-3) corners[:, :, :, 0, :] = interpolate_pillar_at_z(pt[:-1, :-1], pb[:-1, :-1], z_top_00) corners[:, :, :, 1, :] = interpolate_pillar_at_z(pt[:-1, 1:], pb[:-1, 1:], z_top_10) corners[:, :, :, 2, :] = interpolate_pillar_at_z(pt[1:, :-1], pb[1:, :-1], z_top_01) corners[:, :, :, 3, :] = interpolate_pillar_at_z(pt[1:, 1:], pb[1:, 1:], z_top_11) # Bottom face corners (indices 4-7) corners[:, :, :, 4, :] = interpolate_pillar_at_z(pt[:-1, :-1], pb[:-1, :-1], z_bot_00) corners[:, :, :, 5, :] = interpolate_pillar_at_z(pt[:-1, 1:], pb[:-1, 1:], z_bot_10) corners[:, :, :, 6, :] = interpolate_pillar_at_z(pt[1:, :-1], pb[1:, :-1], z_bot_01) corners[:, :, :, 7, :] = interpolate_pillar_at_z(pt[1:, 1:], pb[1:, 1:], z_bot_11) return corners def _cell_top_bottom_depths(self) -> tuple[np.ndarray, np.ndarray]: """Return top and bottom depth for each cell. Returns ------- tuple of ndarray ``(z_top, z_bottom)`` each shaped ``(nk, nj, ni)``. """ zcorn = self._compute_cell_corners()[..., 2] z_top = np.min(zcorn[..., :4], axis=-1) z_bottom = np.max(zcorn[..., 4:], axis=-1) return z_top, z_bottom def _cell_top_depth(self) -> np.ndarray: """Get top depth for each cell. Returns ------- np.ndarray Top depths with shape ``(nk, nj, ni)``. """ z_top, _ = self._cell_top_bottom_depths() return z_top def _cell_bottom_depth(self) -> np.ndarray: """Get bottom depth for each cell. Returns ------- np.ndarray Bottom depths with shape ``(nk, nj, ni)``. """ _, z_bottom = self._cell_top_bottom_depths() return z_bottom def _cell_thickness(self) -> np.ndarray: """Compute geometric thickness for each cell. Returns ------- np.ndarray Absolute difference between bottom and top depth per cell. """ z_top, z_bottom = self._cell_top_bottom_depths() return np.abs(z_bottom - z_top) def _target_mask( self, zone: str | Zone | None = None, include_inactive: bool = False, ) -> np.ndarray: """Boolean mask for selecting cells. Parameters ---------- zone : str or Zone or None, optional Restrict mask to a specific zone. include_inactive : bool, default False When False, inactive cells are excluded. Returns ------- ndarray Boolean mask shaped ``(nk, nj, ni)``. """ mask = np.ones(self.shape, dtype=bool) if not include_inactive: mask &= np.asarray(self.active, dtype=bool) if zone is not None: mask &= self._zone_mask(zone) return mask def _resolve_source( self, source: str | "GridProperty", ) -> np.ndarray: """Resolve a source into a full-grid ndarray. Parameters ---------- source : str or GridProperty Built-in geometry attribute name or property reference. Returns ------- ndarray Array of shape ``(nk, nj, ni)``. Raises ------ ValueError If a GridProperty belongs to a different grid. TypeError If ``source`` is not a string or GridProperty. UnsupportedGridAttributeError If a named attribute is not available. """ if isinstance(source, str): if source in self._properties: prop = self._properties[source] return prop.values else: return self._get_attribute(source) if isinstance(source, GridProperty): if source.grid is not self: raise ValueError( "Source GridProperty must belong to the same grid." ) return source.values raise TypeError( "`source` entries must be either str or GridProperty." ) def _get_attribute(self, name: str) -> np.ndarray: """Resolve and compute a built-in grid attribute by name or alias. Parameters ---------- name : str Attribute name or alias such as ``"depth"`` or ``"z"``. Returns ------- np.ndarray Computed attribute array shaped like the grid. Raises ------ UnsupportedGridAttributeError If the attribute name is not supported. """ try: spec = GRID_ATTRIBUTE_BY_ALIAS[name] except KeyError as e: raise UnsupportedGridAttributeError(attribute_name=name, supported_names=GRID_ATTRIBUTE_BY_ALIAS.keys()) from e return spec.resolver(self)
[docs] def __repr__(self) -> str: """Return a compact debug representation of the grid instance. Returns ------- str Representation including shape, active-cell count, and properties. """ nk, nj, ni = self.shape return ( f"{self.__class__.__name__}(shape=({nk}, {nj}, {ni}), " f"n_active={self.n_active}/{self.n_cells}, " f"properties={list(self._properties.keys())})" )
# # ---------------------------- # # Cell geometry # # ---------------------------- # # TEST # def zcorn_to_cell_corner_z8(zcorn_3d: np.ndarray) -> np.ndarray: # """ # Convert ZCORN shaped (2*nk, 2*nj, 2*ni) to (nk, nj, ni, 8) with ordering: # 0: TOP (i, j) # 1: TOP (i+1, j) # 2: TOP (i, j+1) # 3: TOP (i+1, j+1) # 4: BOTTOM (i, j) # 5: BOTTOM (i+1, j) # 6: BOTTOM (i, j+1) # 7: BOTTOM (i+1, j+1) # """ # if zcorn_3d.ndim != 3: # raise ValueError(f"Expected 3D array (2*nk,2*nj,2*ni), got ndim={zcorn_3d.ndim}") # nk2, nj2, ni2 = zcorn_3d.shape # if (nk2 % 2) or (nj2 % 2) or (ni2 % 2): # raise ValueError(f"All dimensions must be even, got {zcorn_3d.shape}") # nk, nj, ni = nk2 // 2, nj2 // 2, ni2 // 2 # out = np.empty((nk, nj, ni, 8), dtype=zcorn_3d.dtype) # z = zcorn_3d # alias # # Top (k-top = 0::2), Bottom (k-bot = 1::2) # kt = z[0::2] # kb = z[1::2] # # ---- TOP face ---- # out[..., 0] = kt[:, 0::2, 0::2] # (i, j) # out[..., 1] = kt[:, 0::2, 1::2] # (i+1, j) # out[..., 2] = kt[:, 1::2, 0::2] # (i, j+1) # out[..., 3] = kt[:, 1::2, 1::2] # (i+1, j+1) # # ---- BOTTOM face ---- # out[..., 4] = kb[:, 0::2, 0::2] # out[..., 5] = kb[:, 0::2, 1::2] # out[..., 6] = kb[:, 1::2, 0::2] # out[..., 7] = kb[:, 1::2, 1::2] # return out # def get_cell_corners(self, k: int, j: int, i: int) -> np.ndarray: # """Get 8 corner coordinates of cell (k, j, i). # Returns corners in standard hexahedron ordering: # - Corners 0-3: Bottom face (counter-clockwise from top view) # - Corners 4-7: Top face (counter-clockwise from top view) # Args: # k, j, i: Cell indices # Returns: # np.ndarray: Shape (8, 3) corner coordinates # """ # corners = np.zeros((8, 3)) # # Bottom face (dk=0) # for idx, (di, dj) in enumerate([(0,0), (1,0), (1,1), (0,1)]): # z = self.zcorn[k, j, i, di, dj, 0] # corners[idx] = self.pillars.interpolate_at_z(j+dj, i+di, z) # # Top face (dk=1) # for idx, (di, dj) in enumerate([(0,0), (1,0), (1,1), (0,1)]): # z = self.zcorn[k, j, i, di, dj, 1] # corners[4+idx] = self.pillars.interpolate_at_z(j+dj, i+di, z) # return corners # @property # def cell_centers(self) -> np.ndarray: # """Cell centers (average of 8 corners). # Returns: # np.ndarray: Shape (nk, nj, ni, 3) # """ # if self._cell_centers is None: # centers = np.zeros((self.nk, self.nj, self.ni, 3)) # for k in range(self.nk): # for j in range(self.nj): # for i in range(self.ni): # corners = self.get_cell_corners(k, j, i) # centers[k, j, i] = np.mean(corners, axis=0) # self._cell_centers = centers # return self._cell_centers # @property # def cell_volumes(self) -> np.ndarray: # """Cell volumes. # Returns: # np.ndarray: Shape (nk, nj, ni) # """ # if self._cell_volumes is None: # volumes = np.zeros((self.nk, self.nj, self.ni)) # for k in range(self.nk): # for j in range(self.nj): # for i in range(self.ni): # corners = self.get_cell_corners(k, j, i) # volumes[k, j, i] = self._hexahedron_volume(corners) # self._cell_volumes = volumes # return self._cell_volumes # @staticmethod # def _hexahedron_volume(corners: np.ndarray) -> float: # """Compute hexahedron volume via tetrahedral decomposition.""" # center = np.mean(corners, axis=0) # # 6 faces # faces = [ # [0, 1, 2, 3], # Bottom # [4, 5, 6, 7], # Top # [0, 1, 5, 4], # Front # [2, 3, 7, 6], # Back # [0, 3, 7, 4], # Left # [1, 2, 6, 5], # Right # ] # volume = 0.0 # for face in faces: # # Split quad into 2 triangles # for tri in [[0, 1, 2], [0, 2, 3]]: # v0 = corners[face[tri[0]]] # v1 = corners[face[tri[1]]] # v2 = corners[face[tri[2]]] # mat = np.column_stack([v1 - v0, v2 - v0, center - v0]) # volume += abs(np.linalg.det(mat)) / 6.0 # return volume # # ---------------------------- # # Properties # # ---------------------------- # def add_property(self, name: str, values: np.ndarray): # """Add cell property. # Args: # name: Property name # values: Property values, shape (nk, nj, ni) # """ # if values.shape != self.shape: # raise ValueError(f"Property shape {values.shape} != grid shape {self.shape}") # self.properties[name] = values # def get_property(self, name: str) -> Optional[np.ndarray]: # """Get cell property by name.""" # return self.properties.get(name) # # ---------------------------- # # Active cells # # ---------------------------- # def get_active_indices(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: # """Get (k, j, i) indices of active cells.""" # return np.where(self.active) # def get_active_centers(self) -> np.ndarray: # """Get cell centers for active cells only. # Returns: # np.ndarray: Shape (n_active, 3) # """ # return self.cell_centers[self.active] # def get_active_volumes(self) -> np.ndarray: # """Get volumes for active cells only. # Returns: # np.ndarray: Shape (n_active,) # """ # return self.cell_volumes[self.active] # def get_active_property(self, name: str) -> Optional[np.ndarray]: # """Get property for active cells only.""" # prop = self.get_property(name) # if prop is None: # return None # return prop[self.active] # # ---------------------------- # # Constructors # # ---------------------------- # @classmethod # def from_layers( # cls, # pillars: PillarGrid, # layer_z: np.ndarray, # Shape (nk+1, nj, ni) - z at layer interfaces # active: Optional[np.ndarray] = None, # properties: Optional[Dict[str, np.ndarray]] = None, # ) -> 'CornerPointGrid': # """Create grid from layer interface depths. # Args: # pillars: PillarGrid defining lateral geometry # layer_z: Z-coordinates at layer interfaces, shape (nk+1, nj, ni) # active: Boolean mask (default: all active) # properties: Cell properties dict # Returns: # CornerPointGrid instance # """ # ni, nj = pillars.cell_shape # nk = layer_z.shape[0] - 1 # # Build zcorn from layer interfaces # zcorn = np.zeros((nk, nj, ni, 2, 2, 2)) # for k in range(nk): # for j in range(nj): # for i in range(ni): # # Bottom (dk=0) - all 4 corners same z # zcorn[k, j, i, :, :, 0] = layer_z[k, j, i] # # Top (dk=1) - all 4 corners same z # zcorn[k, j, i, :, :, 1] = layer_z[k+1, j, i] # # Keep active as None if not provided (memory efficient) # if properties is None: # properties = {} # return cls(pillars, zcorn, active, properties) # @classmethod # def from_rectilinear( # cls, # x: np.ndarray, # Shape (ni+1,) # y: np.ndarray, # Shape (nj+1,) # z: np.ndarray, # Shape (nk+1,) # active: Optional[np.ndarray] = None, # properties: Optional[Dict[str, np.ndarray]] = None, # ) -> 'CornerPointGrid': # """Create rectilinear corner-point grid. # Args: # x: X-coordinates, shape (ni+1,) # y: Y-coordinates, shape (nj+1,) # z: Z-coordinates, shape (nk+1,) # active: Boolean mask (default: all active) # properties: Cell properties # Returns: # CornerPointGrid instance # """ # # Create vertical pillars # pillars = PillarGrid.from_rectilinear(x, y, z[0], z[-1]) # ni, nj = pillars.cell_shape # nk = len(z) - 1 # # Uniform layer depths # layer_z = np.zeros((nk + 1, nj, ni)) # for k in range(nk + 1): # layer_z[k, :, :] = z[k] # return cls.from_layers(pillars, layer_z, active, properties)