Source code for raster_tools.batch

import os
import re

from raster_tools.dtypes import is_int, is_scalar
from raster_tools.general import band_concat
from raster_tools.masking import get_default_null_value
from raster_tools.raster import Raster
from raster_tools.utils import validate_file


[docs]class BatchScriptParseError(BaseException): pass
def _split_strip(s, delimeter): return [si.strip() for si in s.split(delimeter)] def _batch_error(msg, line_no): raise BatchScriptParseError(f"Script Line {line_no}: {msg}") def _parse_user_number(str_val): from ast import literal_eval # This may raise a ValueError if the string is not a valid literal val = literal_eval(str_val) if not is_scalar(val): raise TypeError("Must be a scalar") return val FTYPE_TO_EXT = { "TIFF": "tif", } _ESRI_OP_TO_OP = { "esriRasterPlus": "+", "+": "+", "esriRasterMinus": "-", "-": "-", "esriRasterMultiply": "*", "*": "*", "esriRasterDivide": "/", "/": "/", "esriRasterMode": "%", "%": "%", "esriRasterPower": "**", "**": "**", } _FUNC_PATTERN = re.compile(r"^(?P<func>[A-Za-z]+)\((?P<args>[^\(\)]+)\)$") class _BatchScripParserState: def __init__(self, path): validate_file(path) self.path = os.path.abspath(path) self.location = os.path.dirname(self.path) self.rasters = {} self.final_raster = None def get_raster(self, name_or_path): if name_or_path in self.rasters: return self.rasters[name_or_path] else: # Handle relative paths. Assume they are relative to the batch file if not os.path.isabs(name_or_path): name_or_path = os.path.join(self.location, name_or_path) validate_file(name_or_path) return Raster(name_or_path) def _batch_parse_arithmetic(state, args_str, line_no): left_arg, right_arg, op = _split_strip(args_str, ";") op = _ESRI_OP_TO_OP[op] if op not in _ESRI_OP_TO_OP: _batch_error(f"Unknown arithmetic operation {repr(op)}", line_no) op = _ESRI_OP_TO_OP[op] try: left = float(left_arg) except ValueError: left = state.get_raster(left_arg) try: right = float(right_arg) except ValueError: right = state.get_raster(right_arg) return left._binary_arithmetic(right, op) def _batch_parse_extract_band(state, args_str, line_no): rs = state.get_raster(args_str.pop(0)) bands = [] for sb in args_str: try: b = _parse_user_number(sb) if not is_int(b): raise ValueError() except ValueError: _batch_error("Error parsing band value", line_no) except TypeError: _batch_error("Band values must be integers", line_no) bands.append(b) return rs.get_bands(bands) def _batch_parse_null_to_value(state, args_str, line_no): left, *right = _split_strip(args_str, ";") if len(right) > 1: _batch_error("NULLTOVALUE Error: Too many arguments", line_no) value = float(right[0]) return state.get_raster(left).replace_null(value) def _batch_parse_remap(state, args_str, line_no): raster, *args = _split_strip(args_str, ";") if len(args) > 1: _batch_error("REMAP Error: Too many argument dividers", line_no) args = args[0] remaps = [] for group in _split_strip(args, ","): try: values = [float(v) for v in _split_strip(group, ":")] except ValueError: _batch_error("REMAP Error: values must be numbers", line_no) if len(values) != 3: _batch_error( "REMAP Error: requires 3 values separated by ':'", line_no ) left, right, new = values if right <= left: _batch_error( "REMAP Error: the min value must be less than the max value", line_no, ) remaps.append((left, right, new)) if len(remaps) == 0: _batch_error("REMAP Error: No remap values found", line_no) args = [] for group in remaps: args.extend(group) return state.get_raster(raster).remap_range(*args) def _batch_parse_composite(state, args_str, line_no): on_line = f" on line {line_no}" rasters = [state.get_raster(path) for path in _split_strip(args_str, ";")] if len(rasters) < 2: _batch_error( "COMPOSITE Error: at least 2 rasters are required", on_line ) return band_concat(rasters) def _batch_parse_open(state, args_str, line_no): try: return state.get_raster(args_str) except Exception as e: _batch_error(f"Error while opening raster: {repr(e)}", line_no) def _batch_parse_save(state, args_str, line_no): # From c# files: # (inRaster;outName;outWorkspace;rasterType;nodata;blockwidth;blockheight) # nodata;blockwidth;blockheight are optional try: in_rs, out_name, out_dir, type_, *extra = _split_strip(args_str, ";") except ValueError: _batch_error( "SAVEFUNCTIONRASTER Error: Incorrect number of arguments", line_no ) n = len(extra) bwidth = None bheight = None nodata = 0 if n >= 1: nodata = float(extra[0]) if n >= 2: bwidth = int(extra[1]) if n == 3: bheight = int(extra[2]) if n > 3: _batch_error("SAVEFUNCTIONRASTER Error: Too many arguments", line_no) if type_ not in FTYPE_TO_EXT: _batch_error("SAVEFUNCTIONRASTER Error: Unknown file type", line_no) raster = state.get_raster(in_rs) out_name = os.path.join(out_dir, out_name) ext = FTYPE_TO_EXT[type_] out_name += f".{ext}" return raster.save(out_name, nodata, bwidth, bheight) def _batch_parse_set_null(state, args_str, line_no): rs = state.get_raster(args_str.pop(0)) if not rs._masked: rs.set_null_value(get_default_null_value(rs.dtype)) sranges = [_split_strip(r, "-") for r in args_str] ranges = [] for sr in sranges: try: lh = _parse_user_number(sr[0]) rh = _parse_user_number(sr[1]) ranges.extend((lh, rh, rs.null_value)) except ValueError: _batch_error("Error parsing range value", line_no) except TypeError: _batch_error("Range bounds must be numbers", line_no) return rs.remap_range(*ranges).set_null_value(rs.null_value) _FUNC_TO_PARSER = { "ARITHMETIC": _batch_parse_arithmetic, "COMPOSITE": _batch_parse_composite, "EXTRACTBAND": _batch_parse_extract_band, "NULLTOVALUE": _batch_parse_null_to_value, "OPENRASTER": _batch_parse_open, "REMAP": _batch_parse_remap, "SAVEFUNCTIONRASTER": _batch_parse_save, "SETNULL": _batch_parse_set_null, }
[docs]def parse_batch_script(path): state = _BatchScripParserState(path) with open(state.path) as fd: lines = fd.readlines() last_raster = None for i, line in enumerate(lines): # Ignore comments line, *_ = _split_strip(line, "#") if not line: continue lh, rh = _split_strip(line, "=") state.rasters[lh] = _parse_raster(state, lh, rh, i + 1) last_raster = lh state.final_raster = state.rasters[last_raster] return state
def _parse_raster(state, dst, expr, line_no): mat = _FUNC_PATTERN.match(expr) if mat is None: _batch_error("Could not parse function on line", line_no) func = mat["func"].upper() args = mat["args"] if func not in _FUNC_TO_PARSER: _batch_error(f"Unknown function {repr(func)}", line_no) return _FUNC_TO_PARSER[func](state, args, line_no)