Source code for Stoner.formats.attocube

# -*- coding: utf-8 -*-
"""Module to work with scan files from an AttocubeSPM running Daisy."""
__all__ = ["AttocubeScan"]
import pathlib
from os import path
from copy import deepcopy
from glob import glob
import re
import importlib

from numpy import genfromtxt, linspace, meshgrid, array, prod
from scipy.interpolate import griddata
from scipy.optimize import curve_fit
import h5py

from ..compat import string_types, bytes2str
from ..core.base import typeHintedDict
from ..Image import ImageStack, ImageFile, ImageArray
from ..HDF5 import HDFFileManager
from ..tools.file import FileManager
from ..core.exceptions import StonerLoadError
from .decorators import register_loader

PARAM_RE = re.compile(r"^([\d\\.eE\+\-]+)\s*([\%A-Za-z]\S*)?$")
SCAN_NO = re.compile(r"SC_(\d+)")


def parabola(X, cx, cy, a, b, c):
    """Parabola in the X-Y plane for levelling an image."""
    x, y = X
    return a * (x - cx) ** 2 + b * (y - cy) ** 2 + c


def plane(X, a, b, c):
    """Plane equation for levelling an image."""
    x, y = X
    return a * x + b * y + c


def _raise_error(openfile, message=""):
    """Raise a StonerLoadError after trying to close file."""
    try:
        raise StonerLoadError(message)
    finally:
        try:
            openfile.close()
        except (AttributeError, TypeError, ValueError, IOError):
            pass


@register_loader(patterns=(".txt", 32), mime_types=("text/plain", 32), name="AttocubeScanParametersFile", what="Data")
def load_attocube_parameters(new_data, filename, *args, **kargs):
    """Load the scan parameters text file as the metadata for a Data File.

    Args:
        root_name (str):
            The scan prefix e.g. SC_###

    Returns:
        new_data:
            The modififed scan stack.
    """
    new_data.filename = filename
    with FileManager(filename, "r") as parameters:
        if not parameters.readline().startswith("Daisy Parameter Snapshot"):
            raise StonerLoadError("Parameters file exists but does not have correct header")
        for line in parameters:
            if not line.strip():
                continue
            parts = [x.strip() for x in line.strip().split(":")]
            key = parts[0]
            value = ":".join(parts[1:])
            units = PARAM_RE.match(value)
            if units and units.groups()[1]:
                key += f" [{units.groups()[1]}]"
                value = units.groups()[0]
            new_data[key] = value
    return new_data


