Source code for Stoner.folders.each

"""Classes and support functions for the :py:attr:`Stoner.DataFolder.each`.magic attribute."""
__all__ = ["Item"]
from collections.abc import MutableSequence
from functools import wraps, partial
from traceback import format_exc

import numpy as np

from ..tools import isiterable
from ..compat import string_types
from .utils import get_pool


def _worker(d, **kwargs):
    """Support function to run an arbitrary function over a :py:class:`Stoner.Data` object."""
    byname = kwargs.get("byname", False)
    func = kwargs.get("func", lambda x: x)
    if byname:
        func = getattr(d, func, lambda x: x)
    args = kwargs.get("args", tuple())
    kargs = kwargs.get("kargs", dict)
    if hasattr(d, "setas"):
        d["setas"] = list(d.setas)
    d["args"] = args
    d["kargs"] = kargs
    d["func"] = func.__name__
    try:
        if byname:  # Ut's an instance bound moethod
            ret = func(*args, **kargs)
        else:  # It's an arbitrary function
            ret = func(d, *args, **kargs)
    except Exception as e:  # pylint: disable=W0703 # Ok to be broad as user func could do anything
        ret = e, format_exc()
    return (d, ret)


class SetasWrapper(MutableSequence):

    """Manages wrapping each member of the folder's setas attribute."""

    def __init__(self, parent):
        """Note a reference to the parent item class instance and folder."""
        self._each = parent
        self._folder = parent._folder

    def __call__(self, *args, **kargs):
        """Pass through the calls the setas method of each item in our folder."""
        _ = self._folder._object_attrs.pop("setas", None)
        for obj in self._folder:
            obj.setas(*args, **kargs)
        self._folder._object_attrs["setas"] = self.collapse()

        return self._folder

    def __len__(self):
        """Return the shortest length of all the setas elements in the folder."""
        lengths = np.array([len(data.setas) for data in self._folder])
        return abs(lengths.min())

    def __getitem__(self, index):
        """Get the corresponding item from all the setas items in the folder."""
        return [data.setas[index] for data in self._folder]

    def __setitem__(self, index, value):
        """Set the value of the specified item on the setas elements in the folder.

        Args:
            index (int):
                The column index of the individual setas attributes to change
            value (iterable):
                The set of values to apply to the setas attributes.

        Notes:
            If value has a length less than the data folder, then the final value is repeated to encompass the
            remaining elements.
        """
        if len(value) < len(self._folder):
            value = value + value[-1] * (len(self._folder) - len(value))
        v = "."
        for v, data in zip(value, self._folder):
            data.setas[index] = v
        setas = self._folder._object_attrs.get("setas", self.collapse())
        setas[index] = v
        self._folder._object_attrs["setas"] = setas

    def __delitem__(self, index):
        """Cannot delete items from the proxy setas object - so simply clear it instead."""
        for data in self._folder:
            data.setas[index] = "."
        setas = self._folder._object_attrs.get("setas", self.collapse())
        setas[index] = "."
        self._folder._object_attrs["setas"] = setas

    def insert(self, index, value):
        """Cannot insert items into the proxy setas object."""
        raise IndexError("Cannot insert into the objectFolder's setas - insdert into the objectFolder instead!")

    def collapse(self):
        """Collapse the setas into a single list if possible."""
        setas = []
        for v in self:
            if len(v):
                if np.unique(v).size == 1:
                    setas.append(v[0])
                else:
                    setas.append("-")
        return setas


