Source code for Stoner.core.utils

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Utility functions to support :py:mod:`Stoner.Core`."""

__all__ = ["add_core", "and_core", "sub_core", "mod_core", "copy_into", "tab_delimited", "decode_string"]

import copy
import csv
import re
from collections.abc import Mapping
from typing import Union, List, Mapping as MappingType, Callable
import numpy as np
from ..compat import index_types, int_types
from ..tools import all_type, copy_into
from .Typing import Numeric, Column_Index, Int_Types


def add_core(other: Union["DataFile", np.ndarray, List[Numeric], MappingType], newdata: "DataFile") -> "DataFile":
    """Implement the core work of adding other to self and modifying newdata.

    Args:
        other (DataFile, array, list, Mapping):
            The data to be added. The handling depends on the type:

            - :py:class:`numpy.ndarray`: becomes the data if *newdata* is empty, otherwise it is
              appended as one (1D) or several (2D) rows provided the number of columns matches.
            - same type as *newdata*: rows are appended, matching columns by the headers of *newdata*;
              columns that *other* cannot supply are left as NaN. Metadata of *other* is merged in.
            - list: each element is added in turn with the ``+`` operator.
            - Mapping: appended as a single row, keys being column names. Unknown keys create new,
              masked, NaN-filled columns.
        newdata (DataFile):
            The instance to be modified.

    Returns:
        newdata:
            A modified newdata, or ``NotImplemented`` when *other* is of an unsupported type or an
            array whose shape is incompatible with the existing data.
    """
    if isinstance(other, np.ndarray):
        if len(newdata) == 0:  # pylint: disable=len-as-condition
            # Nothing stored yet: adopt the array wholesale, together with any headers/setas it carries.
            ch = getattr(other, "column_headers", [])
            setas = getattr(other, "setas", "")
            t = np.atleast_2d(other)
            c = t.shape[1]
            if len(newdata.column_headers) < c:
                newdata.column_headers.extend([f"Column_{x}" for x in range(c - len(newdata.column_headers))])
            newdata.data = t
            newdata.setas = setas
            newdata.column_headers = ch
        elif len(np.shape(other)) == 1:
            # 1D array, so assume a single row of data
            if np.shape(other)[0] != np.shape(newdata.data)[1]:
                return NotImplemented
            newdata.data = np.append(newdata.data, np.atleast_2d(other), 0)
        elif len(np.shape(other)) == 2 and np.shape(other)[1] == np.shape(newdata.data)[1]:
            # DataFile + array with correct number of columns
            newdata.data = np.append(newdata.data, other, 0)
        else:
            return NotImplemented
    elif isinstance(other, type(newdata)):
        # Appending another DataFile: start from an all-NaN block and fill in the columns that
        # other can provide under the same header.
        new_data = np.ones((other.shape[0], newdata.shape[1])) * np.nan
        for i in range(newdata.shape[1]):
            column = newdata.column_headers[i]
            try:
                new_data[:, i] = other.column(column)
            except KeyError:
                # other has no such column - leave it as NaN
                pass
        newdata.metadata.update(other.metadata)
        newdata.data = np.append(newdata.data, new_data, axis=0)
    elif isinstance(other, list):
        for o in other:
            newdata = newdata + o
    elif isinstance(other, Mapping):
        if len(newdata) == 0:
            newdata.data = np.atleast_2d(list(other.values()))
            newdata.column_headers = list(other.keys())
        else:
            # Map each key to a column index, creating fully masked NaN columns for unknown keys.
            order = {}
            for k in other:
                try:
                    order[k] = newdata.find_col(k)
                except (KeyError, re.error):
                    mask = newdata.mask
                    # np.nan rather than np.NaN: the upper-case alias was removed in NumPy 2.0
                    newdata.add_column(np.ones(len(newdata)) * np.nan, header=k)
                    newdata.mask[:, :-1] = mask
                    newdata.mask[:, -1] = np.ones(len(newdata), dtype=bool)
                    order[k] = newdata.shape[1] - 1
            # Build the new row: masked NaN everywhere except in the columns named by other.
            row = np.ones(newdata.shape[1]) * np.nan
            mask = np.ones_like(row, dtype=bool)
            for k, index in order.items():
                row[index] = other[k]
                mask[index] = False
            old_mask = newdata.mask
            newdata.data = np.ma.append(newdata.data, np.atleast_2d(row), axis=0)
            # Restore the mask of the pre-existing rows and apply the mask for the new row.
            newdata.mask[:-1, :] = old_mask
            newdata.mask[-1] = mask
    else:
        return NotImplemented
    # Keep the setas object's notion of the data shape in step with the new data.
    newdata._data._setas.shape = newdata.shape
    return newdata
def and_core(other: Union["DataFile", np.ndarray], newdata: "DataFile") -> "DataFile":
    """Implement the core of the & operator, returning data in newdata.

    Args:
        other (array, DataFile):
            Data whose columns are to be added. A 1D array is treated as a single column.
        newdata (DataFile):
            Instance of DataFile to be modified.

    Returns:
        (:py:class:`DataFile`):
            *newdata* with the columns of *other* concatenated as new columns at the end, or
            ``NotImplemented`` if *other* is neither an array nor of the same type as *newdata*.

    Notes:
        When the two sides have different numbers of rows, the shorter one is padded with rows of
        zeros and the padding is masked.
    """
    if len(newdata.data.shape) < 2:
        newdata.data = np.atleast_2d(newdata.data)

    # Get other to be an array of newdata's data type and other_headers to be a matching list of strings.
    data_type = type(newdata.data)
    if isinstance(other, type(newdata)):
        newdata.metadata.update(other.metadata)
        other_headers = other.column_headers
        other = copy.copy(other.data)
    elif isinstance(other, (data_type, np.ndarray)):
        if isinstance(other, data_type):
            other = copy.copy(other)
        else:
            # Plain ndarray: wrap a copy in the same array class that newdata uses.
            other = data_type(copy.copy(other))
        if other.ndim < 2:
            # 1D array, make it a 2D column
            other = np.atleast_2d(other).T
        other_headers = [f"Column {i + newdata.shape[1]}" for i in range(other.shape[1])]
    else:
        return NotImplemented

    newdata_headers = newdata.column_headers + other_headers
    setas = newdata.setas.clone

    # Work out whether to extend rows on one side or the other
    if np.prod(newdata.data.shape) == 0:
        # Special case: no data yet
        newdata.data = other
    elif newdata.data.shape[0] == other.shape[0]:
        newdata.data = np.append(newdata.data, other, 1)
    elif newdata.data.shape[0] < other.shape[0]:
        # Need to extend newdata.data with masked rows of zeros
        extra_rows = other.shape[0] - newdata.data.shape[0]
        newdata.data = np.append(newdata.data, np.zeros((extra_rows, newdata.data.shape[1])), 0)
        new_mask = newdata.mask
        new_mask[-extra_rows:, :] = True
        newdata.data = np.append(newdata.data, other, 1)
        other_mask = np.ma.getmaskarray(other)
        new_mask = np.append(new_mask, other_mask, 1)
        newdata.mask = new_mask
    elif other.shape[0] < newdata.data.shape[0]:
        # other has too few rows: extend it with masked rows of zeros
        extra_rows = newdata.data.shape[0] - other.shape[0]
        other = np.append(other, np.zeros((extra_rows, other.shape[1])), 0)
        other_mask = np.ma.getmaskarray(other)
        other_mask[-extra_rows:, :] = True
        new_mask = newdata.mask
        new_mask = np.append(new_mask, other_mask, 1)
        newdata.data = np.append(newdata.data, other, 1)
        newdata.mask = new_mask

    # Install the cloned setas with the combined headers and the new shape.
    setas.column_headers = newdata_headers
    newdata._data._setas = setas
    newdata._data._setas.shape = newdata.shape
    return newdata
def mod_core(other: Column_Index, newdata: "DataFile") -> "DataFile":
    """Implement the column deletion method.

    Args:
        other (column index):
            The column(s) to remove; passed straight to ``newdata.del_column``.
        newdata (DataFile):
            The instance to be modified.

    Returns:
        (DataFile):
            *newdata* after the deletion, or ``NotImplemented`` if *other* is not one of the
            ``index_types``.
    """
    if not isinstance(other, index_types):
        # Return immediately: there is no data object to update on NotImplemented.
        return NotImplemented
    newdata.del_column(other)
    # Keep the setas object's notion of the data shape in step with the reduced data.
    newdata._data._setas.shape = newdata.shape
    return newdata
def sub_core(other: Union[Int_Types, slice, Callable], newdata: "DataFile") -> "DataFile":
    """Worker for the subtraction.

    Args:
        other (int, slice, callable or list of int/bool):
            Selects the rows to remove; passed straight to ``newdata.del_rows``.
        newdata (DataFile):
            The instance to be modified.

    Returns:
        (DataFile):
            *newdata* after the deletion, or ``NotImplemented`` if *other* is of an unsupported type.
    """
    supported = (
        isinstance(other, (slice, int_types))
        or callable(other)
        or (isinstance(other, list) and (all_type(other, int_types) or all_type(other, bool)))
    )
    if not supported:
        # Return immediately: there is no data object to update on NotImplemented.
        return NotImplemented
    newdata.del_rows(other)
    # Keep the setas object's notion of the data shape in step with the reduced data.
    newdata._data._setas.shape = newdata.shape
    return newdata
class tab_delimited(csv.Dialect):
    """A customised csv dialect class for reading tab delimited text files.

    Fields are separated by a single tab character, lines end with CR-LF and no quote processing
    of any kind is performed.
    """

    # Field and record separators
    delimiter = "\t"
    lineterminator = "\r\n"

    # Quote characters get no special treatment, so doubled quotes are not collapsed either.
    quoting = csv.QUOTE_NONE
    doublequote = False
def decode_string(value: str) -> str:
    """Expand a string of column assignments, replacing numbers with repeated characters.

    Every run of decimal digits that is immediately followed by one of the codes
    ``x y z d e f u v w . -`` is replaced by that code repeated the given number of times,
    e.g. ``"3x2y"`` becomes ``"xxxyy"``. Anything else is passed through unchanged.

    Args:
        value (str):
            The compressed assignment string.

    Returns:
        (str):
            The expanded string.
    """
    pattern = re.compile(r"(([0-9]+)(x|y|z|d|e|f|u|v|w|\.|\-))")

    def _expand(match):
        """Return the matched code letter repeated by the matched count."""
        repeat = int(match.group(2))
        return match.group(3) * repeat

    # The expansions never contain digits, so one left-to-right pass handles every occurrence.
    return pattern.sub(_expand, value)