class AttocubeScanMixin:

    """Provides the specialist methods for dealing with Attocube SPM scan files.

    See :py:class:`AttocubeScan` for details."""

    def __init__(self, *args, **kargs):
        """Construct the attocube subclass of ImageStack."""
        args = list(args)
        if len(args) > 0:
            for ix, arg in enumerate(args):
                if isinstance(arg, pathlib.PurePath):
                    args[ix] = str(arg)
        if len(args) > 0 and isinstance(args[0], string_types):
            root_name = args.pop(0)
            scan = SCAN_NO.search(root_name)
            if scan:
                scan = int(scan.groups()[0])
            else:
                scan = -1
        elif len(args) > 0 and isinstance(args[0], int):
            scan = args.pop(0)
            root_name = f"SC_{scan:03d}"
        else:
            root_name = kargs.pop("root", None)
            scan = kargs.pop("scan", -1)

        regrid = kargs.pop("regrid", False)

        super().__init__(*args, **kargs)

        self._common_metadata = typeHintedDict()

        if root_name:
            self._load_files(root_name, regrid)

        self.scan_no = scan

        self._common_metadata["Scan #"] = scan

        self.compression = "gzip"
        self.compression_opts = 6

    def __clone__(self, other=None, attrs_only=False):
        """Do whatever is necessary to copy attributes from self to other.

        Note:
            We're in the base class here, so we don't call super() if we can't handle this, then we're stuffed!


        """
        other = super().__clone__(other, attrs_only)
        other._common_metadata = deepcopy(self._common_metadata)
        return other

    def __getitem__(self, name):
        """Allow scans to be got by channel name as well as normal indexing."""
        if isinstance(name, string_types):
            for ix, ch in enumerate(self.channels):
                if name in ch:
                    return self[ix]
        return super().__getitem__(name)

    @property
    def channels(self):
        """Get the list of channels saved in the scan."""
        if len(self) > 0:
            return self.metadata.slice("display", values_only=True)
        return []

    def _load(self, filename, *args, **kargs):
        """Load data from a hdf5 file.

        Args:
            h5file (string or h5py.Group):
                Either a string or an h5py Group object to load data from

        Returns:
            itself after having loaded the data
        """
        if filename is None or not filename:
            self.get_filename("r")
            filename = self.filename
        else:
            self.filename = filename
        with HDFFileManager(self.filename, "r") as f:
            if "type" not in f.attrs:
                _raise_error(f, message="HDF5 Group does not specify the type attribute used to check we can load it.")
            typ = bytes2str(f.attrs["type"])
            if typ != type(self).__name__ and "module" not in f.attrs:
                _raise_error(
                    f,
                    message=f"HDF5 Group is not a {type(self).__name__} and does not specify a module to use to load.",
                )
            loader = None
            if typ == type(self).__name__:
                loader = getattr(type(self), "read_hdf5")
            else:
                mod = importlib.import_module(bytes2str(f.attrs["module"]))
                cls = getattr(mod, typ)
                loader = getattr(cls, "read_hdf5")
            if loader is None:
                raise StonerLoadError("Could not et loader for {bytes2str(f.attrs['module'])}.{typ}")

        return loader(f, *args, **kargs)

    def _instantiate(self, idx):
        """Reconstructs the data type."""
        r, c = self._sizes[idx]
        if issubclass(
            self.type, ImageArray
        ):  # IF the underlying type is an ImageArray, then return as a view with extra metadata
            tmp = self._stack[:r, :c, idx].view(type=self.type)
        else:  # Otherwise it must be something with a data attribute
            tmp = self.type()  # pylint: disable=E1102
            tmp.data = self._stack[:r, :c, idx]
        tmp.metadata = deepcopy(self._common_metadata)
        tmp.metadata.update(self._metadata[self.__names__()[idx]])
        tmp.metadata["Scan #"] = self.scan_no
        tmp._fromstack = True
        return tmp

    def _load_files(self, root_name, regrid):
        """Build the image stack from a stack of files.

        Args:
            root_name (str):
                The scan prefix e.g. SC_###
            regrid (bool):
                Use the X and Y positions if available to regrid the data.
        """
        if not path.exists(path.join(self.directory, f"{root_name}-Parameters.txt")):
            return False
        self._load_parameters(root_name)
        for data in glob(path.join(self.directory, f"{root_name}*.asc")):
            if data.endswith("fwd.asc"):
                if "fwd" not in self.groups:
                    self.add_group("fwd")
                target = self.groups["fwd"]
            elif data.endswith("bwd.asc"):
                if "bwd" not in self.groups:
                    self.add_group("bwd")
                target = self.groups["bwd"]
            else:
                target = self
            target._load_asc(data)
        if regrid:
            if "fwd" in self.groups:
                self.groups["fwd"].regrid(in_place=True)
            if "bwd" in self.groups:
                self.groups["bwd"].regrid(in_place=True)
            if not self.groups:
                self.regrid(in_place=True)
        return self

    def _load_parameters(self, root_name):
        """Load the scan parameters text file.

        Args:
            root_name (str):
                The scan prefix e.g. SC_###

        Returns:
            self:
                The modififed scan stack.
        """
        filename = path.join(self.directory, f"{root_name}-Parameters.txt")
        with FileManager(filename, "r") as parameters:
            if not parameters.readline().startswith("Daisy Parameter Snapshot"):
                raise IOError("Parameters file exists but does not have correct header")
            for line in parameters:
                if not line.strip():
                    continue
                parts = [x.strip() for x in line.strip().split(":")]
                key = parts[0]
                value = ":".join(parts[1:])
                units = PARAM_RE.match(value)
                if units and units.groups()[1]:
                    key += f" [{units.groups()[1]}]"
                    value = units.groups()[0]
                self._common_metadata[key] = value
        return self

    def _load_asc(self, filename):
        """Load a single scan file from ascii data."""
        with FileManager(filename, "r") as data:
            if not data.readline().startswith("# Daisy frame view snapshot"):
                raise ValueError(f"{filename} lacked the correct header line")
            tmp = ImageFile()
            for line in data:
                if not line.startswith("# "):
                    break
                parts = [x.strip() for x in line[2:].strip().split(":")]
                key = parts[0]
                value = ":".join(parts[1:])
                units = PARAM_RE.match(value)
                if units and units.groups()[1]:
                    key += f" [{units.groups()[1]}]"
                    value = units.groups()[0]
                tmp.metadata[key] = value
        xpx = tmp["x-pixels"]
        ypx = tmp["y-pixels"]
        metadata = tmp.metadata
        tmp.image = genfromtxt(filename).reshape((xpx, ypx))
        tmp.metadata = metadata
        tmp.filename = tmp["display"]
        self.append(tmp)
        return self

    def _read_signal(self, g):
        """Read a signal array and return a member of the image stack."""
        if "signal" not in g:
            raise StonerLoadError(f"{g.name} does not have a signal dataset !")
        tmp = self.type()  # pylint: disable=E1102
        data = g["signal"]
        if prod(array(data.shape)) > 0:
            tmp.image = data[...]
        else:
            tmp.image = [[]]
        metadata = g.require_group("metadata")
        typehints = g.get("typehints", None)
        if not isinstance(typehints, h5py.Group):
            typehints = {}
        else:
            typehints = typehints.attrs
        for i in sorted(metadata.attrs):
            v = metadata.attrs[i]
            t = typehints.get(i, "Detect")
            if isinstance(v, string_types) and t != "Detect":  # We have typehints and this looks like it got exported
                tmp.metadata[f"{i}{{{t}}}".strip()] = f"{v}".strip()
            else:
                tmp[i] = metadata.attrs[i]
        tmp.filename = path.basename(g.name)
        return tmp

    def regrid(self, **kargs):
        """Regrid the data sets based on PosX and PosY channels.

        Keyword Parameters:
            x_range, y_range (tuple of start, stop, points):
                Range of x-y co-rdinates to regrid the data to. Used as an argument to :py:func:`np.linspace` to
                generate the coordinate
                vector.
            in_place (bool):
                If True then replace the existing datasets with the regridded data, otherwise create a new copy
                of the scan object. Default is False.

        Returns:
            (AttocubeScan):
                Scan object with regridded data. May be the same as the source object if in_place is True.
        """
        if not kargs.get("in_place", False):
            new = self.clone
        else:
            new = self
        try:
            x = self["PosX"]
            y = self["PosY"]
        except KeyError:  # Can't get X and Y data
            return new

        xrange = kargs.pop("x_range", (x[:, 0].max(), x[:, -1].min(), x.shape[1]))
        yrange = kargs.pop("y_range", (y[0].max(), y[-1].min(), y.shape[0]))
        nx, ny = meshgrid(linspace(*xrange), linspace(*yrange))
        for data in self.channels:
            if "PosX" in data or "PosY" in data:
                continue
            z = self[data]
            nz = griddata((x.ravel(), y.ravel()), z.ravel(), (nx.ravel(), ny.ravel()), method="cubic").reshape(
                nx.shape
            )
            new[data].data = nz
        new["PosX"].data = nx
        new["PosY"].data = ny

        return new

    def level_image(self, method="plane", signal="Amp"):
        """Remove a background signla by fitting an appropriate function.

        Keyword Arguments:
            method (str or callable):
                Eirther the name of a fitting function in the global scope, or a callable. *plane* and *parabola*
                are already defined.
            signal (str):
                The name of the dataset to be flattened. Defaults to the Amplitude signal

        Returns:
            (AttocubeScan):
                The current scan object with the data modified.
        """
        if isinstance(method, string_types):
            method = globals()[method]
        if not callable(method):
            raise ValueError("Could not get a callable method to flatten the data")
        data = self[signal]
        ys, xs = data.shape
        X, Y = meshgrid(linspace(-1, 1, xs), linspace(-1, 1, ys))
        Z = data.data
        X = X.ravel()
        Y = Y.ravel()
        Z = Z.ravel()

        popt = curve_fit(method, (X, Y), Z)[0]

        nZ = method((X, Y), *popt)
        Z -= nZ

        data.data = Z.reshape(xs, ys)
        return self

    def to_hdf5(self, filename=None):
        """Save the AttocubeScan to an hdf5 file."""
        mode = ""
        if filename is None:
            filename = path.join(self.directory, f"SC_{self.scan_no:03d}.hdf5")
        if isinstance(filename, pathlib.PurePath):
            filename = str(filename)
        if filename is None or (isinstance(filename, bool) and not filename):  # now go and ask for one
            filename = self.__file_dialog("w")
        if isinstance(filename, string_types):
            mode = "r+" if path.exists(filename) else "w"
        self.filename = filename
        with HDFFileManager(self.filename, mode=mode) as f:
            f.attrs["type"] = type(self).__name__
            f.attrs["module"] = type(self).__module__
            f.attrs["scan_no"] = self.scan_no
            f.attrs["groups"] = list(self.groups.keys())
            f.attrs["channels"] = list(self.channels)
            if "common_metadata" in f.parent and "common_metadata" not in f:
                f["common_metadata"] = h5py.SoftLink(f.parent["common_metadata"].name)
                f["common_typehints"] = h5py.SoftLink(f.parent["common_typehints"].name)
            else:
                metadata = f.require_group("common_metadata")
                typehints = f.require_group("common_typehints")
                for k in self._common_metadata:
                    try:
                        typehints.attrs[k] = self._common_metadata._typehints[k]
                        metadata.attrs[k] = self._common_metadata[k]
                    except TypeError:
                        # We get this for trying to store a bad data type - fallback to metadata export to string
                        parts = self._common_metadata.export(k).split("=")
                        metadata.attrs[k] = "=".join(parts[1:])

            for g in self.groups:  # Recurse to save groups
                grp = f.require_group(g)
                self.groups[g].to_hdf5(grp)

            for ch in self.channels:
                signal = f.require_group(ch)
                data = self[ch]
                signal.require_dataset(
                    "signal",
                    data=data.data,
                    shape=data.shape,
                    dtype=data.dtype,
                    compression=self.compression,
                    compression_opts=self.compression_opts,
                )
                metadata = signal.require_group("metadata")
                typehints = signal.require_group("typehints")
                for k in [x for x in data.metadata if x not in self._common_metadata]:
                    try:
                        typehints.attrs[k] = data.metadata._typehints[k]
                        metadata.attrs[k] = data.metadata[k]
                    except TypeError:
                        # We get this for trying to store a bad data type - fallback to metadata export to string
                        parts = data.metadata.export(k).split("=")
                        metadata.attrs[k] = "=".join(parts[1:])
        return self

    @classmethod
    def read_hdf5(cls, filename, *args, **kargs):
        """Create a new instance from an hdf file."""
        self = cls(regrid=False)
        if filename is None or not filename:
            self.get_filename("r")
            filename = self.filename
        else:
            self.filename = filename
        with HDFFileManager(filename, "r") as f:
            self.scan_no = f.attrs["scan_no"]
            if "groups" in f.attrs:
                sub_grps = f.attrs["groups"]
            else:
                sub_grps = None
            if "channels" in f.attrs:
                channels = f.attrs["channels"]
            else:
                channels = []
            grps = list(f.keys())
            if "common_metadata" not in grps or "common_typehints" not in grps:
                _raise_error(f, message="Couldn;t find common metadata groups, something is not right here!")
            metadata = f["common_metadata"].attrs
            typehints = f["common_typehints"].attrs
            for i in sorted(metadata):
                v = metadata[i]
                t = typehints.get(i, "Detect")
                if (
                    isinstance(v, string_types) and t != "Detect"
                ):  # We have typehints and this looks like it got exported
                    self._common_metadata[f"{i}{{{t}}}".strip()] = f"{v}".strip()
                else:
                    self._common_metadata[i] = metadata[i]
            grps.remove("common_metadata")
            grps.remove("common_typehints")
            if sub_grps is None:
                sub_grps = grps
            for grp in sub_grps:
                if "type" in f[grp].attrs:
                    self.groups[grp] = cls.read_hdf5(f[grp], *args, **kargs)
                    continue
                g = f[grp]
                self.append(self._read_signal(g))
            for grp in channels:
                g = f[grp]
                self.append(self._read_signal(g))
        return self


