# Source code for Stoner.folders.each

# -*- coding: utf-8 -*-
"""Classes and support functions for the :py:attr:`Stoner.DataFolder.each`.magic attribute."""
from __future__ import annotations

__all__ = ["Item"]
from collections.abc import MutableSequence
from functools import wraps, partial
from traceback import format_exc
import os
from typing import TYPE_CHECKING

import numpy as np

from ..tools import isiterable
from ..compat import string_types
from .utils import get_pool

if TYPE_CHECKING:
    from ..core import metadataObject
    from .core import baseFolder
    from typing import Tuple, List, Any, Union, Optional, Callable


def _worker(d: "metadataObject", **kwargs) -> "Tuple[metadataObject, Any]":
    """Run a function, or a named method, over a single folder member.

    Args:
        d (metadataObject):
            The folder member to operate on. It must support item assignment because a record of the
            call is written into it (see Notes).

    Keyword Arguments:
        _func (callable or str):
            The callable to run, or - when *byname* is True - the name of an attribute of *d* to call.
            Defaults to an identity function.
        byname (bool):
            If True, *_func* is looked up as an attribute of *d* and called without *d* as an argument.
            Defaults to False.
        args (tuple):
            Positional arguments passed on to the function. Defaults to an empty tuple.
        kargs (dict):
            Keyword arguments passed on to the function. Defaults to an empty dictionary.

    Returns:
        (tuple):
            A 2-tuple of (*d*, *ret*) where *ret* is the function's return value or, if the function raised
            an exception, a tuple of (exception, formatted traceback).

    Notes:
        Before the call is made, the items "args", "kargs" and "func" (and "setas" if *d* has a setas
        attribute) are stored in *d*.
    """
    byname = kwargs.get("byname", False)
    func = kwargs.get("_func", lambda x: x)
    if byname:
        func = getattr(d, func, lambda x: x)
    args = kwargs.get("args", tuple())
    # The default must be a dictionary *instance*: the dict type itself cannot be expanded with **.
    kargs = kwargs.get("kargs", {})
    if hasattr(d, "setas"):
        d["setas"] = list(d.setas)
    d["args"] = args
    d["kargs"] = kargs
    # Not every callable has a __name__ (e.g. functools.partial objects), so fall back to its repr.
    d["func"] = getattr(func, "__name__", repr(func))
    try:
        if byname:  # It's an instance bound method
            ret = func(*args, **kargs)
        else:  # It's an arbitrary function
            ret = func(d, *args, **kargs)
    except Exception as e:  # pylint: disable=W0703 # Ok to be broad as user func could do anything
        ret = e, format_exc()
    return (d, ret)


