Source code for raster_tools.distance.cost_distance

import dask.array as da
import numba as nb
import numpy as np
import xarray as xr
from numba.types import float64, int8, int64

from raster_tools.distance._heap import (
    init_heap_data,
    pop,
    push,
    push_if_lower,
)
from raster_tools.dtypes import (
    F16,
    F32,
    F64,
    I8,
    I64,
    is_float,
    is_scalar,
    is_str,
)
from raster_tools.raster import Raster
from raster_tools.utils import make_raster_ds

JIT_KWARGS = {"nopython": True, "nogil": True}
ngjit = nb.jit(**JIT_KWARGS)


@ngjit
def _mult_accumulate(x):
    out = np.empty(len(x), dtype=I64)
    c = 1
    for i in range(len(x)):
        out[i] = c * x[i]
    return out


@nb.jit(
    [
        "int64[:](Tuple((int64, int64)))",
        "int64[:](Tuple((int64, int64, int64)))",
    ],
    **JIT_KWARGS,
)
def _get_strides(shape):
    # Get strides for given nd array shape. Assumes c style strides
    # NOTE: output must have a standardized type because windows has different
    # default int types from linux.
    values = np.empty(len(shape), dtype=I64)
    values[0] = 1
    values[1:] = shape[::-1][:-1]
    strides = _mult_accumulate(values)[::-1]
    return strides


@ngjit
def _ravel_indices(indices, shape):
    # Convert nd c-strided indices to flat indices
    strides = _get_strides(shape)
    flat_indices = [np.sum(strides * idx) for idx in indices]
    return np.array(flat_indices, dtype=I64)


# These are the moves used to explore the cost surface. +1 means to move 1 cell
# in the positive row/col direction. -1 means to move in the negative row/col
# direction. The moves have been ordered so that the index can be mapped to the
# ESRI backtrace value by adding 1.
#
# The move direction is the direction traversed in the array during the
# algorithm's exploration. The bactrace direction is the direction to move when
# traveling back to the nearest source.
#
# +-------+------------+-------------+---------------+
# | Index | ESRI Value |  Move Dir   | Backtrace Dir |
# +-------+------------+-------------+---------------+
# |     0 |          1 | Left        | Right         |
# |     1 |          2 | Upper-Left  | Lower-Right   |
# |     2 |          3 | Up          | Down          |
# |     3 |          4 | Upper-Right | Lower-Left    |
# |     4 |          5 | Right       | Left          |
# |     5 |          6 | Lower-Right | Upper-Left    |
# |     6 |          7 | Down        | Up            |
# |     7 |          8 | Lower-Left  | Upper-Right   |
# +-------+------------+-------------+---------------+
#
# ref: https://pro.arcgis.com/en/pro-app/latest/tool-reference/spatial-analyst/understanding-cost-distance-analysis.htm  # noqa: E501
_MOVES = np.array(
    [
        # 0
        [0, -1],
        # 1
        [-1, -1],
        # 2
        [-1, 0],
        # 3
        [-1, 1],
        # 4
        [0, 1],
        # 5
        [1, 1],
        # 6
        [1, 0],
        # 7
        [1, -1],
    ],
    dtype=I8,
)
TRACEBACK_NOT_REACHED = -2


