# Source code for sofa.access.variables

# Copyright (c) 2019 Jannika Lossner
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

"""Classes for accessing arrays and data in the underlying :class:`netCDF4.Dataset`.
"""

#__all__ = ["get_values_from_array", "DatasetVariables", "StringArray", "Variable"]

import numpy as np


def filled_if_masked(array):
    """Return ``array`` with masked entries replaced by its fill value.

    Parameters
    ----------
    array : object
        Value to convert; only :class:`numpy.ma.MaskedArray` instances
        (including subclasses) are changed

    Returns
    -------
    array : object
        ``array.filled()`` for masked arrays, otherwise ``array`` itself
    """
    # isinstance instead of an exact type comparison, so that subclasses of
    # MaskedArray are unmasked as well instead of being passed through.
    if isinstance(array, np.ma.MaskedArray):
        return array.filled()
    return array


def is_integer(val):
    """Tell whether the type of ``val`` counts as an integer type.

    The decision is delegated to :func:`numpy.issubdtype`, comparing the
    type of ``val`` against :class:`numpy.integer`.
    """
    value_type = type(val)
    return np.issubdtype(value_type, np.integer)


def get_slice_tuple(dimensions, indices=None):
    """Build the indexing tuple for an array with the given dimensions.

    Parameters
    ----------
    dimensions : tuple of str
        Names of the array dimensions in order
    indices : dict(key:str, value:int or slice), optional
        Key: dimension name, value: index for that dimension; dimensions
        without an entry are taken completely. The dict is not modified.

    Returns
    -------
    slices : tuple
        One entry per dimension, usable as ``array[slices]``
    """
    if indices is None:
        return tuple(slice(None) for _ in dimensions)

    # Work on a copy: the "I" entry added below is an internal detail and
    # must not leak into the dict owned by the caller.
    indices = dict(indices)
    if "M" in indices and "I" in dimensions:
        # An index given for "M" is applied to an "I" dimension: an integer
        # selects element 0, anything else keeps the complete axis.
        indices["I"] = 0 if is_integer(indices["M"]) else slice(None)
    return tuple(indices.get(x, slice(None)) for x in dimensions)


def get_default_dimension_order(dimensions, indices=None):
    """Determine which dimensions remain after applying ``indices``.

    Parameters
    ----------
    dimensions : tuple of str
        Names of the array dimensions in order
    indices : dict(key:str, value:int or slice), optional
        Key: dimension name, value: index for that dimension. The dict is
        not modified.

    Returns
    -------
    dim_order : tuple of str
        ``dimensions`` itself if no indices are given, otherwise the names of
        all dimensions that are not indexed by an integer, in original order
    """
    if indices is None:
        return dimensions

    # Work on a copy: the "I" entry added below is an internal detail and
    # must not leak into the dict owned by the caller.
    indices = dict(indices)
    if "M" in indices and "I" in dimensions:
        # An index given for "M" is applied to an "I" dimension as well.
        indices["I"] = 0 if is_integer(indices["M"]) else slice(None)
    # Dimensions indexed by an integer are dropped by the indexing operation.
    return tuple(x for x in dimensions if x not in indices or not is_integer(indices[x]))


def get_dimension_order_transposition(original, new):
    """Compute the axis permutation that turns ``original`` order into ``new``.

    The names "I" and "M" are treated as interchangeable: if one sequence
    uses "M" where the other uses "I", the first "I" is renamed to "M"
    before the axes are matched.

    Parameters
    ----------
    original : sequence of str
        Dimension names in their current order
    new : sequence of str
        Dimension names in the desired order

    Returns
    -------
    transposition : tuple of int
        For each name in ``new``, its position in ``original``
    """
    source, target = original, new

    # the source uses "I" where the target asks for "M"
    if "I" in source and "M" in target:
        source = list(source)
        position = source.index("I")
        source[position] = "M"

    # the target uses "I" where the source provides "M"
    if "I" in target and "M" in source:
        target = list(target)
        position = target.index("I")
        target[position] = "M"

    return tuple(source.index(label) for label in target)


