# SPDX-FileCopyrightText: 2020 Lukas Schrangl <lukas.schrangl@tuwien.ac.at>
#
# SPDX-License-Identifier: BSD-3-Clause
from contextlib import suppress
from pathlib import Path
from typing import (BinaryIO, Callable, Dict, Literal, Mapping, Optional,
Sequence, Tuple, TypeVar, Union, overload)
import cv2
import matplotlib as mpl
import numpy as np
import pandas as pd
import scipy.io
import scipy.ndimage
import scipy.spatial
from .. import config, helper, roi
def _affine_trafo(params: np.ndarray, loc: np.ndarray) -> np.ndarray:
"""Perform an affine transformation
Parameters
----------
params
Transformation matrix of shape (n, n+1) or (n+1, n+1). The top-left
(n, n) block is used as the linear part of the transformation while the
top-right column of n entries specifies the shift.
loc
Array of m n-dimensional coordinate tuples to be transformed.
Returns
-------
Transformed coordinate tuples
"""
ndim = params.shape[1] - 1
return loc @ params[:ndim, :ndim].T + params[:ndim, ndim]
[docs]class Registrator:
"""Registration of two fluorescence microscopy emission channels
This class provides an easy-to-use interface to determine maps between
the channels' coordinates using localization data from fiducial markers. It
is based on the algorithm published by Preibisch et al. [Preibisch2010]_.
Examples
--------
Let's assume that multiple images/sequences of fluorescent beads have been
acquired, which are visible in both emission channels. First, the beads
need to be localized (e.g. using the ``sdt.gui.locator`` application).
These need to be loaded:
>>> bead_loc = [sdt.io.load(b) for b in glob.glob("beads*.h5")]
Next, we define ROIs for the two channels using :py:class:`sdt.roi.ROI` and
choose the bead localizations with respect to the ROIs:
>>> r1 = sdt.roi.ROI((0, 0), size=(200, 100))
>>> r2 = sdt.roi.ROI((0, 200), size=(200, 100))
>>> beads_r1 = [r1(b) for b in bead_loc]
>>> beads_r2 = [r2(b) for b in bead_loc]
Now, calculate the transform that overlays the two channels using
:py:class:`Registrator`:
>>> corr = Registrator(beads_r1, beads_r2)
>>> corr.determine_parameters()
>>> corr.test() # Plot results
This can now be used to transform i.e. image data from channel 1 so that it
can be overlaid with channel 1:
>>> with io.ImageSequence("image.tif") as s:
... img = s[0] # Load first frame (number 0)
>>> img_r1 = r1(img) # Get channel 1 part of the image
>>> img_r2 = r2(img) # Get channel 2 part of the image
>>> img_r1_corr = corr(img_r1, channel=1) # Transform channel 1 image
>>> overlay = numpy.dstack([img_r1, img_r2, np.zeros_like(img_r1)])
>>> matplotlib.pyplot.imshow(overlay) # Plot overlay
Similarly, coordinates of single molecule data can be transformed:
>>> loc = sdt.io.load("loc.h5") # Load data
>>> loc_r1 = r1(loc) # Get data from channel 1
>>> loc_r2 = r2(loc) # Get data from channel 2
>>> loc_r1_corr = corr(loc_r1, channel=1) # Transform channel 1 data
>>> matplotlib.pyplot.scatter(loc_r1_corr["x"], loc_r1_corr["y"],
... marker="+")
>>> matplotlib.pyplot.scatter(loc_r2["x"], loc_r2["y"], marker="x")
There is also support for saving and loading a :py:class:`Registrator`
instance to/from YAML:
>>> with open("output.yaml", "w") as f:
>>> sdt.io.yaml.safe_dump(corr, f)
>>> with open("output.yaml", "r") as f:
>>> corr_loaded = sdt.io.yaml.safe_load(f)
References
----------
.. [Preibisch2010] Preibisch, S.; Saalfeld, S.; Schindelin, J. & Tomancak,
P.: "Software for bead-based registration of selective plane
illumination microscopy data", Nature Methods, Springer Science and
Business Media LLC, 2010, 7, 418–419
"""
yaml_tag = "!Registrator"
feat1: Sequence[pd.DataFrame]
"""Positions of beads (as found by a localization algorithm) in the first
channel. Each DataFrame corresponds to one image (sequence), thus multiple
bead images can be used to increase the accuracy.
"""
feat2: Sequence[pd.DataFrame]
"""Same as :py:attr:`feat1`, but for the second channel"""
columns: Dict
"""Column names in :py:attr:`feat1` and :py:attr:`feat2`. Defaults are
taken from :py:attr:`config.columns`.
"""
channel_names: Sequence[str]
"""Channel names"""
pairs: pd.DataFrame
"""Pairs found by :py:meth:`determine_parameters`."""
parameters1: np.ndarray
"""Array describing the affine transformation of data from channel 1 to
channel 2.
"""
parameters2: np.ndarray
"""Array describing the affine transformation of data from channel 2 to
channel 1.
"""
@config.set_columns
def __init__(self,
feat1: Optional[Union[Sequence[pd.DataFrame],
pd.DataFrame]] = None,
feat2: Optional[Union[Sequence[pd.DataFrame],
pd.DataFrame]] = None,
columns: Dict = {},
channel_names: Sequence[str] = ["channel1", "channel2"]):
"""Parameters
----------
feat1, feat2
Set the `feat1` and `feat2` attribute (turning it into a list
if it is a single DataFrame). Can also be `None`, but in this case
:py:meth:`find_pairs` and :py:meth:`determine_parameters` will
not work. Defaults to `None`.
Other parameters
----------------
channel_names
Set the `channel_names` attribute.
columns
Override default column names as defined in
:py:attr:`config.columns`. Relevant name are `coords` and `time`.
That means, if the DataFrames have coordinate columns "x" and "z",
and a time column "alt_frame", set
``columns={"coords": ["x", "z"], "time": "alt_frame"}``. This is
used to set the :py:attr:`columns` attribute.
"""
self.feat1 = [feat1] if isinstance(feat1, pd.DataFrame) else feat1
self.feat2 = [feat2] if isinstance(feat2, pd.DataFrame) else feat2
self.columns = columns
self.channel_names = channel_names
self.pairs = None
self.parameters1 = np.eye(len(columns["coords"]) + 1)
self.parameters2 = np.eye(len(columns["coords"]) + 1)
[docs] def determine_parameters(self, n_neighbors: int = 3,
ambiguity_factor: float = 5.0,
max_error: float = 1.0):
"""Determine the parameters for the affine transformation
This takes the localizations of :py:attr:`feat1` and tries to match
them with those of :py:attr:`feat2`. Then a fit is used to determine
the affine transformation between the channels.
This is a convenience function that calls :py:meth:`find_pairs` and
:py:meth:`fit_parameters`; see the documentation of the methods for
details.
Parameters
----------
n_neighbors
Number of neighboring beads to consider to find matching features
across channels.
ambiguity_factor
A low value (around 1) will accept pairs even if there are similar
possible matches. The higher this value, the less are ambigous
results accepted.
max_error
Maximum error (i.e., distance between transformed position from
channel 1 and matched position in channel 2) to consider a feature
pair not an outlier and thus remove it from the transformation fit.
"""
self.find_pairs(n_neighbors, ambiguity_factor)
self.fit_parameters(max_error)
@overload
def __call__(self, data: pd.DataFrame, channel: Union[int, str],
inplace: Literal[True], mode: str,
cval: Union[float, Callable[[np.ndarray], float]],
columns: Mapping): ...
@overload
def __call__(self, data: pd.DataFrame, channel: Union[int, str],
inplace: Literal[False], mode: str,
cval: Union[float, Callable[[np.ndarray], float]],
columns: Mapping) -> pd.DataFrame: ...
@overload
def __call__(self, data: Union[helper.Slicerator, helper.Pipeline],
channel: Union[int, str], inplace: Literal[False], mode: str,
cval: Union[float, Callable[[np.ndarray], float]],
columns: Mapping) -> helper.Pipeline: ...
ImageOrROI = TypeVar("ImageOrROI",
bound=Union[roi.PathROI, np.ndarray])
[docs] @config.set_columns
def __call__(self,
data: ImageOrROI,
channel: Union[int, str], inplace: bool = False,
mode: str = "constant",
cval: Union[float, Callable[[np.ndarray], float]] = 0.0,
columns: Mapping = {}) -> ImageOrROI:
"""Apply transformation to data
This can be done either on coordinates (e. g. resulting from feature
localization) or directly on raw images.
Parameters
----------
data
Data to be processed. If a pandas.Dataframe, the feature
coordinates will be corrected. Otherwise,
:py:class:`sdt.helper.Pipeline` is used to correct image data using
an affine transformation.
channel
If `features` are in the first channel (corresponding to the
`feat1` arg of the constructor), set to 1 or the first channel's
name. If features are in the second channel, set to 2 or the
second channel's name. Depending on this, a transformation will
be applied to the coordinates of `features` to match the other
channel (mathematically speaking depending on this parameter
either the "original" transformation or its inverse are applied.)
inplace
Only has an effect if `data` is a DataFrame. If True, the
feature coordinates will be corrected in place.
mode
How to fill points outside of the uncorrected image boundaries.
Possibilities are "constant", "nearest", "reflect" or "wrap".
cval
What value to use for `mode="constant"`. If this is callable, it
should take a single argument (the uncorrected image) and return a
scalar, which will be used as the fill value.
Other parameters
----------------
columns
Override default column names in case `data` is a
:py:class:`pandas.DataFrame`. The only relevant name is `coords`.
That means, if the DataFrame has coordinate columns "x" and "z",
set ``columns={"coords": ["x", "z"]}``.
"""
with suppress(ValueError):
channel = self.channel_names.index(channel) + 1
if channel not in (1, 2):
valid = [1, 2, *self.channel_names[:2]]
raise ValueError(
"channel has to be one of "
f"{', '.join(str(v) for v in valid)}")
pos_columns = columns["coords"]
if isinstance(data, pd.DataFrame):
if not inplace:
data = data.copy()
loc = data[pos_columns].values
par = getattr(self, f"parameters{channel}")
data[pos_columns] = _affine_trafo(par, loc)
if not inplace:
# copied previously, now return
return data
elif isinstance(data, roi.PathROI):
t = mpl.transforms.Affine2D(
getattr(self, f"parameters{channel}"))
return roi.PathROI(t.transform_path(data.path),
buffer=data.buffer,
no_image=(data.image_mask is None))
else:
parms = getattr(self, f"parameters{2 if channel == 1 else 1}")
@helper.pipeline
def corr(img):
# transpose image since matrix axes are reversed compared to
# coordinate axes
img_t = img.T
if callable(cval):
cv = cval(img)
else:
cv = cval
# this way, the original subclass of np.ndarray is preserved
ret = np.empty_like(img_t)
scipy.ndimage.affine_transform(
img_t, parms[:-1, :-1], parms[:-1, -1], output=ret,
mode=mode, cval=cv)
return ret.T # transpose back
return corr(data)
[docs] def test(self, ax: Optional[Sequence] = None):
"""Test validity of the correction parameters
This plots the affine transformation functions and the coordinates of
the pairs that were matched in the channels. If everything went well,
the dots (i. e. pair coordinates) should lie on the line
(transformation function).
Parameters
----------
ax
Two matplotib axes instances for plotting. If `None`, allocate new
axes using :py:func:`matplotlib.pyplot.subplots`.
"""
import matplotlib.pyplot as plt
if ax is None:
grid_kw = dict(width_ratios=(1, 2))
fig, ax = plt.subplots(1, 2, gridspec_kw=grid_kw)
else:
fig = None
c1 = self.pairs[self.channel_names[0]]
c1_corr = self(c1, channel=1, columns=self.columns)
diff1 = (c1_corr[self.columns["coords"]] -
self.pairs[self.channel_names[1]][self.columns["coords"]])
diff1 = np.sqrt(np.sum(diff1**2, axis=1))
c2 = self.pairs[self.channel_names[1]]
ax[0].hist(diff1, bins=20)
ax[0].set_title("Error")
ax[0].set_xlabel("distance")
ax[0].set_ylabel("# data points")
x_col, y_col = self.columns["coords"]
ax[1].scatter(c1_corr[x_col], c1_corr[y_col], marker="x", color="blue")
ax[1].scatter(c2[x_col], c2[y_col], marker="+", color="red")
ax[1].set_aspect(1, adjustable="datalim")
ax[1].set_title("Overlay")
ax[1].set_xlabel("x")
ax[1].set_ylabel("y")
if fig is not None:
# Figure was created here, so it is safe to do this
fig.tight_layout()
[docs] def save(self, file: Union[BinaryIO, str, Path], fmt: str = "npz",
key: Tuple[str, str] = ("chromatic_param1", "chromatic_param2")):
"""Save transformation parameters to file
Parameters
----------
file
File name or an open file-like object to save data to.
fmt
Format to save data. Either numpy ("npz") or MATLAB ("mat").
Defaults to "npz".
Other parameters
----------------
key
Names of two transformations in the saved file.
"""
vardict = {key[0]: self.parameters1, key[1]: self.parameters2}
if fmt == "npz":
np.savez(file, **vardict)
elif fmt == "mat":
scipy.io.savemat(file, vardict)
else:
raise ValueError("Unknown format: {}".format(fmt))
[docs] @classmethod
def load(cls, file: Union[BinaryIO, str, Path], fmt: str = "npz",
key: Tuple[str, str] = ("chromatic_param1", "chromatic_param2")
) -> "Registrator":
"""Read paramaters from a file and construct a `Registrator`
Parameters
----------
file
File name or an open file-like object to load data from.
fmt
Format to save data. Either numpy ("npz") or MATLAB ("mat") or
`determine_shiftstretch`'s wrp ("wrp"). Defaults to "npz".
Returns
-------
A class instance with the parameters read from the file.
Other parameters
----------------
key
Name of the variables in the saved file (does not apply to "wrp").
Defaults to ("chromatic_param1", "chromatic_param2").
"""
corr = cls(None, None)
if fmt == "npz":
npz = np.load(file)
corr.parameters1 = npz[key[0]]
corr.parameters2 = npz[key[1]]
elif fmt == "mat":
mat = scipy.io.loadmat(file)
corr.parameters1 = mat[key[0]]
corr.parameters2 = mat[key[1]]
elif fmt == "wrp":
d = scipy.io.loadmat(file, squeeze_me=True,
struct_as_record=False)["S"]
corr.parameters2 = np.array([[d.k1, 0., d.d1],
[0., d.k2, d.d2],
[0., 0., 1.]])
corr.parameters1 = np.linalg.inv(corr.parameters2)
else:
raise ValueError("Unknown format: {}".format(fmt))
return corr
@staticmethod
def _calc_bases(coords: np.ndarray, idx: np.ndarray) -> np.ndarray:
"""Create orthogonal coordinate system for each bead
First basis vector is from the bead to the furthest of the n_neighbors
neighbors, second is to the second-furthest minus the projection onto
the first basis, and so on (Gram-Schmidt process).
Parameters
----------
coords
Row-wise coordinates of the beads
idx
Row-wise indices of the nearest neighbors, sorted ascendingly by
distance (i.e., the j-th entry in the i-th row is the index of
the j-th nearest neighbor of bead i).
Returns
-------
3D array where [i, :, :] yields the matrix of basis column vectors for
the i-th bead.
"""
coords = np.asarray(coords, dtype=float)
n_dim = coords.shape[1]
# bases = []
bases = np.empty((len(coords), n_dim, n_dim))
# Squared lengths of basis vectors
base_len_sq = []
for n in range(n_dim):
# vector from bead to n-th furthest neighbor
vec = coords[idx[:, -(n+1)]] - coords
for i, bl in enumerate(base_len_sq):
# subtract projection onto previously calculated
# axes
b = bases[..., i]
vec -= (np.sum(vec * b, axis=1) / bl)[:, None] * b
bases[:, :, n] = vec
base_len_sq.append(np.sum(vec * vec, axis=1))
return bases
@staticmethod
def _calc_local_coords(coords: np.ndarray, n_neighbors: int) -> np.ndarray:
"""Calculate coordinates of neighboring beads in local coordinates
Create a coordinate system for each bead using :py:meth:`_calc_bases`
and compute the coordinates of the neighboring beads.
Parameters
----------
coords
Row-wise coordinates of the beads
n_neighbors
Number of neighboring beads to consider
Returns
-------
3D array where [i, :, :] yields the matrix of coordinate column vectors
of the neighbors of the i-th bead.
"""
kd = scipy.spatial.cKDTree(coords)
# No need to return nearest neighbor (k=1) since this is
# the bead itself. Counting starts at 1.
dist, idx = kd.query(coords, k=range(2, n_neighbors + 2))
# Solve linear equation system to calculate coordinates in
# the new basis described by axes.
# Left hand side is the coordinate transform from new basis
# to cartesian. First index is bead number, last to indices
# give the transformation matrix.
bases = __class__._calc_bases(coords, idx)
# Right hand side are vectors from bead (first index) to
# its neighbors
rhs = coords[idx] - coords[:, None, :]
# Swap last and second-to last indices to produce row
# vectors
rhs = np.moveaxis(rhs, -2, -1)
# Solve system
local_coords = np.linalg.solve(bases, rhs)
return local_coords
@staticmethod
def _signatures_from_local_coords(local_coords: np.ndarray,
triu: np.ndarray) -> np.ndarray:
"""Extract bead signatures from local coordinates
Due to the construction of the basis vectors, the furthest neighbor
will always have coordinates [1., 0., …, .0], the will have
[x, 1., 0, …, 0.], and so on. Remove these zeros and ones values,
i.e., get the values from the upper triangle of the flipped matrix as
the signature.
Parameters
----------
local_coords
3D array where [i, :, :] yields the matrix of coordinate column
vectors of the neighbors of the i-th bead (see
:py:meth:`_calc_local_coords`).
triu
Indices of the upper triangular matrix shifted by one. Generate
using ``np.triu_indices(n=n_dim, m=n_neighbors, k=1)``. This is
passed as an argmuent so it can be created once and reused.
Returns
-------
2D array where [i, :] yields the signature of the i-th bead.
"""
return local_coords[..., ::-1][:, triu[0], triu[1]]
@staticmethod
def _pairs_from_signatures(coords: Sequence[np.ndarray],
signatures: Sequence[np.ndarray],
ambiguity_factor: float) -> Tuple[np.ndarray]:
"""Find pairs from signatures
Find the nearest neighbors in signature space
Parameters
----------
coords
Matrix of bead coordinate row vectors for each channel
signatures
Matrix of row-wise bead signatures for each channel
ambiguity_factor
Accept only matches where the distance (in signature space) between
the beads of the second best match is at least ``ambiguity_factor``
times as large as the distance for the best match.
Returns
-------
Matrices of matched bead coordinate row vectors for each channel. The
i-th row of the first matrix matches the i-th row of the second.
"""
unambiguous = []
matches = []
# Find pairs and check for ambiguity both ways. Otherwise there will
# be problems if there are two candidates in one channel for one in the
# other depending on which is channel 1 and which is channel 2
for src, dest in (0, 1), (1, 0):
kd = scipy.spatial.cKDTree(signatures[src])
match_dist, match_idx = kd.query(signatures[dest], k=2)
u = match_dist[:, 1] > ambiguity_factor * match_dist[:, 0]
unambiguous.append(u)
matches.append(match_idx)
unamb_twoway = unambiguous[0] & unambiguous[1][matches[0][:, 0]]
return (coords[0][matches[0][unamb_twoway, 0]],
coords[1][unamb_twoway])
[docs] def find_pairs(self, n_neighbors: int = 3, ambiguity_factor: float = 5.0):
"""Match features of `feat1` with features of `feat2`
Find the geomtric signature for each feature in each channel. Those
with best matching signatures are taken to be the same feature.
Parameters
----------
n_neighbors
Number of neighboring beads to consider for signature calculation.
ambiguity_factor
Accept only matches where the distance (with respect to the
geometric signature) between the beads of the second best match is
at least ``ambiguity_factor`` times as large as the distance for
the best match.
"""
pairs = []
n_dim = len(self.columns["coords"])
# Indices of triangular matrix, used below
triu = np.triu_indices(n=n_dim, m=n_neighbors, k=1)
# At least n_dim neighbors are needed to define local coordinates
n_neighbors = max(n_neighbors, n_dim)
for f1, f2 in zip(self.feat1, self.feat2):
if all(self.columns["time"] in f.columns for f in (f1, f2)):
# If there is a "frame" column, split data according to
# frame number since pairs can only be in the same frame
data = ([f[f[self.columns["time"]] == i] for f in (f1, f2)]
for i in f1[self.columns["time"]].unique())
else:
# If there is no "frame" column, just take everything
data = ((f1, f2),)
for frame_data in data:
if any(len(f) < n_neighbors + 1 for f in frame_data):
# Not enough neighbors
continue
signatures = []
coordinates = []
for f in frame_data:
# Profile says that the code below takes half the
# time compared to f[sel.columns["coords"]].to_numpy()
coords = np.array([f[c].to_numpy()
for c in self.columns["coords"]]).T
coords = coords[np.all(np.isfinite(coords), axis=1)]
lc = self._calc_local_coords(coords, n_neighbors)
s = self._signatures_from_local_coords(lc, triu)
signatures.append(s)
coordinates.append(coords)
# TODO: flip_axes
p = self._pairs_from_signatures(coordinates, signatures,
ambiguity_factor)
pairs.append(np.hstack(p))
pair_cols = [self.channel_names, self.columns["coords"]]
self.pairs = pd.DataFrame(
np.vstack(pairs), columns=pd.MultiIndex.from_product(pair_cols))
[docs] def fit_parameters(self, max_error: float = 1.0):
"""Determine parameters for the affine transformation
An affine transformation is used to map x and y coordinates of `feat1`
to to those of `feat2`. This requires :py:attr:`pairs` to be set, e.g.
by running :py:meth:`find_pairs`.
The result is saved as :py:attr:`parameters1` (transform of channel 1
coordinates to channel 2) and :py:attr:`parameters2` (transform of
channel 2 coordinates to channel 1) attributes.
:py:attr:`parameters1` is calculated by determining the affine
transformation between the :py:attr:`pairs` entries using a RANSAC
algorithm. :py:attr:`parameters2` is its inverse. Therefore,
results may be slightly different depending on what is channel1 and
what is channel2.
Parameters
----------
max_error
Maximum error (i.e., distance between transformed position from
channel 1 and matched position in channel 2) to consider a feature
pair not an outlier.
"""
ch1_name = self.channel_names[0]
ch2_name = self.channel_names[1]
loc1 = self.pairs[ch1_name][self.columns["coords"]]
loc2 = self.pairs[ch2_name][self.columns["coords"]]
n_dim = loc1.shape[1]
self.parameters1[:n_dim, :], good_pairs = cv2.estimateAffine2D(
loc1.to_numpy(dtype=np.float32), loc2.to_numpy(dtype=np.float32),
method=cv2.RANSAC, ransacReprojThreshold=max_error)
self.parameters1[n_dim, :n_dim] = 0.
self.parameters1[n_dim, n_dim] = 1.
self.parameters2 = np.linalg.inv(self.parameters1)
self.pairs = self.pairs[np.squeeze(good_pairs.astype(bool))]
[docs] @classmethod
def to_yaml(cls, dumper, data):
"""Dump as YAML
Pass this as the `representer` parameter to your
:py:class:`yaml.Dumper` subclass's `add_representer` method.
"""
m = (("parameters1", data.parameters1),
("parameters2", data.parameters2))
return dumper.represent_mapping(cls.yaml_tag, m)
[docs] @classmethod
def from_yaml(cls, loader, node):
"""Construct from YAML
Pass this as the `constructor` parameter to your
:py:class:`yaml.Loader` subclass's `add_constructor` method.
"""
m = loader.construct_mapping(node)
ret = cls()
ret.parameters1 = m["parameters1"]
ret.parameters2 = m["parameters2"]
return ret
def __eq__(self, other):
if not isinstance(other, __class__):
return False
return (np.allclose(self.parameters1, other.parameters1) and
np.allclose(self.parameters2, other.parameters2))
with suppress(ImportError):
from ..io import yaml
yaml.register_yaml_class(Registrator)