# SPDX-FileCopyrightText: 2020 Lukas Schrangl <lukas.schrangl@tuwien.ac.at>
#
# SPDX-License-Identifier: BSD-3-Clause
"""Analyze mean square displacements (MSDs)"""
import math
from collections import OrderedDict, namedtuple
from typing import Literal, Tuple
import numpy as np
import pandas as pd
import scipy.optimize
from .. import config
from . import msd_base
[docs]
class Msd:
"""Calculate and analyze mean square displacements (MSDs)
from single moleclue tracking data.
"""
@config.set_columns
def __init__(self, data, frame_rate, n_lag=20, n_boot=100, ensemble=True,
e_name="ensemble", random_state=None, pixel_size=1,
columns={}):
"""Parameters
----------
data : pandas.DataFrame or iterable of pandas.DataFrame
Tracking data. Either a single DataFrame or a collection of
DataFrames.
frame_rate : float
Frame rate
n_lag : int or inf, optional
Number of lag times (time steps) to consider at most. Defaults to
100.
n_boot : int, optional
Number of bootstrapping iterations for calculation of errors.
Set to 0 to turn off bootstrapping for performance gain, in which
case there will be no errors on the fit results. Defaults to 100.
ensemble : bool, optional
Whether to calculate the MSDs for the whole data set or for each
trajectory individually. Defaults to True.
pixel_size : float, optional
Pixel size; multiply coordinates by this factor. Defaults to 1
(no scaling).
Other parameters
----------------
e_name : str, optional
If the `ensemble` parameter is `True`, use this as the name for
the dataset. It shows up in the MSD DataFrame (see
:py:meth:`get_msd`) and in plots. Defaults to "ensemble".
random_state : numpy.random.RandomState or None, optional
:py:class:`numpy.random.RandomState` instance to use for
random sampling for bootstrapping. If `None`, use create a new
instance with ``seed=None``. Defaults to `None`.
columns : dict, optional
Override default column names as defined in
:py:attr:`config.columns`. Relevant names are `coords`, `particle`,
and `time`. This means, if your DataFrame has coordinate columns
"x", "y", and "z" and the time column "alt_frame", set
``columns={"coords": ["x", "y", "z"], "time": "alt_frame"}``.
"""
square_disp = msd_base._all_square_displacements(
data, n_lag, ensemble, e_name, pixel_size, columns)
# Generate bootstrapped data if desired
if n_boot > 1:
if random_state is None:
random_state = np.random.RandomState()
msd_set = OrderedDict()
for p, sds_p in square_disp.items():
msds_p = np.empty((len(sds_p), n_boot))
for i, sd in enumerate(sds_p):
if len(sd) > 0:
b = random_state.choice(sd, (len(sd), n_boot),
replace=True)
m = np.mean(b, axis=0)
else:
# No data for this lag time
m = np.nan
msds_p[i, :] = m
msd_set[p] = msds_p
msds = None # Calculate in `MsdData.__init__`
err = None
else:
msds = OrderedDict([
(p, np.array([np.mean(v) if len(v) > 0 else np.nan
for v in s]))
for p, s in square_disp.items()])
# Use corrected sample std as a less biased estimator of the
# population std
err = OrderedDict([
(p, np.array([np.std(v, ddof=1) / np.sqrt(len(v))
if len(v) > 1 else np.nan
for v in s]))
for p, s in square_disp.items()])
msd_set = OrderedDict(
[(p, m[:, None]) for p, m in msds.items()])
self._msd_data = msd_base.MsdData(frame_rate, msd_set, msds, err)
@classmethod
def _from_data(cls, data, e_name="ensemble"):
"""Create class instance from pre-calculated data
With this, it is possible to create a class instance from legacy
data as created by :py:func:`emsd`.
For legacy interop purposes only.
Parameters
----------
data : pandas.DataFrame
Input data
e_name : str, optional
Name to be given to the input dataset. Defaults to "ensemble".
Returns
-------
class instance
Instance create from `data`
"""
ret = cls([], 1, 1, 0)
if isinstance(data, pd.DataFrame):
if "lagt" in data.columns and "msd" in data.columns:
# old `emsd` function output
msds = data["msd"].values
if "stderr" in data.columns:
err = data["stderr"].values
else:
err = np.full_like(msds, np.nan)
msd_set = OrderedDict([(e_name, msds[:, None])])
msds = OrderedDict([(e_name, msds)])
err = OrderedDict([(e_name, err)])
lt0, lt1 = data["lagt"].iloc[:2]
frame_rate = 1 / (lt1 - lt0)
ret._msd_data = msd_base.MsdData(frame_rate, msd_set, msds,
err)
return ret
raise ValueError("data in unrecognized format")
Result = namedtuple("Result", ["msd", "msd_err"])
[docs]
def get_msd(self, series=True):
"""Get MSD and error DataFrames
The columns contain data for different lag times. Each row corresponds
to one trajectory. The row index is either the particle number if a
single DataFrame was passed to :py:meth:`__init__` or a tuple
identifying both the DataFrame and the particle. If
:py:meth:`__init__` was called with ``ensemble=True``, there will only
be one row with index `e_name`.
The `series` parameter controls whether to return a
:py:class:`pandas.DataFrame` or a :py:class:`pandas.Series` if there is
only one row.
Parameters
----------
series : bool, optional
If `True` and and there is only one entry (e.g., due to
``ensemble=True`` in the constructor), :py:class:`pandas.Series`
objects will be returned. Otherwise, return
:py:class:`pandas.DataFrame`. Defaults to `True`.
Returns
-------
msd : pandas.DataFrame or pandas.Series
Mean square displacements
msd_err : pandas.DataFrame or pandas.Series
Standard errors of the mean square displacements. If
bootstrapping was used, these are the standard deviations of the
MSD results from bootstrapping. Otherwise, these are caleculated
as the standard deviation of square displacements divided by the
number of samples.
"""
return self.Result(self._msd_data.get_data("means", series),
self._msd_data.get_data("errors", series))
[docs]
def fit(self, model="brownian", *args, **kwargs):
"""Fit a model function to the MSD data
Parameters
----------
model : {"anomalous", "brownian"}, optional
Type of model to fit. Defaults to "brownian".
n_lag : int or inf, optional
Number of lag times to use for fitting. Defaults to 2 for the
Brownian model and `inf` for anomalous diffusion.
exposure_time : float, optional
Exposure time. Defaults to 0, i.e. no exposure time compensation
initial : tuple of float, len 3, optional
Initial guess for fitting anomalous diffusion. The tuple entries
are diffusion coefficient, positional accuracy, and alpha.
"""
if not isinstance(model, str):
return model(self._msd_data, *args, **kwargs)
model = model.lower()
if model.startswith("anomalous"):
return AnomalousDiffusion(self._msd_data, *args, **kwargs)
if model.startswith("brownian"):
return BrownianMotion(self._msd_data, *args, **kwargs)
raise ValueError("Unknown model: " + model)
[docs]
class AnomalousDiffusion:
r"""Fit anomalous diffusion parameters to MSD values
Fit a function :math:`msd(t_\text{lag}) = 2 n_\text{dim} D t_\text{lag}^\alpha +
2 n_\text{dim} \epsilon^2`
to the tlag-vs.-MSD graph, where :math:`D` is the diffusion coefficient,
:math:`\epsilon` is the positional uncertainty, and :math:`\alpha` the anomalous
diffusion exponent.
"""
_fit_parameters = ["D", "eps", "alpha"]
def __init__(
self,
msd_data: msd_base.MsdData,
n_lag: int | Literal[math.inf] = math.inf,
exposure_time: float = 0.0,
n_dim: int = 2,
initial: Tuple[float, float, float] = (0.5, 0.05, 1.0),
):
r"""Parameters
----------
msd_data
MSD data
n_lag
Maximum number of lag times to use for fitting. Defaults to
`inf`, i.e. using all.
exposure_time
Exposure time. 0 disables exposure time correction.
n_dim
Number of spatial dimensions
initial
Initial guesses for the fitting for :math:`D`, :math:`\epsilon`,
and :math:`\alpha`.
"""
def residual(x, lagt, target):
d, eps, alpha = x
r = self.theoretical(lagt, d, eps, alpha, exposure_time, n_dim)
return r - target
initial = np.asarray(initial)
self._results = OrderedDict()
self._err = OrderedDict()
for particle, all_m in msd_data.data.items():
lagt = msd_data.get_lagtimes(all_m.shape[0])
r = []
for target in all_m.T:
fin = np.isfinite(target)
tgt = target[fin]
lt = lagt[fin]
nl = min(n_lag, len(tgt))
f = scipy.optimize.least_squares(
residual,
initial,
bounds=([0, -np.inf, 0], [np.inf, np.inf, np.inf]),
kwargs={"lagt": lt[:nl], "target": tgt[:nl]},
)
r.append(f.x)
r = np.array(r)
self._results[particle] = np.mean(r, axis=0)
if r.shape[0] > 1:
# Use corrected sample std as a less biased estimator of the
# population std
self._err[particle] = np.std(r, axis=0, ddof=1)
self._msd_data = msd_data
self.exposure_time = exposure_time
self.n_dim = n_dim
[docs]
@staticmethod
def exposure_time_corr(t, alpha, exposure_time, n=100,
force_numeric=False):
r"""Correct lag times for the movement of particles during exposure
When particles move during exposure, it appears as if the lag times
change according to
.. math:: t_\text{app}^\alpha = \lim_{n\rightarrow\infty} \frac{1}{n^2}
\sum_{m_1 = 0}^{n-1} \sum_{m_2 = 0}^{n-1} |t +
\frac{t_\text{exp}}{n}(m_1 - m_2)|^\alpha -
|\frac{t_\text{exp}}{n}(m_1 - m_2)|^\alpha.
For :math:`\alpha=1`, :math:`t_\text{app} = t - t_\text{exp} / 3`. For
:math:`t_\text{exp} = 0` or :math:`\alpha = 2`,
:math:`t_\text{app} = t`. For other parameter values, the sum is
computed numerically using a sufficiently large `n` (100 by default).
See [Goul2000]_ for details.
Parameters
----------
t : numpy.ndarray
Lag times
alpha : float
Anomalous diffusion exponent
exposure_time : float
Exposure time
n : int, optional
Number of summands for the numeric calculation. The complexity
of the algorithm is O(n²). Defaults to 100.
Returns
-------
numpy.ndarray
Apparent lag times that account for diffusion during exposure
Other parameters
----------------
force_numeric : bool, optional
If True, do not return the analytical solutions for
:math:`\alpha \in \{1, 2\}` and :math:`t_\text{exp} = 0`, but
calculate numerically. Useful for testing.
"""
if not force_numeric:
if (math.isclose(exposure_time, 0) or
math.isclose(exposure_time, 2)):
return t
if math.isclose(alpha, 1):
return t - exposure_time / 3
m = np.arange(n)
m_diff = exposure_time / n * (m[:, None] - m[None, :])
s = (np.abs(t[:, None, None] + m_diff[None, ...])**alpha -
np.abs(m_diff[None, ...])**alpha)
return (np.sum(s, axis=(1, 2)) / n**2)**(1/alpha)
[docs]
@staticmethod
def theoretical(
t: np.ndarray | float,
d: np.ndarray | float,
eps: np.ndarray | float,
alpha: np.ndarray | float = 1.0,
exposure_time: float = 0.0,
n_dim: int = 2,
squeeze_result: bool = True,
) -> np.ndarray | float:
r"""Calculate theoretical MSDs for different lag times
Calculate :math:`msd(t_\text{lag}) = 4 D t_\text{app}^\alpha + 4
\epsilon^2`, where :math:`t_\text{app}` is the apparent time lag which
takes into account particle motion during exposure; see
:py:meth:`exposure_time_corr`.
Parameters
----------
t
Lag times
d
Diffusion coefficient
eps
Positional uncertainty.
alpha
Anomalous diffusion exponent.
exposure_time
Exposure time.
n_dim
Number of spatial dimensions
squeeze_result
If `True`, return the result as a scalar type or 1D array if
possible. Otherwise, always return a 2D array.
Returns
-------
Calculated theoretical MSDs
"""
t = np.atleast_1d(t)
d = np.atleast_1d(d)
eps = np.atleast_1d(eps)
alpha = np.atleast_1d(alpha)
if d.shape != eps.shape or d.shape != alpha.shape:
raise ValueError("`d`, `eps`, and `alpha` should have same shape.")
if t.ndim > 1 or d.ndim > 1 or eps.ndim > 1:
raise ValueError(
"Number of dimensions of `t`, `d`, `eps`, and `alpha` need to be"
"less than 2"
)
ic = 2 * n_dim * eps**2
ic[eps < 0] *= -1
t_corr = np.empty((len(t), len(alpha)), dtype=float)
for i, a in enumerate(alpha):
t_corr[:, i] = AnomalousDiffusion.exposure_time_corr(t, a, exposure_time)
ret = 2 * n_dim * d[None, :] * t_corr ** alpha[None, :] + ic[None, :]
if squeeze_result:
if ret.size == 1:
ret.item() # return scalar
return np.squeeze(ret)
return ret
Result = namedtuple("Result", ["fit", "fit_err"])
[docs]
def get_results(self, series=True):
"""Get fit results
The columns contain fitted parameters. Each row corresponds
to one trajectory. The row index is either the particle number or a
tuple identifying both the DataFrame and the particle; c.f.
:py:class:`Msd`.
If there is only one row, the `series` parameter controls whether to
return a :py:class:`pandas.DataFrame` or a :py:class:`pandas.Series`.
Parameters
----------
series : bool, optional
If `True` and and there is only one entry (e.g., due to
``ensemble=True`` in the :py:class:`Msd` constructor),
:py:class:`pandas.Series` objects will be returned. Otherwise,
return :py:class:`pandas.DataFrame`. Defaults to `True`.
Returns
-------
fit : pandas.DataFrame or pandas.Series
Fit results. Columns are the fit paramaters. Each row represents
one particle.
fit_err : pandas.DataFrame or pandas.Series
Fit results standard errors. If no bootstrapping was performed
for calculation of MSDs, this is all NaNs.
"""
if len(self._results) == 1 and series:
name, res = next(iter(self._results.items()))
series_args = {"name": name, "index": self._fit_parameters}
res = pd.Series(res, **series_args)
if self._err:
err = pd.Series(next(iter(self._err.values())), **series_args)
else:
err = pd.Series(**series_args, dtype=float)
return self.Result(res, err)
res_df = pd.DataFrame(self._results, index=self._fit_parameters).T
if self._err:
err_df = pd.DataFrame(self._err, index=self._fit_parameters).T
else:
err_df = pd.DataFrame(index=res_df.index, columns=res_df.columns,
dtype=float)
for d in (res_df, err_df):
d.columns.name = "parameter"
if isinstance(d.index, pd.MultiIndex):
d.index.names = ("file", "particle")
else:
d.index.name = "particle"
return self.Result(res_df, err_df)
[docs]
def plot(self, show_legend=True, ax=None):
"""Plot lag time vs. MSD with fitted theoretical curve
Parameters
----------
show_legend : bool, optional
Whether to add a legend to the plot. Defaults to `True`.
ax : matplotlib.axes.Axes or None, optional
Axes to use for plotting. If `None`, use ``pyplot.gca()``.
Defaults to `None`.
Returns
-------
matplotlib.axes.Axes
Axes used for plotting
"""
import matplotlib.pyplot as plt
if ax is None:
ax = plt.gca()
ax.set_xlabel("lag time [s]")
ax.set_ylabel("MSD [μm²]")
for p in self._results:
if isinstance(p, tuple):
name = "/".join(str(p2) for p2 in p)
else:
name = str(p)
m = self._msd_data.means[p]
e = self._msd_data.errors[p]
lt = self._msd_data.get_lagtimes(len(m))
eb = ax.errorbar(lt, m, yerr=e, linestyle="none", marker="o",
markerfacecolor="none")
self._plot_single(p, lt[-1], name, ax, eb[0].get_color())
if show_legend:
ax.legend(loc=0)
return ax
@staticmethod
def _value_with_error(name, unit, value, err=np.nan, formatter=".2g"):
"""Write a value with a name, a unit and an error
Parameters
----------
name : str
Value name
unit : str
Physical unit
value : number
Value
err : number, optional
Error of the value. If `NaN`, it is ignored. Defaults to `NaN`.
formatter : str, optional
Formatter for the numbers. Defaults to ".2g".
Returns
-------
str
String of the form "<name>: <value> ± <err> <unit>" if an error
was specified, otherwise "<name>: <value> <unit>".
"""
if not math.isfinite(err):
s = f"{{name}} = {{value:{formatter}}} {{unit}}"
else:
s = (f"{{name}} = {{value:{formatter}}} ± {{err:{formatter}}} "
f"{{unit}}")
return s.format(name=name, value=value, err=err, unit=unit)
def _plot_single(self, data_id, n_lag, name, ax, color):
"""Plot a single theoretical curve
Parameters
----------
data_id
A key in :py:attr:`_results` to plot
n_lag : int
Number of lag times to plot
name : str
Name of the data set given by `data_id` to be printed in the legend
ax : matplotlib.axes.Axes
Axes to use for plotting
color : str
Color of the plotted line
"""
d, eps, alpha = self._results[data_id]
d_err, eps_err, alpha_err = self._err.get(data_id, (np.nan,) * 3)
x = np.linspace(0, n_lag, 100)
y = self.theoretical(x, d, eps, alpha, self.exposure_time, self.n_dim)
legend = []
if name:
legend.append(name)
legend.append(self._value_with_error("D", r"μm²/s$^\alpha$", d, d_err))
legend.append(self._value_with_error(
"eps", "nm", eps * 1000, eps_err * 1000, ".0f"))
legend.append(self._value_with_error("α", "", alpha, alpha_err))
legend = "\n".join(legend)
ax.plot(x, y, c=color, label=legend)
[docs]
class BrownianMotion(AnomalousDiffusion):
r"""Fit Brownian motion parameters to MSD values
Fit a function :math:`\mathit{msd}(t_\text{lag}) = 4 D t_\text{lag} +
4 \epsilon^2` to
the tlag-vs.-MSD graph, where :math:`D` is the diffusion coefficient and
:math:`\epsilon` is the positional accuracy (uncertainty).
"""
_fit_parameters = ["D", "eps"]
def __init__(
self,
msd_data: msd_base.MsdData,
n_lag: int | Literal[math.inf] = 2,
exposure_time: float = 0.0,
n_dim: int = 2,
):
"""Parameters
----------
msd_data
MSD data
n_lag
Maximum number of lag times to use for fitting.
exposure_time
Exposure time. 0 disables exposure time correction.
n_dim
Number of spatial dimensions
"""
self._results = OrderedDict()
self._err = OrderedDict()
for particle, m in msd_data.data.items():
lagt = msd_data.get_lagtimes(m.shape[0])
# Handle NaNs that can appear if a particle/ensemble is
# present only e.g. in every other frame
fin = np.any(np.isfinite(m), axis=1)
m = m[fin, ...]
lagt = lagt[fin]
nl = min(n_lag, m.shape[0])
if nl < 2:
# Too few datapoints
self._results[particle] = [np.nan, np.nan]
if m.shape[1] > 1:
self._err[particle] = [np.nan, np.nan]
continue
if nl == 2:
dt = lagt[1] - lagt[0]
s = (m[1, :] - m[0, :]) / dt
i = m[0, :] - s * (dt - exposure_time / 3)
else:
s, i = np.polyfit(lagt[:nl] - exposure_time / 3, m[:nl, :], 1)
d = s / (2 * n_dim)
eps = np.sqrt(i.astype(complex) / (2 * n_dim))
eps = np.where(i > 0, np.real(eps), -np.imag(eps))
self._results[particle] = [np.mean(d), np.mean(eps)]
if len(d) > 1:
# Use corrected sample std as a less biased estimator of the
# population std
self._err[particle] = [np.std(d, ddof=1), np.std(eps, ddof=1)]
self._msd_data = msd_data
self.exposure_time = exposure_time
self.n_dim = n_dim
[docs]
@staticmethod
def theoretical( # pyright: ignore reportIncompatibleMethodOverride
t: np.ndarray | float,
d: np.ndarray | float,
eps: np.ndarray | float,
exposure_time: float = 0.0,
n_dim: int = 2,
squeeze_result: bool = False,
) -> np.ndarray | float:
r"""Calculate theoretical MSDs for different lag times
Calculate :math:`msd(t_\text{lag}) = 4 D t_\text{app} + 4
\epsilon^2`, where :math:`t_\text{app}` is the apparent time lag
which takes into account particle motion during exposure; see
:py:meth:`exposure_time_corr`.
Parameters
----------
t
Lag times
d
Diffusion coefficient
eps
Positional uncertainty
exposure_time
Exposure time
n_dim
Number of spatial dimensions
squeeze_result
If `True`, return the result as a scalar type or 1D array if
possible. Otherwise, always return a 2D array. Defaults to `True`.
Returns
-------
Calculated theoretical MSDs
"""
return AnomalousDiffusion.theoretical(
t, d, eps, np.ones_like(d), exposure_time, n_dim, squeeze_result
)
def _plot_single(self, data_id, n_lag, name, ax, color):
d, eps = self._results[data_id]
d_err, eps_err = self._err.get(data_id, (np.nan,) * 2)
x = np.linspace(0, n_lag, 100)
y = self.theoretical(x, d, eps, self.exposure_time, self.n_dim)
legend = []
if name:
legend.append(name)
legend.append(self._value_with_error("D", "μm²/s", d, d_err))
legend.append(self._value_with_error(
"ε", "nm", eps * 1000, eps_err * 1000, ".0f"))
legend = "\n".join(legend)
ax.plot(x, y, c=color, label=legend)