Source code for Stoner.core.array

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Provides the DataArray class.

A subclass of :py:class:`numpy.ma.MaskedArray` that knows that columns have  names."""

__all__ = ["DataArray"]

import copy

from numpy import ma
import numpy as np

from Stoner.compat import string_types, int_types
from Stoner.tools import isiterable, all_type, isnone, AttributeStore, all_size

from .setas import setas as _setas
from .exceptions import StonerSetasError


[docs]class DataArray(ma.MaskedArray): r"""A sub class of :py:class:`numpy.ma.MaskedArray` with a copy of the setas attribute to allow indexing by name. Attributes: column_headers (list): of strings of the column names of the data. i (array of integers): When read, returns the row umbers of the data. When written to, sets the base row index. The base row index is preserved when a DataArray is indexed. x,y,z (1D DataArray): When a column is declared to contain *x*, *y*, or *z* data, then these attributes access the corresponding columns. When written to, the attributes overwrite the existing column's data. d,e,f (1D DataArray): Where a column is identified as containing uncertainties for *x*, *y* or *z* data, then these attributes provide a quick access to them. When written to, the attributes overwrite the existing column's data. u,v,w (1D DataArray): Columns may be identieid as containing vectgor field information. These attributes provide quick access to them, assuming that they are defined as cartesian coordinates. When written to, the attributes overwrite the existing column's data. p,q,r (1D DataArray): These attributes access calculated columns that convert :math:`(x,y,z)` data or :math:`(u,v,w)` into :math:`(\phi,\theta,r)` polar coordinates. If on *x* and *y* columns are defined, then 2D polar coordinates are returned for *q* and *r*. setas (list or string): Actually a proxy to a magic class that handles the assignment of columns to different axes and also tracks the names of columns (so that columns may be accessed as named items). This array type is used to represent numeric data in the Stoner Package - primarily as a 2D matrix in :py:class:`Stoner.Core.DataFile` but also when a 1D row is required. In con trast to the parent class, DataArray understands that it came from a DataFile which has a setas attribute and column assignments. This allows the row to be indexed by column name, and also for quick attribute access to work. This makes writing functions to work with a single row of data more attractive. """ # ============================================================================================================== ############################ Object Construction ############################### # ============================================================================================================== def __new__(cls, input_array, *args, **kargs): """Create the new instance of the DataArray.""" # Input array is an already formed ndarray instance # We first cast to be our class type setas = kargs.pop("setas", _setas()) if isinstance(input_array, ma.MaskedArray): default_mask = input_array.mask else: default_mask = None mask = np.copy(kargs.pop("mask", default_mask)) column_headers = kargs.pop("column_headers", []) _row = kargs.pop("isrow", False) if isinstance(input_array, DataArray): i = input_array.i else: i = 0 obj = ma.asarray(input_array, *args, **kargs).view(cls) # add the new attribute to the created instance setas.shape = obj.shape obj._setas = setas if mask is not None: obj.mask = mask else: obj.mask = False # Finally, we must return the newly created object: obj.i = i obj.setas._row = _row and obj.ndim == 1 # Set shared mask - stops some deprication warnings obj.unshare_mask() if np.issubdtype(obj.dtype, np.floating): obj.fill_value = np.nan obj.column_headers = column_headers return obj def __array_finalize__(self, obj): """Numpy ndarray magic method.""" # see InfoArray.__array_finalize__ for comments super().__array_finalize__(obj) if obj is None: self._setas = _setas() self.i = 0 self.mask = False if np.issubdtype(self.dtype, np.floating): self.fill_value = np.nan self._setas._row = False self._setas.shape = (0,) else: self._setas = getattr(obj, "_setas", _setas()) if isinstance(obj, DataArray): self.i = obj.i self.mask = obj.mask self.fill_value = obj.fill_value self._setas._row = getattr(obj._setas, "_row", False) else: self.i = 0 self.mask = False self._setas._row = False if np.issubdtype(self.dtype, np.floating): self.fill_value = np.nan self._setas.shape = getattr(self, "shape", (0,)) def __array_wrap__(self, obj, context=None): """Make sure ufuncs do the right thing with DataArrays.""" ret = ma.MaskedArray.__array_wrap__(self, obj, context) return ret def _prep_index(self, ix): """Normalise the index for a __getitem__.""" if isinstance(ix, string_types): if self.ndim > 1: ret_ix = (slice(None, None, None), self._setas.find_col(ix)) else: ret_ix = (self._setas.find_col(ix),) return ret_ix if isinstance(ix, (int_types, slice)): return (ix,) if isinstance(ix, tuple) and ix and isinstance(ix[-1], string_types): # index still has a string type in it ix = list(ix) ix[-1] = self._setas.find_col(ix[-1]) return tuple(ix) if ( isinstance(ix, tuple) and ix and isinstance(ix[-1], np.ndarray) and self.ndim == 1 ): # Indexing with a numpy array if len(ix) == 1: return ix[0] if isinstance(ix, tuple) and ix and isiterable(ix[-1]): # indexing with a list of columns ix = list(ix) if all_type(ix[-1], bool): ix[-1] = np.arange(len(ix[-1]))[ix[-1]] ix[-1] = [self._setas.find_col(c) for c in ix[-1]] return tuple(ix) if isinstance(ix, tuple) and ix and isinstance(ix[0], string_types): # oops! backwards indexing c = ix[0] ix = list(ix[1:]) ix.append(self._setas.find_col(c)) return tuple(ix) # Now can index with our constructed multidimesnional indexer return ix # ============================================================================================================== ############################ Property Accessor Functions ############################### # ============================================================================================================== @property def _(self): """Return the DataArray as a normal numpy array for those operations that need this.""" return ma.getdata(self) @property def isrow(self): """Define whether this is a single row or a column if 1D.""" return self._setas._row @isrow.setter def isrow(self, value): """Set whether this object is a single row or not.""" self._setas._row = self.ndim == 1 and value @property def r(self): r"""Calculate the radius :math:`\rho` coordinate if using spherical or polar coordinate systems.""" axes = int(self._setas.cols["axes"]) m = [ lambda d: None, lambda d: None, lambda d: np.sqrt(d.x**2 + d.y**2), lambda d: np.sqrt(d.x**2 + d.y**2 + d.z**2), lambda d: np.sqrt(d.x**2 + d.y**2 + d.z**2), lambda d: np.sqrt(d.u**2 + d.v**2), lambda d: np.sqrt(d.u**2 + d.v**2 + d.w**2), ] ret = m[axes](self) if ret is None: raise StonerSetasError( f"Insufficient axes defined in setas to calculate the r component. need 2 not {axes}" ) return ret @property def q(self): r"""Calculate the azimuthal :math:`\theta` coordinate if using spherical or polar coordinates.""" axes = int(self._setas.cols["axes"]) m = [ lambda d: None, lambda d: None, lambda d: np.arctan2(d.x, d.y), lambda d: np.arctan2(d.x, d.y), lambda d: np.arctan2(d.x, d.y), lambda d: np.arctan2(d.u, d.v), lambda d: np.arctan2(d.u, d.v), ] ret = m[axes](self) if ret is None: raise StonerSetasError( f"Insufficient axes defined in setas to calculate the theta component. need 2 not {axes}" ) return ret @property def p(self): r"""Calculate the inclination :math:`\phi` coordinate for spherical coordinate systems.""" axes = int(self._setas.cols["axes"]) m = [ lambda d: None, lambda d: None, lambda d: None, lambda d: np.arcsin(d.z), lambda d: np.arsin(d.z), lambda d: np.arcsin(d.w), lambda d: np.arcsin(d.w), ] ret = m[axes](self) if ret is None: raise StonerSetasError( f"Insufficient axes defined in setas to calculate the phi component. need 3 not {axes}" ) return ret @property def i(self): """Return the row indices of the DataArray or sets the base index - the row number of the first row.""" if not hasattr(self, "_ibase"): self._ibase = [] if len(self._ibase) == 1 and self.isrow: ret = min(self._ibase) else: ret = self._ibase return ret @i.setter def i(self, value): if self.ndim == 0: pass elif self.ndim == 1 and self.isrow: if isiterable(value) and value: self._ibase = np.array([min(value)]) else: self._ibase = np.array([value]) elif self.ndim >= 1: r = self.shape[0] if isiterable(value) and len(value) == r: # Iterable and the correct length - assign straight self._ibase = np.array(value) elif isiterable(value) and len(value) > 0: # Iterable but not the correct length - count from min of value self._ibase = np.arange(min(value), min(value) + r) elif ( isiterable(value) and len(value) == 0 ): # Iterable but not the correct length - count from min of value self._ibase = np.arange(0, r, r) else: # No iterable self._ibase = np.arange(value, value + r) @property def column_headers(self): """Pass through to the setas attribute.""" return self._setas.column_headers @column_headers.setter def column_headers(self, value): """Write the column_headers attribute (delagated to the setas object).""" self._setas.column_headers = value @property def setas(self): """Return an object for setting column assignments.""" if self._setas is None: self._setas = _setas() if self._setas.shape != self.shape: self._setas.shape = self.shape return self._setas @setas.setter def setas(self, value): """Set the object for setting column assignments.""" if isinstance(value, _setas): value = value.clone setas = self.setas setas(value) # ============================================================================================================== ############################ Special Methods #################################################### # ============================================================================================================== def __reduce__(self): """Implement hooks for pickling.""" # Get the parent's __reduce__ tuple pickled_state = super().__reduce__() # Create our own tuple to pass to __setstate__ new_state = pickled_state[2] + (self._setas, self.i) # Return a tuple that replaces the parent's __setstate__ tuple with our own return (pickled_state[0], pickled_state[1], new_state) def __setstate__(self, state): """Implement hooks for unpickling.""" self._setas = state[-2] # Set the info attribute # Call the parent's __setstate__ with the other tuple elements. super().__setstate__(state[0:-2]) self.i = state[-1] def __getattr__(self, name): """Get a column using the setas attribute.""" # Overrides __getattr__ to allow access as row.x etc. col_check = { "x": "xcol", "d": "xerr", "y": "ycol", "e": "yerr", "z": "zcol", "f": "zerr", "u": "ucol", "v": "vcol", "w": "wcol", } if name in self.setas.cols: return self.setas.__getattr__(name) if name not in col_check: return super().__getattribute__(name) indexer = [slice(0, dim, 1) for ix, dim in enumerate(self.shape)] col = col_check[name] if col.startswith("x"): if self._setas.cols[col] is not None: indexer[-1] = self._setas.cols[col] ret = self[tuple(indexer)] if ret.ndim > 0: ret.column_headers = self.column_headers[self._setas.cols[col]] else: ret = None else: if isiterable(self._setas.cols[col]) and len(self._setas.cols[col]) > 0: indexer[-1] = self._setas.cols[col][0] elif isiterable(self._setas.cols[col]): indexer[-1] = self._setas.cols[col] else: return None ret = self[tuple(indexer)] if ret.ndim > 0: ret.column_headers = self.column_headers[indexer[-1]] if ret is None: raise StonerSetasError(f"Tried accessing a {name} column, but setas is not defined.") return ret def __getitem__(self, ix): """Indexing function for DataArray. Args: ix (various): Index to find. Returns: An indexed part of the DataArray object with extra attributes. Notes: This tries to support all of the indexing operations of a regular numpy array, plus the special operations where one columns are named. Warning: The code almost certainly makes some assumptiuons that DataArray is one or 2D and may blow up with 3D arrays ! On the other hand it has a special case exception for where you give a string as the first index element and assumes that you've forgotten that we're row major and tries to do the right thing. """ # Is this going to be a single row ? single_row = isinstance(ix, int_types) or ( isinstance(ix, tuple) and len(ix) > 0 and isinstance(ix[0], int_types) ) # If the index is a single string type, then build a column accessing index ix = self._prep_index(ix) ret = super().__getitem__(ix) if isinstance(ret, np.ndarray) and ret.ndim > 0 and ret.size == 1: # Numpy extract [x] to x ret = ret.ravel()[0] if ret.ndim == 0: if isinstance(ret, ma.core.MaskedConstant): if ret.mask: return self.fill_value if isinstance(ret, ma.MaskedArray): ret = ma.filled(ret) return ret.dtype.type(ret) if not isinstance(ret, np.ndarray): # bugout for scalar returns return ret if ret.ndim >= 2: # Potentially 2D array here if ix[-1] is None: # Special case for increasing an array dimension if self.ndim == 1: # Going from 1 D to 2D ret.setas = self.setas.clone ret.i = self.i ret.name = getattr(self, "name", "Column") return ret ret.isrow = single_row ret.setas = self.setas.clone ret.column_headers = copy.copy(self.column_headers) if len(ix) > 0 and isiterable(ix[-1]): # pylint: disable=len-as-condition ret.column_headers = list(np.array(ret.column_headers)[ix[-1]]) # Sort out whether we need an array of row labels if isinstance(self.i, np.ndarray) and len(ix) > 0: # pylint: disable=len-as-condition if isiterable(ix[0]) or isinstance(ix[0], int_types): ret.i = self.i[ix[0]] else: ret.i = 0 else: ret.i = self.i return ret if ret.ndim == 1: # Potentially a single row or single column ret.isrow = single_row if not single_row: # Workoput what the original setas was if isinstance(ix, tuple) and len(ix) >= 2: tmp = np.array(self.setas)[ix[-1]].ravel() ret.setas(tmp) tmpcol = np.array(self.column_headers)[ix[-1]] ret.column_headers = tmpcol else: ret.setas = self.setas.clone ret.column_headers = copy.copy(self.column_headers) # Sort out whether we need an array of row labels if single_row and isinstance(self.i, np.ndarray): ret.i = self.i[ix[0]] else: # This is a single element? ret.i = self.i if not single_row: ret.name = self.column_headers return ret def __setitem__(self, ix, val): """Override __setitem__ to handle string indexing.""" if isinstance(ix, string_types): ix = self._setas.find_col(ix) elif isinstance(ix, tuple) and isinstance(ix[-1], string_types): ix = list(ix) ix[-1] = self._setas.find_col(ix[-1]) ix = tuple(ix) elif isinstance(ix, tuple) and isinstance(ix[0], string_types): c = ix[0] ix = list(ix[1:]) ix.append(self._setas.find_col(c)) ix = tuple(ix) if self.sharedmask: # We do not want to share a mask when we're about to change soimething here... self.unshare_mask() super().__setitem__(ix, val) # ============================================================================================================== ############################ Private Methods ####################################### # ============================================================================================================== def _col_args( self, scalar=True, xcol=None, ycol=None, zcol=None, ucol=None, vcol=None, wcol=None, xerr=None, yerr=None, zerr=None, **kargs, ): # pylint: disable=unused-argument """Create an object which has keys based either on arguments or setas attribute.""" cols = { "xcol": xcol, "ycol": ycol, "zcol": zcol, "ucol": ucol, "vcol": vcol, "wcol": wcol, "xerr": xerr, "yerr": yerr, "zerr": zerr, } no_guess = kargs.get("no_guess", True) for i in cols.values(): if i is not None: # User specification wins out break else: # User didn't set any values, setas will win no_guess = kargs.get("no_guess", False) ret = AttributeStore(self.setas._get_cols(no_guess=no_guess)) force_list = kargs.get("force_list", not scalar) for c in list(cols.keys()): if isnone(cols[c]): # Not defined, fallback on setas del cols[c] continue if isinstance(cols[c], bool) and not cols[c]: # False, delete column altogether del cols[c] if c in ret: del ret[c] continue if c in ret and isinstance(ret[c], list): if isinstance(cols[c], float) or (isinstance(cols[c], np.ndarray) and cols[c].size == len(self)): continue if isinstance(cols[c], float): continue cols[c] = self.setas.find_col(cols[c], force_list=force_list) ret.update(cols) if scalar: for c in ret: if isinstance(ret[c], list): if ret[c]: ret[c] = ret[c][0] else: ret[c] = None elif isinstance(scalar, bool) and not scalar: for c in ret: if c.startswith("x") or c.startswith("has_"): continue if not isiterable(ret[c]) and ret[c] is not None: ret[c] = list([ret[c]]) elif ret[c] is None: ret[c] = [] for n in ["xcol", "xerr", "ycol", "yerr", "zcol", "zerr", "ucol", "vcol", "wcol", "axes"]: ret[f"has_{n}"] = n in ret and not (ret[n] is None or (isinstance(ret[n], list) and not ret[n])) return ret # ============================================================================================================== ############################ Public Methods ######################################## # ==============================================================================================================
[docs] def keys(self): """Return a list of column headers.""" return self._setas.column_headers
[docs] def swap_column(self, *swp, **kargs): """Swap pairs of columns in the data. Useful for reordering data for idiot programs that expect columns in a fixed order. Args: swp (tuple of list of tuples of two elements): Each element will be iused as a column index (using the normal rules for matching columns). The two elements represent the two columns that are to be swapped. headers_too (bool): Indicates the column headers are swapped as well Returns: self: A copy of the modified :py:class:`DataFile` objects Note: If swp is a list, then the function is called recursively on each element of the list. Thus in principle the @swp could contain lists of lists of tuples """ headers_too = kargs.pop("headers_too", True) setas_too = kargs.pop("setas_too", True) if len(swp) == 1: swp = swp[0] if isinstance(swp, list) and all_type(swp, tuple) and all_size(swp, 2): for item in swp: self.swap_column(item, headers_too=headers_too) elif isinstance(swp, tuple): col1 = self._setas.find_col(swp[0]) col2 = self._setas.find_col(swp[1]) self[:, [col1, col2]] = self[:, [col2, col1]] if headers_too: self._setas.column_headers[col1], self._setas.column_headers[col2] = ( self._setas.column_headers[col2], self._setas.column_headers[col1], ) if setas_too: self._setas[col1], self._setas[col2] = self._setas[col2], self._setas[col1] else: raise TypeError( "Swap parameter must be either a tuple or a \ list of tuples" )