class SetasWrapper(MutableSequence):

    """Manages wrapping each member of the folder's setas attribute.

    Indexing the wrapper with a column index gives a list holding the corresponding setas entry from every
    member of the folder. Assigning to, or deleting, an index is applied to every member and to the "setas"
    entry of the folder's *_object_attrs* dictionary.
    """

    def __init__(self, parent: "Item"):
        """Note a reference to the parent item class instance and folder."""
        self._each = parent
        self._folder = parent._folder

    def __call__(self, *args, **kargs):
        """Pass through the calls the setas method of each item in our folder.

        Returns:
            The folder, with its tracked "setas" attribute replaced by the collapsed setas of its members.
        """
        _ = self._folder._object_attrs.pop("setas", None)
        for obj in self._folder:
            obj.setas(*args, **kargs)
        self._folder._object_attrs["setas"] = self.collapse()

        return self._folder

    def __len__(self):
        """Return the shortest length of all the setas elements in the folder (0 for an empty folder)."""
        lengths = [len(data.setas) for data in self._folder]
        return int(min(lengths)) if lengths else 0

    def __iter__(self):
        """Iterate over the column indices that every member of the folder has.

        The default Sequence iterator keeps indexing until IndexError is raised, which never happens for an
        empty folder as __getitem__ then just returns an empty list - so bound the iteration explicitly.
        """
        for index in range(len(self)):
            yield self[index]

    def __getitem__(self, index: Union[str, int, slice]) -> Union[str, List[str], int]:
        """Get the corresponding item from all the setas items in the folder."""
        return [data.setas[index] for data in self._folder]

    def __setitem__(self, index: Union[str, int], value):
        """Set the value of the specified item on the setas elements in the folder.

        Args:
            index (int):
                The column index of the individual setas attributes to change
            value (iterable):
                The set of values to apply to the setas attributes.

        Raises:
            IndexError:
                If *value* is empty.

        Notes:
            If value has a length less than the data folder, then the final value is repeated to encompass the
            remaining elements. The last value actually applied is also recorded in the folder's tracked
            setas attribute.
        """
        # Work on a list so that padding behaves the same for strings, lists and tuples.
        values = list(value)
        shortfall = len(self._folder) - len(values)
        if shortfall > 0:
            values.extend([values[-1]] * shortfall)
        last = values[-1]  # Only used as-is if the folder has no members to loop over.
        for last, data in zip(values, self._folder):
            data.setas[index] = last
        attrs = self._folder._object_attrs
        setas = attrs["setas"] if "setas" in attrs else self.collapse()
        setas[index] = last
        attrs["setas"] = setas

    def __delitem__(self, index: Union[str, int]):
        """Cannot delete items from the proxy setas object - so simply clear it instead."""
        for data in self._folder:
            data.setas[index] = "."
        attrs = self._folder._object_attrs
        setas = attrs["setas"] if "setas" in attrs else self.collapse()
        setas[index] = "."
        attrs["setas"] = setas

    def insert(self, index: Union[str, int], value: Any):
        """Cannot insert items into the proxy setas object."""
        raise NotImplementedError(
            "Cannot insert into the objectFolder's setas - insdert into the objectFolder instead!"
        )

    def collapse(self) -> List[str]:
        """Collapse the setas into a single list if possible.

        Returns:
            A list with one entry per column: the common setas value if every member of the folder agrees,
            otherwise "-".
        """
        setas = []
        for v in self:
            if len(v):
                if np.unique(v).size == 1:
                    setas.append(v[0])
                else:
                    setas.append("-")
        return setas