[docs]class Item: """Provides a proxy object for accessing methods on the inividual members of a Folder. Notes: The purpose of this class is to allow it to be explicit that we're calling methods on the members of the folder rather than a collective method. This allows us to work around nameclashes. """ _folder = None def __init__(self, folder): """Create the each proxy object. Notes the partent folder that created us.""" self._folder = folder @property def setas(self): """Return a proxy object for manipulating all the setas objects in a folder.""" return SetasWrapper(self) @setas.setter def setas(self, value): """Manipualte the setas property of all the objects in the folder.""" setas = self.setas setas(value) self._folder._object_attrs["setas"] = setas.collapse() return setas
[docs] def __call__(self, func, *args, **kargs): """Iterate over the baseFolder, calling func on each item. Args: func (callable, str): Either a callable object, or the name of a callable object (either method or global) that must take a metadataObject type instance as it's first argument. Keyword Args: _return (None, bool or str): Controls how the return value from *func* is added to the DataFolder Returns: A list of the results of evaluating *func* for each item in the folder. Notes: If *_return* is None and the return type of *func* is the same type as the :py:class:`baseFolder` is storing, then the return value replaces the original :py:class:`Stoner.Core.metadataobject` in the :py:class:`baseFolder`. If *_result* is True the return value is added to the :py:class:`Stoner.Core.metadataObject`'s metadata under the name of the function. If *_result* is a string. then return result is stored in the corresponding name. """ # Just call the iter generator but assemble into a list. if isinstance(func, string_types) and "_byname" not in kargs: if func in globals() and callable(globals()[func]): func = globals()[func] else: func = getattr(self, func) return list(self.iter(func, *args, **kargs))
def __dir__(self): """Return a list of the common set of attributes of the instances in the folder.""" if self._folder and len(self._folder) != 0: res = set(dir(self._folder[0])) else: res = set() if self._folder and len(self._folder) > 0: for d in self._folder[1:]: res &= set(dir(d)) return list(res) def __delattr__(self, name): """Handle removing an attribute from the folder, including proxied attributes.""" if name in dir(self._folder.instance) or ( len(self._folder) and hasattr(self._folder[0], name) ): # This is an instance attribute # If we're tracking the object attributes and have a type set, then we can store this for adding to all # loaded objects on read. del self._folder._object_attrs[name] for _, this in self._folder.loaded: try: delattr(this, name) except AttributeError: pass elif name in self._folder._instance_attrs: del self._folder._instance_attrs[name] else: raise AttributeError(f"Unrecognised attribute {name}") def __getattr__(self, name): """Handle some special case attributes that provide alternative views of the objectFolder. Args: item (string): The attribute name being requested Returns: Depends on the attribute Notes: If *name* is not present on the empty member instance, then the first member of the folder is checked as well. This allows the attributes of a :py:class:`Stoner.Data` object that derive from the Lpy:attr:`Stoner.Data.setas` attribute (such as *.x*, *.y* or *.e* etc) can be accessed. """ try: return super().__getattr__(name) except AttributeError: pass try: instance = self._folder.instance if callable(getattr(instance, name, None)): # It's a method ret = self.__getattr_proxy(name) else: # It's a static attribute if name in self._folder._object_attrs and len(self._folder) == 0: ret = self._folder._object_attrs[name] elif len(self._folder): ret = [(not hasattr(x, name), getattr(x, name, None)) for x in self._folder] mask, values = zip(*ret) lens = np.unique([len(v) if isiterable(v) else 0 for v in values]) if lens.size == 1: ret = np.ma.MaskedArray(values) else: ret = np.ma.MaskedArray(values, dtype=object) ret.mask = mask else: ret = getattr(instance, name, None) if ret is None or (hasattr(ret, "mask") and np.all(ret.mask)): raise AttributeError except AttributeError as err: # Ok, pass back raise AttributeError(f"{name} is not an Attribute of {type(self)} or {type(instance)}") from err # except TypeError as err: # Can be triggered if self.instance lacks the attribute # if len(self._folder) and hasattr(self._folder[0], name): # ret = [(not hasattr(x, name), getattr(x, name, None)) for x in self._folder] # mask, values = zip(*ret) # ret = np.ma.MaskedArray(values) # ret.mask = mask # else: # raise err return ret def __setattr__(self, name, value): """Proxy call to set an attribute. Setting the attribute on .each sets it on all instantiated objects and in _object_attrs. Args: name(str): Attribute to set value (any): Value to set Notes: If *name* is not present on the empty member instance, then the first member of the folder is checked as well. This allows the attributes of a :py:class:`Stoner.Data` object that derive from the Lpy:attr:`Stoner.Data.setas` attribute (such as *.x*, *.y* or *.e* etc) can be accessed. If *value* is iterable and the same length as the folder, then each element in the folder is loaded and the corresponding element of *value* is assigned to the attribute of the member. """ if hasattr(type(self), name) or name.startswith("_"): # Handle setting our own attributes super().__setattr__(name, value) elif name in dir(self._folder.instance) or ( len(self._folder) and hasattr(self._folder[0], name) ): # This is an instance attribute if isiterable(value) and len(value) == len(self._folder): force_load = True else: force_load = False self._folder._object_attrs[name] = value # Add to attributes to be set on load value = [value] * len(self._folder) for d, v in zip(self._folder.__names__(), value): # And set on all instantiated objects if force_load or isinstance(self._folder.__getter__(d, instantiate=False), self._folder.type): d = self._folder.__getter__(d) setattr(d, name, v) else: raise AttributeError(f"Unknown attribute {name}") def __getattr_proxy(self, item): """Make a prpoxy call to access a method of the metadataObject like types. Args: item (string): Name of method of metadataObject class to be called Returns: Either a modified copy of this objectFolder or a list of return values from evaluating the method for each file in the Folder. """ meth = getattr(self._folder.instance, item, None) @wraps(meth) def _wrapper_(*args, **kargs): """Wrap a call to the metadataObject type for magic method calling. Keyword Arguments: _return (index types or None): specify to store the return value in the individual object's metadata Note: This relies on being defined inside the enclosure of the objectFolder method so we have access to self and item """ kargs["_byname"] = True return self(item, *args, **kargs) # Develove to self.__call__ where we have multiprocess magic # Ok that's the wrapper function, now return it for the user to mess around with. return _wrapper_ def __rmatmul__(self, other): """Implement callable@DataFolder as a generic iterate a function over DataFolder members. Returns: An object that supports __call__ and knows about this DataFolder. """ if not callable(other): return NotImplemented @wraps(other) def _wrapper_(*args, **kargs): """Wrap a call to the metadataObject type for magic method calling. Keyword Arguments: _return (index types or None): specify to store the return value in the individual object's metadata Note: This relies on being defined inside the enclosure of the objectFolder method so we have access to self and item """ kargs["_byname"] = False # Force the __call__ to use the callable function return self(other, *args, **kargs) # Delegate to self.__call__ which has multiprocess magic. # Ok that's the wrapper function, now return it for the user to mess around with. return _wrapper_
[docs] def iter(self, func, *args, **kargs): """Iterate over the baseFolder, calling func on each item. Args: func (callable): A Callable object that must take a metadataObject type instance as it's first argument. Keyword Args: _return (None, bool or str): Controls how the return value from *func* is added to the DataFolder Returns: A list of the results of evaluating *func* for each item in the folder. Notes: If *_return* is None and the return type of *func* is the same type as the :py:class:`baseFolder` is storing, then the return value replaces the original :py:class:`Stoner.Core.metadataobject` in the :py:class:`baseFolder`. If *_result* is True the return value is added to the :py:class:`Stoner.Core.metadataObject`'s metadata under the name of the function. If *_result* is a string. then return result is stored in the corresponding name. """ _return = kargs.pop("_return", None) _byname = kargs.pop("_byname", False) _serial = kargs.pop("_serial", False) self._folder.fetch() # Prefetch thefolder in case we can do it in parallel p, imap = get_pool(_serial) for ix, (new_d, ret) in enumerate( imap(partial(_worker, func=func, args=args, kargs=kargs, byname=_byname), self._folder) ): if self._folder.debug: print(ix, type(ret)) if isinstance(ret, self._folder._type) and _return is None: try: # Check if ret has same data type, otherwise will not overwrite well if ret.data.dtype != new_d.data.dtype: continue new_d = ret except AttributeError: pass elif _return is not None: if isinstance(_return, bool) and _return: _return = func.__name__ new_d[_return] = ret name = self._folder.__names__()[ix] self._folder.__setter__(name, new_d) yield ret if p is not None: p.close() p.join()