Source code for sdt.flatfield

# SPDX-FileCopyrightText: 2020 Lukas Schrangl <lukas.schrangl@tuwien.ac.at>
#
# SPDX-License-Identifier: BSD-3-Clause

"""Flat-field correction
=====================

The intensity cross section of a laser beam is usually not flat, but of
Gaussian shape or worse. That means that fluorophores closer to the edges of
the image will appear dimmer simply because they do not receive so much
exciting laser light.

A camera's pixels may differ in efficiency so that even a homogeneously
illuminated sample will not look homogeneous.

Errors as these can corrected for to some extent by *flat-field correction*.
The error is determined by recording a supposedly homogeneous (flat) sample
(e.g. a homogeneously fluorescent surface) and can later be removed from
other/real experimental data.


Examples
--------

First, load images that show the error, e.g. images of a surface with
homogenous fluorescence label.

>>> baseline = 200.  # camera baseline
>>> # Load homogenous sample image sequences
>>> img1 = io.ImageSequence("flat1.tif").open()
>>> img2 = io.ImageSequence("flat2.tif").open()

These can now be used to create the :py:class:`Corrector` object:

>>> corr = Corrector([img1, img2], bg=baseline)

With help of ``corr``, other images or even whole image sequences (when using
:py:class:`io.ImageSequence` to load them) can now be corrected.

>>> img3 = io.ImageSequence("experiment_data.tif").open()
>>> img3_flat = corr(img3)

The brightness values of single molecule data may be corrected as well:

>>> sm_data = io.load("data.h5")
>>> sm_data_flat = corr(sm_data)


Programming reference
---------------------
.. autoclass:: Corrector
    :members:
    :special-members: __call__
"""
import numbers
from pathlib import Path
from typing import (BinaryIO, Dict, List, Mapping, Optional, Sequence, Tuple,
                    Union)
import warnings

import pandas as pd
import numpy as np
import scipy.interpolate as sp_int
from scipy import stats, optimize, ndimage

from . import config, funcs, io, optimize as sdt_opt
from .helper import pipeline, Slicerator


def _fit_result_to_list(r: Union[Mapping[str, float], None],
                        no_offset: bool = False) -> List[float]:
    """Flatten fit result dict to list

    Parameters
    ----------
    r
        Fit results. If `None`, return empty list.
    no_offset
        Whether to exclude "offset" from the list. If `False`, it is put at the
        end.

    Returns
    -------
    Dict values or empty list if `r` was `None`.

    See also
    --------
    :py:func:`_fit_result_from_list`
    """
    if r is None:
        return []
    ret = ([r["amplitude"]] + list(r["center"]) + list(r["sigma"]) +
           [r["rotation"]])
    if not no_offset:
        ret.append(r["offset"])
    return ret


def _fit_result_from_list(a: Sequence[float]) -> Union[Dict[str, float], None]:
    """Create fit result dict from list

    Inverts the action of :py:func:`_fit_result_to_list`.

    Parameters
    ----------
    a
        Fit results

    Returns
    -------
    Dict values if list is not empty, else `None`.

    See also
    --------
    :py:func:`_fit_result_to_list`
    """
    if not len(a):
        return None
    return {"amplitude": a[0], "center": a[1:3], "sigma": a[3:5],
            "rotation": a[5], "offset": a[6] if len(a) > 6 else 0}


def _do_fit_g2d(mass: np.ndarray, x: np.ndarray, y: np.ndarray,
                weights: Union[float, np.ndarray] = 1.0) -> Dict[str, float]:
    """Do the LSQ fitting of a 2D Gaussian

    Parameters
    ----------
    mass, x, y
        Brightness values and corresponding x and y coordinates
    weights
        Weights of the data points.

    Returns
    -------
    Fit results
    """
    g = sdt_opt.guess_gaussian_parameters(mass, x, y)
    p = _fit_result_to_list(g, no_offset=True)

    def r_gaussian_2d(params):
        a, cx, cy, sx, sy, r = params
        return np.ravel(
            (mass - funcs.gaussian_2d(
                x, y, a, (cx, cy), (sx, sy), 0, r)) * weights)

    r = optimize.least_squares(
        r_gaussian_2d, p,
        bounds=([0, -np.inf, 0, -np.inf, 0, -np.inf], np.inf))

    return _fit_result_from_list(r.x)