def get_values_from_array(array, dimensions, indices=None, dim_order=None):
    """Extract values of a given range from an array

    Parameters
    ----------
    array : array_like
        Source array
    dimensions : tuple of str
        Names of the array dimensions in order
    indices : dict(key:str, value:int or slice), optional
        Key: dimension name, value: indices to be returned, complete axis assumed if not provided
    dim_order : tuple of str
        Desired order of dimensions in the output array

    Returns
    -------
    values : np.ndarray
        Requested array range in regular or desired dimension order, if provided

    Raises
    ------
    Exception
        If the selected range cannot be transposed into ``dim_order``; the
        underlying error is attached as the cause
    """
    sls = get_slice_tuple(dimensions, indices)
    if dim_order is None:
        return filled_if_masked(array[sls])

    # Dimensions that survive the indexing, in their original order, and the
    # axis permutation that brings them into the requested order.
    old_dim_order = get_default_dimension_order(dimensions, indices)
    transposition = get_dimension_order_transposition(old_dim_order, dim_order)

    try:
        return filled_if_masked(np.transpose(array[sls], transposition))
    except Exception as e:
        # Re-raise with the dimension names for context, keeping the original
        # error reachable through exception chaining.
        raise Exception(
            "dimension mismatch: cannot transpose from {0} to {1} in order {2}, error {3}".format(
                old_dim_order, dim_order, transposition, e)) from e