[docs]class AttocubeScan(AttocubeScanMixin, ImageStack): """An ImageStack subclass that can load scans from the AttocubeScan SPM System. AttocubeScan represents a scan from an Attocube SPM system as a 3D stack of scan data with associated metadata. Indexing the AttocubeScan with either an integer or a partial match to one the signals saved in the scan will pull out that particular scan as a :py:class:`Stoner.Image.ImageFile`. If the scan was a dual pass scan with forwards and backwards data, then the root AttocubeScan will contain the common metadata derived from the Scan parameters and then two sub-stacks that represent the forwards ('fwd') and backwards ('bwd') scans. The AttocubeScan constructor will with take a *rrot name* of the scan - e.g. "SC_099" or alternatively a scan number integer. It will then look in the stack's directory for matching files and builds the scan stack from them. Currently, it uses the scan parameters.txt file and any ASCII stream files .asc files. 2D linescans are not currently supported or imported. The native file format for an AttocubeScan is an HDF5 file with a particilar structure. The stack is saved into an HDF5 group which then has a *type* and *module* attribute that specifies the class and module pf the Python object that created the group - sof for an AttocubeScan, the type attribute is *AttocubeScan*. There is a class method :py:meth:`AttocubeSca.read_hdf5` to read the stack from the HDSF format and an instance method :py:meth:`AttocubeScan.to_hdf` that will save to either a new or existing HDF file format. The class provides other methods to regrid and flatten images and may gain other capabilities in the future. Todo: Implement load and save to/from multipage TIFF files. Attrs: scan_no (int): The scan number as defined in the Attocube software. compression (str): The HDF5 compression algorithm to use when writing files compression_opts (int): The lelbel of compression to use (depends on compression algorithm) """