@ngjit
def _cost_distance_analysis_core(
    flat_costs,
    flat_sources,
    flat_moves,
    move_lengths,
    shape,
    sources_null_value,
    # Elevation args
    flat_elev,
    elevation_null_value,
    use_elevation,
):
    size: int64 = flat_costs.size
    # The cumulative cost distance to every pixel from the sources
    flat_cost_distance = np.full(size, np.inf, dtype=F64)
    # traceback values:
    #    -1: source
    #    -2: not reached, barrier
    #    0-7: index into moves array
    flat_traceback = np.full(size, TRACEBACK_NOT_REACHED, dtype=I8)
    # allocation values:
    #    sources_null_value: not reached, barrier
    #    other: value from sources array
    flat_allocation = np.full(size, sources_null_value, dtype=I64)

    cost: float64
    new_cost: float64
    cumcost: float64
    new_cumcost: float64
    move_length: float64
    dz: float64
    ev: float64
    new_ev: float64
    index: int64
    index: int64
    new_index: int64
    src: int64
    index2d = np.zeros(2, dtype=I64)
    i: int64
    it: int64
    move: int8[:]
    is_at_edge: bool
    bad_move: bool
    left: bool
    right: bool
    top: bool
    bottom: bool
    costs_shape_m1 = np.zeros(2, dtype=I64)
    strides: int64[:] = _get_strides(shape)
    # 1.5 * size
    maxiter: int64 = size + (size // 2)

    # A heap for storing pixels and their accumulated cost as the algorithm
    # explores the cost landscape.
    heap_keys, heap_values, heap_xrefs, heap_state = init_heap_data(
        128, size - 1
    )

    costs_shape_m1[0] = shape[0] - 1
    costs_shape_m1[1] = shape[1] - 1

    for i in range(flat_sources.size):
        src = flat_sources[i]
        if src != sources_null_value:
            flat_traceback[i] = -1
            flat_allocation[i] = src
            heap_keys, heap_values, heap_xrefs, heap_state, _ = push(
                heap_keys, heap_values, heap_xrefs, heap_state, 0, i
            )

    # Main loop for Dijkstra's algorithm
    # The frontier heap contains the current known costs to pixels discovered
    # so far. When a pixel and its cumulative cost are popped, we have found
    # the minimum cost path to it and can store the cumulative cost in our
    # output array. We then add/update the cost to that pixel's neighborhood.
    # If a neighbor has already been popped before, we ignore it. The
    # cumulative cost is used as the priority in the heap. At the start, only
    # the sources are on the heap.
    #
    # ref: https://pro.arcgis.com/en/pro-app/latest/tool-reference/spatial-analyst/how-the-cost-distance-tools-work.htm  # noqa: E501
    # ref: https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm
    for it in range(maxiter):
        if heap_state[0].count == 0:
            break

        # Get the cumulative cost at the current pixel of interest and get the
        # flat index of the current pixel and the source that led to it
        heap_keys, heap_values, heap_xrefs, heap_state, cumcost, index = pop(
            heap_keys, heap_values, heap_xrefs, heap_state
        )
        src = flat_allocation[index]
        # We have now found the minimum cost to this pixel so store it
        flat_cost_distance[index] = cumcost

        # Convert to 2d index for bounds checking
        index2d[0] = (index // strides[0]) % shape[0]
        index2d[1] = (index // strides[1]) % shape[1]
        # Compare against the bounds to see if we are at any edges
        top = index2d[0] == 0
        bottom = index2d[0] == costs_shape_m1[0]
        left = index2d[1] == 0
        right = index2d[1] == costs_shape_m1[1]
        is_at_edge = top or bottom or left or right

        # Look at neighborhood
        for i in range(8):
            if is_at_edge:
                move = _MOVES[i]
                bad_move = (
                    (top and move[0] < 0)
                    or (bottom and move[0] > 0)
                    or (left and move[1] < 0)
                    or (right and move[1] > 0)
                )
                if bad_move:
                    continue
            new_index = index + flat_moves[i]
            move_length = move_lengths[i]
            # If a value is already stored in this pixel, we have found an
            # optimal path to it previously and can skip it.
            if flat_cost_distance[new_index] != np.inf:
                continue
            cost = flat_costs[index]
            new_cost = flat_costs[new_index]
            # If the cost at this point is a barrier, skip
            # TODO: may be able consolidate into a single sentinel check if
            #       input data is standardized
            if new_cost < 0 or new_cost == np.inf or np.isnan(new_cost):
                continue

            if use_elevation:
                ev = flat_elev[index]
                new_ev = flat_elev[new_index]
                if new_ev == elevation_null_value:
                    continue
                dz = new_ev - ev
                move_length = np.sqrt(move_length + (dz * dz))

            new_cumcost = cumcost + (move_length * 0.5 * (cost + new_cost))
            if new_cumcost != np.inf:
                (
                    heap_keys,
                    heap_values,
                    heap_xrefs,
                    heap_state,
                    flag,
                ) = push_if_lower(
                    heap_keys,
                    heap_values,
                    heap_xrefs,
                    heap_state,
                    new_cumcost,
                    new_index,
                )
                if flag > 0:
                    flat_traceback[new_index] = i
                    flat_allocation[new_index] = src
    return flat_cost_distance, flat_traceback, flat_allocation


[docs]def cost_distance_analysis_numpy( costs, sources, sources_null_value, elevation=None, elevation_null_value=0, scaling=1.0, ): """ Calculate accumulated cost distance, traceback, and allocation. This function uses Dijkstra's algorithm to compute the many-sources shortest-paths solution for a given cost surface. Valid moves are from a given pixel to its 8 nearest neighbors. This produces 3 2D arrays. The first is the accumulated cost distance, which contains the distance-weighted accumulated minimum cost to each pixel. The cost to move from one pixel to the next is ``length * mean(costs[i], costs[i+1])``, where ``length`` is 1 for horizontal and vertical moves and ``sqrt(2)`` for diagonal moves. The provided scaling factor informs the actual distance scaling to use. Source locations have a cost of 0. If `elevation` is provided, the length calculation incorporates the elevation data to make the algorithm 3D aware. The second array contains the traceback values for the solution. At each pixel, the stored value indicates the neighbor to move to in order to get closer to the cost-relative nearest source. The numbering is as follows: 5 6 7 4 X 0 3 2 1 Here, X indicates the current pixel and the numbers are the neighbor pixel positions. 0 indicates the neighbor immediately to the right and 6 indicates the neighbor immediately above. In terms of rows and columns, these are the neighbor one column over and one row above, respectively. a value of -1 indicates that the current pixel is a source pixel and -2 indicates that the pixel was not traversed (no data or a barrier). The third array contians the source allocation for each pixel. Each pixel is labeled based on the source location that is closest in terms of cost distance. The label is the value stored in the sources array at the corresponding source location. Parameters ---------- costs : 2D ndarray A 2D array representing a cost surface. sources : 2D int64 ndarray An array of sources. The values at each valid location, as determined using `sources_null_value`, are used for the allocation output. sources_null_value: int The value in `sources` that indicates a null value. elevation : 2D ndarray, optional An array of elevation values. Same shape as `costs`. If provided, the elevation is used when determining the move length between pixels. elevation_null_value : scalar, optional The null value for `elevation` data. Default is 0. scaling : scalar or 1D sequence, optional The scaling to use in each direction. For a grid with 30m scale, this would be 30. Default is 1. Returns ------- cost_distance : 2D ndarray The accumulated cost distance solution. This is the same shape as the `costs` input array. traceback : 2D ndarray The traceback result. This is the same shape as the `costs` input array. allocation : 2D ndarray The allocation result. This is the same shape as the `costs` input array. """ costs = np.asarray(costs) if costs.dtype == F16: # Promote because our fused type doesn't define a float16 type costs = costs.astype(F32) sources = np.asarray(sources).astype(I64) shape = costs.shape if sources.shape != shape: raise ValueError("Costs and sources array shapes must match") if elevation is None: elevation = np.array([]) elevation_null_value = 0 use_elevation = False else: elevation = np.asarray(elevation) if elevation.shape != shape: raise ValueError( "Elevation must have the same shape as costs array" ) if elevation.dtype == F16: # Promote because our fused type doesn't define a float16 type elevation = elevation.astype(F32) use_elevation = True if is_scalar(scaling): scaling = np.array([scaling for _ in shape], dtype=F64) elif isinstance(scaling, (np.ndarray, list, tuple)): scaling = np.asarray(scaling).astype(F64) if scaling.size != len(shape) or len(scaling.shape) != 1: raise ValueError(f"Invalid scaling shape: {scaling.shape}") if any(scaling <= 0): raise ValueError("Scaling values must be greater than 0") flat_costs = costs.ravel() flat_elev = elevation.ravel() flat_sources = sources.ravel() flat_moves = _ravel_indices(_MOVES, shape) # Compute squared move lengths move_lengths = np.sum((scaling * _MOVES) ** 2, axis=1).astype(F64) if not use_elevation: # No elevation data provided so convert to actual euclidean lengths for i in range(move_lengths.size): move_lengths[i] = np.sqrt(move_lengths[i]) ( flat_cost_distance, flat_traceback, flat_allocation, ) = _cost_distance_analysis_core( flat_costs, flat_sources, flat_moves, move_lengths, shape, sources_null_value, # Optional elevation args flat_elev, elevation_null_value, use_elevation, ) cost_distance = flat_cost_distance.reshape(shape) traceback = flat_traceback.reshape(shape) allocation = flat_allocation.reshape(shape) return cost_distance, traceback, allocation
def _normalize_raster_data(rs, missing=-1): data = rs.data # Make sure that null values are skipped nv = rs.null_value if rs._masked: if is_float(rs.dtype): if not np.isnan(nv): data = da.where(rs.mask, missing, data) elif not np.isnan(missing): data = da.where(np.isnan(data), missing, data) else: data = da.where(rs.mask, missing, data) # Trim off band dim data = data[0] return data # NOTE: this must match the TRACEBACK_NOT_REACHED value in _core.pyx. I can't # get that variable to import so I'm using this mirror for now. _TRACEBACK_NOT_REACHED = -2
[docs]def cost_distance_analysis(costs, sources, elevation=None): """Calculate accumulated cost distance, traceback, and allocation. This function uses Dijkstra's algorithm to compute the many-sources shortest-paths solution for a given cost surface. Valid moves are from a given pixel to its 8 nearest neighbors. This produces 3 rasters. The first is the accumulated cost distance, which contains the distance-weighted accumulated minimum cost to each pixel. The cost to move from one pixel to the next is ``length * mean(costs[i], costs[i+1])``, where ``length`` is 1 for horizontal and vertical moves and ``sqrt(2)`` for diagonal moves. The `costs` raster's resolution informs the actual scaling to use. Source locations have a cost of 0. If `elevation` provided, the length calculation incorporates the elevation data to make the algorithm 3D aware. The second raster contains the traceback values for the solution. At each pixel, the stored value indicates the neighbor to move to in order to get closer to the cost-relative nearest source. The numbering is as follows: :: 6 7 8 5 X 1 4 3 2 Here, X indicates the current pixel and the numbers are the neighbor pixel positions. 1 indicates the neighbor immediately to the right and 7 indicates the neighbor immediately above. In terms of rows and columns, these are the neighbor +1 column over and +1 row above, respectively. a value of 0 indicates that the current pixel is a source pixel and -1 indicates that the pixel was not traversed due to a null value. The third raster contains the source allocation for each pixel. Each pixel is labeled based on the source location that is closest in terms of cost distance. The label is the value stored in the sources raster at the corresponding source location. Parameters ---------- costs : Raster or raster path A raster representing a cost surface. Values less than 0 are treated as null values. sources : Raster or raster path, or sequence A raster or sequence of indices. If a raster, pixels that are not null are used as source locations. The raster must have a null value set. The values at valid locations are used as the corresponding values in the allocation output. If a sequence, must have shape (M, 2) where M is the number of source pixels. Each item represents an index into `costs` to be used as a source. The element number, starting at 0, is used as the corresponding allocation value. elevation : Raster or raster path, optional A raster containing elevation values on the same grid as `costs`. If provided, the elevation values are used when calculating the travel distance between pixels. This makes the algorithm 3D aware. Returns ------- cost_distance : Raster The accumulated cost distance solution. This is the same shape as the `costs` input Raster and has the same null value. traceback : Raster The traceback result. This is the same shape as the `costs` input Raster. allocation : Raster The allocation result. This is the same shape as the `costs` input Raster. References ---------- * `ESRI: How cost distance tools work <https://pro.arcgis.com/en/pro-app/latest/tool-reference/spatial-analyst/how-the-cost-distance-tools-work.htm>`_ * `ESRI: How path distance tools work <https://pro.arcgis.com/en/pro-app/latest/tool-reference/spatial-analyst/how-the-path-distance-tools-work.htm>`_ """ # noqa: E501 if not isinstance(costs, Raster): costs = Raster(costs) if costs.shape[0] != 1: raise ValueError("Costs raster cannot be multibanded") if elevation is not None: if not isinstance(elevation, Raster): elevation = Raster(elevation) if elevation.shape[0] != 1: raise ValueError("Elevation raster cannot be multibanded") if costs.shape != elevation.shape: raise ValueError( "Costs and elevation rasters must have the same shape" ) src_idxs = None if isinstance(sources, Raster) or is_str(sources): if is_str(sources): sources = Raster(sources) if sources.shape != costs.shape: raise ValueError("Cost and sources raster shapes must match") if sources.dtype.kind not in ("u", "i"): raise TypeError("Sources raster must be an integer type") if not sources._masked: raise ValueError("Sources raster must have a null value set") sources_null_value = sources.null_value srcs = sources.data[0].astype(I64) else: try: sources = np.asarray(sources).astype(int) if len(sources.shape) != 2 or sources.shape[1] != 2: raise ValueError("Sources must be an (M, 2) shaped array") if np.unique(sources, axis=0).shape != sources.shape: raise ValueError("Sources must not contain duplicates") sources_null_value = -1 src_idxs = sources srcs = np.full(costs.shape[1:], sources_null_value, dtype=I64) srcs[src_idxs[:, 0], src_idxs[:, 1]] = np.arange( len(sources), dtype=I64 ) except TypeError: raise ValueError("Could not understand sources argument") data = _normalize_raster_data(costs) if elevation is not None: elevation_null_value = -9999 edata = _normalize_raster_data(elevation, elevation_null_value) else: edata = None elevation_null_value = 0 scaling = np.abs(costs.resolution) results = cost_distance_analysis_numpy( data, srcs, sources_null_value, elevation=edata, elevation_null_value=elevation_null_value, scaling=scaling, ) # Make lazy and add band dim cd, tr, al = [da.from_array(r[None]) for r in results] # Convert to DataArrays using same coordinate system as costs xcosts = costs.xdata xcd, xtr, xal = [ xr.DataArray( r, coords=xcosts.coords, dims=xcosts.dims, attrs=xcosts.attrs ) for r in (cd, tr, al) ] xcd = xcd.where(np.isfinite(xcd), costs.null_value) # Add 1 to match ESRI 0-8 scale xtr += 1 cd_ds = make_raster_ds( xcd.rio.write_nodata(costs.null_value), costs.xmask.copy() ) tr_ds = make_raster_ds( xtr.rio.write_nodata(_TRACEBACK_NOT_REACHED + 1), costs.xmask.copy() ) al_ds = make_raster_ds( xal.rio.write_nodata(sources_null_value), costs.xmask.copy() ) if costs.crs is not None: cd_ds = cd_ds.rio.write_crs(costs.crs) tr_ds = tr_ds.rio.write_crs(costs.crs) al_ds = al_ds.rio.write_crs(costs.crs) cd = Raster(cd_ds, _fast_path=True) tr = Raster(tr_ds, _fast_path=True) al = Raster(al_ds, _fast_path=True) return cd, tr, al
[docs]def cda_cost_distance(costs, sources, elevation=None): """Calculate just the cost distance for a cost surface See cost_distance_analysis for a full description. Parameters ---------- costs : Raster or raster path A raster representing a cost surface. Values less than 0 are treated as null values. sources : Raster or raster path, or sequence A raster or sequence of indices. If a raster, pixels that are not null are used as source locations. The raster must have a null value set. The values at valid locations are used as the corresponding values in the allocation output. If a sequence, must have shape (M, 2) where M is the number of source pixels. Each item represents an index into `costs` to be used as a source. The element number, starting at 0, is used as the corresponding allocation value. elevation : Raster or raster path, optional A raster containing elevation values on the same grid as `costs`. If provided, the elevation values are used when calculating the travel distance between pixels. This makes the algorithm 3D aware. Returns ------- cost_distance : Raster The accumulated cost distance solution. This is the same shape as the `costs` input Raster. See Also -------- cost_distance_analysis : Full cost distance solution """ cost_dist, _, _ = cost_distance_analysis(costs, sources, elevation) return cost_dist
[docs]def cda_traceback(costs, sources, elevation=None): """Calculate just the cost distance traceback for a cost surface. See cost_distance_analysis for a full description. Parameters ---------- costs : Raster or raster path A raster representing a cost surface. Values less than 0 are treated as null values. sources : Raster or raster path, or sequence A raster or sequence of indices. If a raster, pixels that are not null are used as source locations. The raster must have a null value set. The values at valid locations are used as the corresponding values in the allocation output. If a sequence, must have shape (M, 2) where M is the number of source pixels. Each item represents an index into `costs` to be used as a source. The element number, starting at 0, is used as the corresponding allocation value. elevation : Raster or raster path, optional A raster containing elevation values on the same grid as `costs`. If provided, the elevation values are used when calculating the travel distance between pixels. This makes the algorithm 3D aware. Returns ------- traceback : Raster The traceback result. This is the same shape as the `costs` input Raster. See Also -------- cost_distance_analysis : Full cost distance solution """ _, trb, _ = cost_distance_analysis(costs, sources, elevation) return trb
[docs]def cda_allocation(costs, sources, elevation=None): """Calculate just the cost distance allocation for a cost surface. See cost_distance_analysis for a full description. Parameters ---------- costs : Raster or raster path A raster representing a cost surface. Values less than 0 are treated as null values. sources : Raster or raster path, or sequence A raster or sequence of indices. If a raster, pixels that are not null are used as source locations. The raster must have a null value set. The values at valid locations are used as the corresponding values in the allocation output. If a sequence, must have shape (M, 2) where M is the number of source pixels. Each item represents an index into `costs` to be used as a source. The element number, starting at 0, is used as the corresponding allocation value. elevation : Raster or raster path, optional A raster containing elevation values on the same grid as `costs`. If provided, the elevation values are used when calculating the travel distance between pixels. This makes the algorithm 3D aware. Returns ------- allocation : Raster The allocation result. This is the same shape as the `costs` input Raster. See Also -------- cost_distance_analysis : Full cost distance solution """ _, _, alloc = cost_distance_analysis(costs, sources, elevation) return alloc