class _VariableBase:
    """Access the values of a netCDF4 dataset variable.

    Attribute handling is split between this access object and the wrapped
    dataset variable:

    * reading an attribute that the access object does not have falls back to
      the dataset variable (see ``__getattribute__``);
    * assigning an attribute whose name contains no underscore writes it to
      the dataset variable with ``setncattr_string`` (see ``__setattr__``).
    """

    def __init__(self, database, name):
        """
        Parameters
        ----------
        database : :class:`sofa.Database`
            Parent database instance
        name : str
            Variable name within the netCDF4 dataset
        """
        # Both names contain an underscore, so __setattr__ stores them on the
        # access object itself.
        self._database = database
        self._name = name

    @property
    def name(self):
        """Variable name within the dataset"""
        return self._name

    @property
    def database(self):
        """Parent database instance"""
        return self._database

    @staticmethod
    def _rename_i_to_m(names):
        """Return ``names`` with its first "I" replaced by "M" (as a tuple).

        ``names`` is returned unchanged if it is None or contains no "I".
        """
        if names is None or "I" not in names:
            return names
        renamed = list(names)
        renamed[renamed.index("I")] = "M"
        return tuple(renamed)

    def __getattribute__(self, name):
        try:
            return super().__getattribute__(name)
        except AttributeError:
            # Not an attribute of the access object: look it up on the
            # dataset variable instead. Any error from that lookup propagates.
            return self._Matrix.__getattribute__(name)

    def __setattr__(self, name, value):
        if '_' in name:
            super().__setattr__(name, value)
            return

        # NOTE(review): names without an underscore never reach the regular
        # attribute machinery, so property setters defined for such names on
        # subclasses are bypassed here -- confirm this is intended.
        # TODO: are there any cases in which this is wrong?
        self._Matrix.setncattr_string(name, value)

    def initialize(self, dims, data_type="d", fill_value=0):
        """Create the variable in the underlying netCDF4 dataset

        Parameters
        ----------
        dims : tuple of str
            Names of the variable dimensions; all must be defined in the database
        data_type : str, optional
            Data type passed to ``createVariable``
        fill_value : optional
            Fill value passed to ``createVariable``

        Raises
        ------
        Exception
            If a dimension is undefined or the variable cannot be created
        """
        defined = self.database.Dimensions.list_dimensions()
        missing = [d for d in dims if d not in defined]
        if missing:
            raise Exception("Cannot initialize, dimensions undefined: {0}".format(missing))
        try:
            self.database.dataset.createVariable(self.name, data_type, dims, fill_value=fill_value)
        except Exception as ex:
            # One placeholder per argument, so that the dimensions, the fill
            # value and the underlying error each show up in the message.
            raise Exception(
                "Failed to create variable for {0} of type {1} with dimensions {2} and fill value {3}, "
                "error = {4}".format(self.name, data_type, dims, fill_value, str(ex))) from ex

    @property
    def _Matrix(self):
        """Dataset variable with this name, or None if it does not exist"""
        if self.name not in self.database.Variables.list_variables():
            return None
        return self.database.dataset[self.name]

    def exists(self):
        """Returns
        -------
        exists : bool
            True if variable exists, False otherwise
        """
        return self._Matrix is not None

    def dimensions(self):
        """Returns
        -------
        dimensions : tuple of str
            Variable dimension names in order
        """
        if not self.exists():
            return None
        return self._Matrix.dimensions

    def axis(self, dim):
        """Parameters
        ----------
        dim : str
            Name of the dimension

        Returns
        -------
        axis : int
            Index of the dimension axis or None if unused
        """
        dims = self.dimensions()
        if dim in dims:
            return dims.index(dim)
        # "M" is answered with the position of an "I" dimension, if present
        if dim == "M" and "I" in dims:
            return dims.index("I")
        return None

    def get_values(self, indices=None, dim_order=None):
        """
        Parameters
        ----------
        indices : dict(key:str, value:int or slice), optional
            Key: dimension name, value: indices to be returned, complete axis assumed if not provided
        dim_order : tuple of str, optional
            Desired order of dimensions in the output array

        Returns
        -------
        values : np.ndarray
            Requested array range in regular or desired dimension order, if provided
        """
        if not self.exists():
            raise Exception("failed to get values of {0}, variable not initialized".format(self.name))
        return get_values_from_array(self._Matrix, self.dimensions(), indices=indices, dim_order=dim_order)

    def _reorder_values_for_set(self, values, indices=None, dim_order=None, repeat_dim=None):
        """Bring ``values`` into the layout of the variable.

        Parameters
        ----------
        values : np.ndarray
            New values for the array range
        indices : dict(key:str, value:int or slice), optional
            Key: dimension name, value: indices to be set, complete axis assumed if not provided.
            The dict is not modified.
        dim_order : tuple of str, optional
            Dimension names in provided order, regular order assumed
        repeat_dim : tuple of str, optional
            Tuple of dimension names along which to repeat the values

        Returns
        -------
        new_values : np.ndarray
            Values repeated and transposed as requested
        sls : tuple
            Index tuple selecting the target range of the variable

        Raises
        ------
        Exception
            If the variable does not exist or the dimension arguments do not
            fit the variable
        """
        if not self.exists():
            raise Exception("Variable {0} not initialized".format(self.name))

        # Work on a copy: the "M" entry may be overwritten below, and the
        # caller may reuse the same dict for other variables.
        if indices is not None:
            indices = dict(indices)

        # Internally an "I" dimension is addressed as "M".
        dimensions = self.dimensions()
        uses_i = "I" in dimensions
        dimensions = self._rename_i_to_m(dimensions)
        if uses_i and indices is not None and "M" in indices:
            indices["M"] = 0
        dim_order = self._rename_i_to_m(dim_order)
        repeat_dim = self._rename_i_to_m(repeat_dim)

        sls = tuple(
            indices[d] if indices is not None and d in indices else slice(None)
            for d in dimensions)
        new_values = np.asarray(values)

        # repeat along provided dimensions
        full_dim_order = dim_order
        if repeat_dim is not None:
            if full_dim_order is None:
                full_dim_order = tuple(x for x in dimensions if x not in repeat_dim)
            for d in repeat_dim:
                if dim_order is not None and d in dim_order:
                    raise Exception("cannot repeat values along dimension {0}: dimension already provided".format(d))
                i = self.axis(d)
                if i is None:
                    raise Exception(
                        "cannot repeat values along dimension {0}: dimension unused by variable {1}".format(
                            d, self.name))
                count = self._Matrix[sls].shape[i]
                # each repeated dimension becomes a new leading axis
                new_values = np.repeat([new_values], count, axis=0)
                full_dim_order = (d,) + full_dim_order

        # change order if necessary
        if full_dim_order is not None:
            do = ()
            for d in dimensions:
                if d in full_dim_order:
                    if indices is not None and d in indices and not isinstance(indices[d], slice):
                        raise Exception(
                            "cannot assign values to variable {0}: dimension {1} is {2}, not a slice".format(
                                self.name, d, type(indices[d])))
                    do = do + (full_dim_order.index(d),)
                elif indices is None or d not in indices:
                    raise Exception("cannot assign values to variable {0}: missing dimension {1}".format(self.name, d))
            new_values = np.transpose(new_values, do)

        return new_values, sls

    def set_values(self, values, indices=None, dim_order=None, repeat_dim=None):
        """
        Parameters
        ----------
        values : np.ndarray
            New values for the array range
        indices : dict(key:str, value:int or slice), optional
            Key: dimension name, value: indices to be set, complete axis assumed if not provided
        dim_order : tuple of str, optional
            Dimension names in provided order, regular order assumed
        repeat_dim : tuple of str, optional
            Tuple of dimension names along which to repeat the values
        """
        if not self.exists():
            raise Exception("failed to set values of {0}, variable not initialized".format(self.name))
        new_values, sls = self._reorder_values_for_set(values, indices, dim_order, repeat_dim)

        # assign
        self._Matrix[sls] = new_values