class Item:

    """Provides a proxy object for accessing methods on the individual members of a Folder.

    Notes:
        The purpose of this class is to allow it to be explicit that we're calling methods on the members of
        the folder rather than a collective method. This allows us to work around name clashes.
    """

    _folder = None

    def __init__(self, folder: "baseFolder"):
        """Create the each proxy object.

        Notes the parent folder that created us.
        """
        self._folder = folder

    @property
    def setas(self) -> "SetasWrapper":
        """Return a proxy object for manipulating all the setas objects in a folder."""
        return SetasWrapper(self)

    @setas.setter
    def setas(self, value: Union[str, List[str]]):
        """Manipulate the setas property of all the objects in the folder."""
        setas = self.setas
        setas(value)
        self._folder._object_attrs["setas"] = setas.collapse()
        return setas

    def __call__(self, func: Callable, *args, **kargs) -> Any:
        """Iterate over the baseFolder, calling func on each item.

        Args:
            func (callable, str):
                Either a callable object, or the name of a callable object (either method or global) that must
                take a metadataObject type instance as its first argument.

        Keyword Args:
            _return (None, bool or str):
                Controls how the return value from *func* is added to the DataFolder

        Returns:
            A list of the results of evaluating *func* for each item in the folder.

        Notes:
            If *_return* is None and the return type of *func* is the same type as the :py:class:`baseFolder`
            is storing, then the return value replaces the original :py:class:`Stoner.Core.metadataObject`
            in the :py:class:`baseFolder`. If *_return* is True the return value is added to the
            :py:class:`Stoner.Core.metadataObject`'s metadata under the name of the function. If *_return*
            is a string, then the return result is stored in the corresponding name.
        """
        # Just call the iter generator but assemble into a list.
        if isinstance(func, string_types) and "_byname" not in kargs:
            if func in globals() and callable(globals()[func]):
                func = globals()[func]
            else:
                func = getattr(self, func)
        return list(self.iter(func, *args, **kargs))

    def __dir__(self) -> List[str]:
        """Return a list of the common set of attributes of the instances in the folder."""
        folder = self._folder
        if not folder or len(folder) == 0:
            return []
        common = set(dir(folder[0]))
        for member in folder[1:]:
            common &= set(dir(member))
        return list(common)

    def __delattr__(self, name: str):
        """Handle removing an attribute from the folder, including proxied attributes.

        Raises:
            AttributeError:
                If *name* is neither an attribute of the folder's members nor one of the folder's instance
                attributes.
        """
        if name in dir(self._folder.instance) or (
            len(self._folder) and hasattr(self._folder[0], name)
        ):  # This is an instance attribute
            # The attribute may exist on the members without ever having been tracked for setting on load,
            # so don't fail with a KeyError before the loaded members have been dealt with.
            self._folder._object_attrs.pop(name, None)
            for _, this in self._folder.loaded:
                try:
                    delattr(this, name)
                except AttributeError:
                    pass
        elif name in self._folder._instance_attrs:
            del self._folder._instance_attrs[name]
        else:
            raise AttributeError(f"Unrecognised attribute {name}")

    def __getattr__(self, name: str) -> Any:
        """Handle some special case attributes that provide alternative views of the objectFolder.

        Args:
            name (string):
                The attribute name being requested

        Returns:
            Depends on the attribute: a wrapper function if *name* is a method of the folder's member type,
            otherwise the tracked attribute value (empty folder) or a masked array of the attribute's value
            for each member, masked where a member lacks the attribute.

        Raises:
            AttributeError:
                If no value for *name* could be found.

        Notes:
            If *name* is not present on the empty member instance, then the first member of the folder is
            checked as well. This allows the attributes of a :py:class:`Stoner.Data` object that derive from
            the :py:attr:`Stoner.Data.setas` attribute (such as *.x*, *.y* or *.e* etc) can be accessed.
        """
        try:
            return super().__getattr__(name)
        except AttributeError:
            pass
        # Bound before the try block so the error message below can always be built, even when looking up
        # self._folder.instance is itself what failed.
        instance = None
        try:
            instance = self._folder.instance
            if callable(getattr(instance, name, None)):  # It's a method
                ret = self.__getattr_proxy(name)
            else:  # It's a static attribute
                if name in self._folder._object_attrs and len(self._folder) == 0:
                    ret = self._folder._object_attrs[name]
                elif len(self._folder):
                    ret = [(not hasattr(x, name), getattr(x, name, None)) for x in self._folder]
                    mask, values = zip(*ret)
                    ret = np.ma.MaskedArray(values)
                    ret.mask = mask
                else:
                    ret = getattr(instance, name, None)
                if ret is None or (hasattr(ret, "mask") and np.all(ret.mask)):
                    raise AttributeError
        except AttributeError as err:  # Ok, pass back
            raise AttributeError(f"{name} is not an Attribute of {type(self)} or {type(instance)}") from err
        return ret

    def __setattr__(self, name: str, value: Any):
        """Proxy call to set an attribute.

        Setting the attribute on .each sets it on all instantiated objects and in _object_attrs.

        Args:
            name(str):
                Attribute to set
            value (any):
                Value to set

        Raises:
            AttributeError:
                If *name* is not an attribute of this proxy or of the folder's members.

        Notes:
            If *name* is not present on the empty member instance, then the first member of the folder is
            checked as well. This allows the attributes of a :py:class:`Stoner.Data` object that derive from
            the :py:attr:`Stoner.Data.setas` attribute (such as *.x*, *.y* or *.e* etc) can be accessed.

            If *value* is iterable and the same length as the folder, then each element in the folder is
            loaded and the corresponding element of *value* is assigned to the attribute of the member.
        """
        if hasattr(type(self), name) or name.startswith("_"):
            # Handle setting our own attributes
            super().__setattr__(name, value)
        elif name in dir(self._folder.instance) or (
            len(self._folder) and hasattr(self._folder[0], name)
        ):  # This is an instance attribute
            if isiterable(value) and len(value) == len(self._folder):
                force_load = True
            else:
                force_load = False
                self._folder._object_attrs[name] = value  # Add to attributes to be set on load
                value = [value] * len(self._folder)
            for d, v in zip(self._folder.__names__(), value):
                # And set on all instantiated objects
                if force_load or isinstance(self._folder.__getter__(d, instantiate=False), self._folder.type):
                    d = self._folder.__getter__(d)
                    setattr(d, name, v)
        else:
            raise AttributeError(f"Unknown attribute {name}")

    def __getattr_proxy(self, item: str) -> Callable:
        """Make a proxy call to access a method of the metadataObject like types.

        Args:
            item (string):
                Name of method of metadataObject class to be called

        Returns:
            A wrapper function that, when called, calls the named method on each member of the folder and
            returns the list of results.
        """
        meth = getattr(self._folder.instance, item, None)

        @wraps(meth)
        def _wrapper_(*args, **kargs):
            """Wrap a call to the metadataObject type for magic method calling.

            Keyword Arguments:
                _return (index types or None):
                    specify to store the return value in the individual object's metadata

            Note:
                This relies on being defined inside the enclosure of the objectFolder method
                so we have access to self and item
            """
            kargs["_byname"] = True
            return self(item, *args, **kargs)  # Devolve to self.__call__ where we have multiprocess magic

        # Ok that's the wrapper function, now return it for the user to mess around with.
        return _wrapper_

    def __rmatmul__(self, other: Callable) -> Callable:
        """Implement callable@DataFolder as a generic iterate a function over DataFolder members.

        Returns:
            An object that supports __call__ and knows about this DataFolder, or NotImplemented if *other*
            is not callable.
        """
        if not callable(other):
            return NotImplemented

        @wraps(other)
        def _wrapper_(*args, **kargs):
            """Wrap a call to the metadataObject type for magic method calling.

            Keyword Arguments:
                _return (index types or None):
                    specify to store the return value in the individual object's metadata

            Note:
                This relies on being defined inside the enclosure of the objectFolder method
                so we have access to self and item
            """
            kargs["_byname"] = False  # Force the __call__ to use the callable function
            return self(other, *args, **kargs)  # Delegate to self.__call__ which has multiprocess magic.

        # Ok that's the wrapper function, now return it for the user to mess around with.
        return _wrapper_

    def iter(self, func: Union[str, Callable], *args, **kargs) -> Any:
        """Iterate over the baseFolder, calling func on each item.

        Args:
            func (str, callable):
                A Callable object that must take a metadataObject type instance as its first argument, or
                (with *_byname*) the name of a method of the folder's members.

        Keyword Args:
            _return (None, bool or str):
                Controls how the return value from *func* is added to the DataFolder
            _byname (bool):
                Whether to look func up as the name of a function. Defaults to True if func is a string.
            _mode (str):
                Whether to iterate using a parallel iteration scheme. Possible values are:

                    "serial","SingleProcess":
                        In the same process as the main script
                    "ThreadPool":
                        Uses a concurrent.futures ThreadPool
                    "ProcessPool":
                        Uses a concurrent.futures ProcessPool
                    "Dask":
                        Uses a dask.distributed.Client to distribute the task over an DASK cluster.

        Yields:
            The result of evaluating *func* for each item in the folder in turn.

        Raises:
            ValueError:
                If *_mode* is not one of the recognised iteration schemes.

        Notes:
            If *_return* is None and the return type of *func* is the same type as the :py:class:`baseFolder`
            is storing, then the return value replaces the original :py:class:`Stoner.Core.metadataObject`
            in the :py:class:`baseFolder` - provided that its data has the same dtype as the original.
            If *_return* is True the return value is added to the :py:class:`Stoner.Core.metadataObject`'s
            metadata under the name of the function. If *_return* is a string, then the return result is
            stored in the corresponding name.
        """
        _return = kargs.pop("_return", None)
        _byname = kargs.pop("_byname", isinstance(func, string_types))
        _model = kargs.pop("_mode", "serial")
        if _model.lower() not in ["serial", "singleprocess", "threadpool", "processpool", "dask"]:
            raise ValueError(f"Unknown folder iteration model {_model}")
        self._folder.fetch()  # Prefetch the folder in case we can do it in parallel
        executor = get_pool(folder=self._folder, _model=_model)
        task = partial(_worker, _func=func, args=args, kargs=kargs, byname=_byname)
        for ix, (new_d, ret) in enumerate(executor.map(task, self._folder)):
            if self._folder.debug:
                print(ix, type(ret))
            if isinstance(ret, self._folder._type) and _return is None:
                try:
                    # Only replace the member if ret has the same data type, otherwise it will not
                    # overwrite well. This is a generator, so we must not return here: that would
                    # silently stop the iteration and drop all of the remaining members.
                    if ret.data.dtype == new_d.data.dtype:
                        new_d = ret
                except AttributeError:
                    pass
            elif _return is not None:
                if isinstance(_return, bool) and _return:
                    # func may be the name of a method rather than a callable with a __name__
                    _return = func if isinstance(func, string_types) else func.__name__
                new_d[_return] = ret
            name = self._folder.__names__()[ix]
            self._folder.__setter__(name, new_d)
            yield ret