[docs]class Corrector(object): """Flat field correction This works by multiplying features' integrated intensities ("mass") by a position-dependent correction factor. The factor can be calculated from a series of images of a fluorescent surface or from single molecule data. The factor is calculated in case of a fluorescent surface by - Taking the averages of the pixel intensities of the images - If requested, doing a 2D Gaussian fit - Normalizing, so that the maximum of the Gaussian or of the average image (if on Gaussian fit was performed) equals 1.0 or in the case of single molecule data by fitting a 2D Gaussian to their "mass" values. Single molecule density fluctuations are take into account by weighing data points inversely to their density in the fit. The "signal" and "mass" of a feature at position (x, y) are then divided by the value of the Gaussian at the positon (x, y) (or the pixel intensity of the image) to yield a corrected value. Also, image data can be corrected analogously in a pixel-by-pixel fashion. **Images (both for deterimnation of the correction and those to be corrected) need to have their background subtracted for this to work properly.** This can be done by telling the `Corrector` object what the background is (or of course, before passing the data to the `Corrector`.) """ avg_img: np.ndarray """Pixel-wise average image from `data` argument to :py:meth:`__init__`.""" corr_img: np.ndarray """Pixel data used for correction of images. Any image to be corrected is divided pixel-wise by `corr_img`. """ fit_result: Optional[Dict] """If a Gaussian fit was done, this holds the result. Otherwise, it is `None`. """ fit_amplitude: float """If a Gaussian fit was done, this holds the maximum of the Gaussian within the image region. See also the `shape` argument to the constructor. """ bg: Union[float, np.ndarray] """Background to be subtracted from image data.""" @config.set_columns def __init__(self, data: Union[np.ndarray, Sequence[np.ndarray], Sequence[Sequence[np.ndarray]], pd.DataFrame, Sequence[pd.DataFrame]], bg: Union[float, np.ndarray, Sequence[np.ndarray], Sequence[Sequence[np.ndarray]]] = 0.0, gaussian_fit: bool = True, shape: Optional[Tuple[int, int]] = None, density_weight: bool = False, smooth_sigma: float = 0.0, columns: Dict = {}): """Parameters ---------- data Collections of images fluorescent surfaces or single molecule data to use for determining the correction function. bg Background that is subtracted from each image. It is not used on single molecule data. Also sets the :py:attr`bg` attribute. gaussian_fit Whether to fit a Gaussian to the averaged image. This is ignored if `data` is single molecule data. shape If `data` is single molecule data, use this to specify the height and width (in this order) of the image region. It allows for calculation of the correct normalization in case the maximum of the fitted Gaussian does not lie within the region. Furthermore, :py:attr:`avg_img` and :py:attr:`corr_img` can be created from the fit. density_weight If `data` is single molecule data, weigh data points inversely to their density for fitting so that areas with higher densities don't have a higher impact on the result. smooth_sigma If > 0, smooth the image used for flatfield correction. Only applicable if ``gaussian_fit=False``. Other parameters ---------------- columns Override default column names as defined in :py:attr:`config.columns`. Only applicable of `data` are single molecule DataFrames. The relevant names are `coords` and `mass`. That means, if the DataFrames have coordinate columns "x" and "z" and a mass column "alt_mass", set ``columns={"coords": ["x", "z"], "mass": "alt_mass"}``. """ self.avg_img = np.empty((0, 0)) self.corr_img = np.empty((0, 0)) self.fit_result = None self.fit_amplitude = np.NaN self.bg = self._calc_bg(bg, smooth_sigma) if (isinstance(data, pd.DataFrame) or (isinstance(data, np.ndarray) and data.ndim == 2)): data = [data] local_max = None if isinstance(data[0], pd.DataFrame): # Get the beam shape from single molecule brightness values pos_columns = columns["coords"] x = np.concatenate([d[pos_columns[0]].values for d in data]) y = np.concatenate([d[pos_columns[1]].values for d in data]) mass = np.concatenate([d[columns["mass"]].values for d in data]) fin = np.isfinite(mass) x = x[fin] y = y[fin] mass = mass[fin] if density_weight: weights = stats.gaussian_kde([x, y])([x, y]) weights = np.sqrt(weights.max() / weights) else: weights = 1. self.fit_result = _do_fit_g2d(mass, x, y, weights) if shape is None: self.avg_img = None self.corr_img = None else: y, x = np.indices(shape) self.avg_img = funcs.gaussian_2d(x, y, **self.fit_result) local_max = self.avg_img.max() self.avg_img /= local_max self.corr_img = self.avg_img else: # calculate the average profile image self.avg_img = self._calc_avg_img(data) shape = self.avg_img.shape if gaussian_fit: # do the fitting y, x = np.indices(self.avg_img.shape) self.fit_result = _do_fit_g2d(self.avg_img, x, y) # normalization factor so that the maximum of the Gaussian is 1 self.corr_img = funcs.gaussian_2d(x, y, **self.fit_result) local_max = self.corr_img.max() self.corr_img /= local_max else: if smooth_sigma: self.corr_img = ndimage.gaussian_filter(self.avg_img, smooth_sigma) self.corr_img /= self.corr_img.max() else: self.corr_img = self.avg_img / self.avg_img.max() self._make_interp() self.fit_amplitude = self._calc_fit_amplitude( self.fit_result, shape, local_max) @staticmethod def _calc_fit_amplitude(fit_result: Optional[Mapping], shape: Optional[Tuple[int, int]], local_max: float) -> float: """Calculate the amplitude of the Gaussian fit within the image region Parameters ---------- fit_result 2D Gaussian fit result shape Height and width of image region local_max Value to use if maximum of the Gaussian is outside of the image region (typically ``avg_image.max()``) Returns ------- Amplitude of Gaussian within the image region """ if fit_result is None: return np.NaN if shape is None: warnings.warn("Calculating excitation profile from " "single-molecule data, but no image shape " "specified. Cannot check whether fit maximum " "lies within the image, corrections may increase " "results by a constant factor.") return fit_result["amplitude"] if np.all((0 <= fit_result["center"]) & (fit_result["center"] <= shape[::-1])): # Maximum of the Gaussian within the image return fit_result["amplitude"] # Maximum of the Gaussion not in the image; use # (approximate) maximum within the image return local_max def _make_interp(self) -> sp_int.RegularGridInterpolator: """Create interpolator form :py:attr:`corr_img`""" self.interp = sp_int.RegularGridInterpolator( [np.arange(i) for i in self.corr_img.shape], self.corr_img, bounds_error=False, fill_value=np.NaN) @staticmethod def _calc_bg(data: Union[float, np.ndarray, Sequence[np.ndarray], Sequence[Sequence[np.ndarray]]], smooth_sigma: float) -> Union[float, np.ndarray]: """Calculate the background from input scalar, array, or array sequence If a sequence or sequence of sequences (e.g., multiple :py:class:`io.ImageSequence`) are passed, calculate the mean. If desired, apply a Gaussian filter. Parameters ---------- data Background images (or scalar value) smooth_sigma If > 0, apply a Gaussian blur with given sigma. Returns ------- Background (scalar or array depending on `data` argument) """ if isinstance(data, numbers.Number): # Scalar return data if isinstance(data, np.ndarray) and data.ndim == 2: # 2D array, i.e., single image. ret = data else: summed = None cnt = 0 for seq in data: if isinstance(seq, np.ndarray) and seq.ndim == 2: # seq is a single image, turn it into a sequence seq = [seq] for img in seq: # Sequence of image sequences if summed is None: # convert to float to avoid overflow summed = np.array(img, dtype=float) else: summed += img cnt += 1 ret = summed / cnt if smooth_sigma: return ndimage.gaussian_filter(ret, smooth_sigma) return ret def _normalize_image(self, img: np.ndarray) -> np.ndarray: """Normalize image data Subtract background and divide by maximum so that the new maximum is 1. Parameters ---------- img Image data Returns ------- Normalized image data """ i2 = img.astype(float) - self.bg i2 /= i2.max() return i2 def _calc_avg_img(self, data: Union[Sequence[np.ndarray], Sequence[Sequence[np.ndarray]]] ) -> np.ndarray: """Calculate the average of normalized images Parameters ---------- data Images Returns ------- Average of normalized image """ summed = None cnt = 0 for seq in data: if isinstance(seq, np.ndarray) and seq.ndim == 2: # seq is a single image, turn it into a sequence seq = [seq] for img in seq: # Sequence of image sequences norm_img = self._normalize_image(img) if summed is None: summed = norm_img else: summed += norm_img cnt += 1 ret = summed / cnt return ret
[docs] @config.set_columns def __call__(self, data: Union[pd.DataFrame, Slicerator, np.ndarray], inplace: bool = False, bg: Union[float, np.ndarray, None] = None, columns: Dict = {} ) -> Union[pd.DataFrame, Slicerator, np.ndarray]: """Do brightness correction on `features` intensities Parameters ---------- data data to be processed. If a pandas.Dataframe, correct the "mass" column according to the particle position in the laser beam. Otherwise, :py:class:`pipeline` is used to correct raw image data. inplace Only has an effect if `data` is a DataFrame. If True, the feature intensities will be corrected in place. bg Background to be subtracted from image data. If `None`, use the :py:attr:`bg` attribute. Ignored for single molecule data. Returns ------- If `data` is a DataFrame and `inplace` is False, return the corrected frame. If `data` is raw image data, return corrected images Other parameters ---------------- columns : dict, optional Override default column names as defined in :py:attr:`config.columns`. Only applicable of `data` are single molecule DataFrames. The relevant names are `coords`, `signal`, and `mass`. That means, if the DataFrames have coordinate columns "x" and "z" and a mass column "alt_mass", set ``columns={"coords": ["x", "z"], "mass": "alt_mass"}``. It is also possible to give a list of columns to correct by adding the "corr" key, e.g. ``columns={"corr": ["mass", "alt_mass"]}``. """ if isinstance(data, pd.DataFrame): x, y = columns["coords"] if not inplace: data = data.copy() factors = self.get_factors(data[x], data[y]) corr_cols = columns.get("corr", [columns["mass"], columns["signal"]]) for cc in corr_cols: if cc in data.columns: data[cc] *= factors if not inplace: # copied previously, now return return data else: if bg is None: bg = self.bg @pipeline def corr(img): return (img - bg) / self.corr_img return corr(data)
[docs] def get_factors(self, x: np.ndarray, y: np.ndarray) -> np.ndarray: """Get correction factors at positions x, y Depending on whether gaussian_fit was set to True or False in the constructor, the correction factors for each feature that described by an x and a y coordinate is calculated either from the Gaussian fit or the average image itself. Parameters ---------- x, y x and y coordinates of features Returns ------- numpy.ndarray 1D array of correction factors corresponding to the features """ if self.fit_result: return 1. / (funcs.gaussian_2d(x, y, **self.fit_result) / self.fit_amplitude) else: return np.transpose(1. / self.interp(np.array([y, x]).T))
[docs] def save(self, file: Union[str, Path, BinaryIO]): """Save instance to disk Parameters ---------- file Where to save. If `str`, the extension ".npz" will be appended if it is not present. """ np.savez_compressed( file, avg_img=self.avg_img, corr_img=self.corr_img, fit_result=_fit_result_to_list(self.fit_result), fit_amplitude=self.fit_amplitude, bg=self.bg)
def to_hdf(self, path_or_buf: Union[str, Path, BinaryIO], key: str = "flatfield", mode: str = "a"): d = {"avg_img": self.avg_img, "corr_img": self.corr_img, "fit_result": np.array(_fit_result_to_list(self.fit_result)), "fit_amplitude": np.array(self.fit_amplitude), "bg": np.atleast_2d(self.bg)} io.arrays_to_hdf(path_or_buf, d, key_prefix=f"{key}/", resizable=True, mode=mode)
[docs] @classmethod def load(cls, file: Union[str, Path, BinaryIO]): """Load from disk Parameters ---------- file Where to load from Returns ------- sdt.flatfield.Corrector Loaded instance """ with np.load(file) as ld: ret = cls([ld["avg_img"]], gaussian_fit=False) ret.corr_img = ld["corr_img"] ret.fit_result = _fit_result_from_list(ld["fit_result"]) bg = ld["bg"] ret.bg = bg if bg.size > 1 else bg.item() # fit_amplitude may not be present in old save files if "fit_amplitude" in ld: ret.fit_amplitude = ld["fit_amplitude"].item() elif ret.fit_result: ret.fit_amplitude = ret.fit_result["amplitude"] else: ret.fit_amplitude = np.NaN ret._make_interp() return ret
@classmethod def read_hdf(cls, path_or_buf: Union[str, Path, BinaryIO], key: str = "flatfield", mode: str = "r"): import h5py opened = False hf = path_or_buf try: if isinstance(path_or_buf, (str, Path)): hf = h5py.File(path_or_buf, mode) opened = True ret = cls(hf[f"{key}/avg_img"][...], gaussian_fit=False) ret.corr_img = hf[f"{key}/corr_img"][...] ret.fit_result = _fit_result_from_list( hf[f"{key}/fit_result"][...]) bg = hf[f"{key}/bg"][...] ret.bg = bg if bg.size > 1 else bg.item() ret.fit_amplitude = hf[f"{key}/fit_amplitude"][...].item() finally: if opened: hf.close() ret._make_interp() return ret