class Variable(_VariableBase):
    """Access object for a dataset variable that carries a ``Units`` attribute."""

    def __init__(self, database, name):
        """
        Parameters
        ----------
        database : :class:`sofa.Database`
            Parent database instance
        name : str
            Variable name within the netCDF4 dataset
        """
        super().__init__(database, name)
        # Optional object whose ``Units`` is reported instead of the
        # variable's own; nothing in this class assigns it a value.
        self._unit_proxy = None

    @property
    def Units(self):
        """Units of the values"""
        if not self.exists():
            raise Exception("failed to get Units of {0}, variable not initialized".format(self.name))
        if self._unit_proxy is None:
            return self._Matrix.Units
        return self._unit_proxy.Units

    @Units.setter
    def Units(self, value):
        if not self.exists():
            raise Exception("failed to set Units of {0}, variable not initialized".format(self.name))
        self._Matrix.Units = value


class DatasetVariables:
    """Direct access to the dataset variables"""

    def __init__(self, database):
        """
        Parameters
        ----------
        database : :class:`sofa.Database`
            Parent database instance
        """
        self.database = database

    def get_variable(self, name):
        """Parameters
        ----------
        name : str
            Name of the variable

        Returns
        -------
        value : `sofa.access.Variable`
            Access object for the variable
        """
        return Variable(self.database, name)

    def get_string_array(self, name):
        """Parameters
        ----------
        name : str
            Name of the string array

        Returns
        -------
        value : `sofa.access.StringArray`
            Access object for the string array
        """
        return StringArray(self.database, name)

    def create_variable(self, name, dims, data_type="d", fill_value=0):
        """Parameters
        ----------
        name : str
            Name of the variable
        dims : tuple(str)
            Dimensions of the variable
        data_type : str, optional
            Data type handed to the variable's ``initialize``
        fill_value : optional
            Fill value handed to the variable's ``initialize``

        Returns
        -------
        value : `sofa.access.Variable`
            Access object for the variable; the existing variable is returned
            unchanged (after printing a notice) if the name is already in use
        """
        var = self.get_variable(name)
        if var.exists():
            # TODO: add raise error?
            print(name, "already exists in the dataset!")
            return var
        var.initialize(dims, data_type=data_type, fill_value=fill_value)
        return var

    def create_string_array(self, name, dims):
        """Parameters
        ----------
        name : str
            Name of the variable
        dims : tuple(str)
            Dimensions of the variable

        Returns
        -------
        value : `sofa.access.StringArray`
            Access object for the string array; the existing one is returned
            unchanged (after printing a notice) if the name is already in use
        """
        var = self.get_string_array(name)
        if var.exists():
            # TODO: add raise error?
            print(name, "already exists in the dataset!")
            return var
        var.initialize(dims)
        return var

    def list_variables(self):
        """Returns
        -------
        attrs : list
            List of the existing dataset variable and string array names
        """
        return sorted(self.database.dataset.variables.keys())

    def dump(self):
        """Prints all variables and their dimensions"""
        for vname in self.list_variables():
            print("{0}: {1}".format(vname, self.get_variable(vname).dimensions()))


class StringArray(_VariableBase):
    """Access object for a character array whose last dimension is 'S'."""

    def initialize(self, dims, data_type="c", fill_value='\0'):
        """Create the zero-padded character array in the underlying netCDF4 dataset.
        Dimension 'S' must be the last dimension, and is appended if not included in dims."""
        if "S" not in dims:
            dims = dims + ("S",)
        if dims[-1] != "S":
            raise Exception(
                "Failed to initialize character array with dimensions {0}, 'S' must be last dimension.".format(dims))
        super().initialize(dims, data_type, fill_value)

    def get_values(self, indices=None, dim_order=None):
        """
        Parameters
        ----------
        indices : dict(key:str, value:int or slice), optional
            Key: dimension name, value: indices to be returned, complete axis assumed if not provided
        dim_order : tuple of str, optional
            Desired order of dimensions in the output array

        Returns
        -------
        values : np.ndarray
            Requested array range in regular or desired dimension order, if provided
        """
        # 'S' is appended as last dimension if the requested order omits it
        if dim_order is not None and "S" not in dim_order:
            dim_order = dim_order + ("S",)
        return super().get_values(indices, dim_order)

    def set_values(self, values, indices=None, dim_order=None, repeat_dim=None):
        """
        Parameters
        ----------
        values : np.ndarray
            New values for the array range
        indices : dict(key:str, value:int or slice), optional
            Key: dimension name, value: indices to be set, complete axis assumed if not provided
        dim_order : tuple of str, optional
            Dimension names in provided order, regular order assumed
        repeat_dim : tuple of str, optional
            Tuple of dimension names along which to repeat the values
        """
        # 'S' is appended as last dimension if the provided order omits it
        if dim_order is not None and "S" not in dim_order:
            dim_order = dim_order + ("S",)
        # TODO: accept nested lists of strings that may be too short, convert into proper character array
        return super().set_values(values, indices, dim_order, repeat_dim)