# SPDX-FileCopyrightText: 2020 Lukas Schrangl <lukas.schrangl@tuwien.ac.at>
#
# SPDX-License-Identifier: BSD-3-Clause
"""Tools for calculation of MSDs via cumulative density functions"""
from collections import namedtuple, OrderedDict
import itertools
import math
import warnings
import numpy as np
import pandas as pd
from scipy import optimize as sp_opt
from . import msd_base, msd
from .. import config, funcs, optimize as sdt_opt
def _fit_cdf_prony(x, y, n_exp, poly_order, initial_guess=None):
r"""Use :py:class:`sdt_op.ProbExpSumModel` to fit the CDF
Parameters
----------
x : numpy.ndarray
Abscissa (x-axis) data
y : numpy.ndarray
CDF function values corresponding to `x`.
n_exp : int
Number of exponential functions (``p``) in the sum
poly_order : int
For calculation, the sum of exponentials is approximated by a sum
of Legendre polynomials. This parameter gives the degree of the
highest order polynomial.
Returns
-------
mant_coeff : numpy.ndarray
Mantissa coefficients (:math:`\beta_k`)
exp_coeff : numpy.ndarray
List of exponential coefficients (:math:`\lambda_k`)
Other parameters
----------------
initial_guess : numpy.ndarray or None, optional
An initial guess for determining the parameters of the ODE (if you
don't know what this is about, don't bother). The array is 1D and has
`n_exp` + 1 entries. If None, use ``numpy.ones(n_exp + 1)``.
Defaults to None.
"""
res = sdt_opt.ProbExpSumModel(n_exp, poly_order).fit(y, x, initial_guess)
return res.mant, res.exp
def _fit_cdf_lsq(x, y, n_exp, weighted=True, initial_b=None, initial_l=None):
r"""Fit CDF by least squares fitting
Determine the best parameters :math:`\alpha, \beta_k, \lambda_k` by fitting
:math:`\alpha + \sum_{k=1}^p \beta_k \text{e}^{\lambda_k t}` to the data
using least squares fitting. Additionally, there are the constraints
:math:`\sum_{k=1}^p -\beta_k = 1` and :math:`\alpha = 1`.
Parameters
----------
x : numpy.ndarray
Abscissa (x-axis) data
y : numpy.ndarray
CDF function values corresponding to `x`.
n_exp : int
Number of exponential functions (``p``) in the sum
weighted : bool, optional
Whether to way the residual according to inverse data point density
on the abscissa. Usually, there are many data points close to x=0,
which makes the least squares fit very accurate in that region and not
so much everywhere else. Defaults to True.
initial_b : numpy.ndarray, optional
initial guesses for the :math:`b_k`
initial_l : numpy.ndarray, optional
initial guesses for the :math:`\lambda_k`
Returns
-------
mant_coeff : numpy.ndarray
Mantissa coefficients (:math:`\beta_k`)
exp_coeff : numpy.ndarray
List of exponential coefficients (:math:`\lambda_k`)
"""
def constrained_func(t, *args):
lam = args[n_exp-1:]
beta = np.zeros_like(lam)
beta[:-1] = args[:n_exp-1]
beta[-1] = -1 - beta.sum()
return funcs.exp_sum(t, 1, beta, lam)
bounds_low = np.array([-1] * (n_exp - 1) + [-np.inf] * n_exp)
bounds_high = np.array([0] * (n_exp - 1) + [0] * n_exp)
# initial guesses
if initial_b is None:
initial_b = -np.logspace(-n_exp, 0, n_exp-1, base=2,
endpoint=False)
if initial_l is None:
initial_l = -1 / np.logspace(-n_exp, 0, n_exp)
p0 = np.concatenate([initial_b, initial_l])
# fit
if weighted:
w = np.empty(x.shape)
w[1:] = x[1:] - x[:-1]
w[0] = w[1]
w /= w.max()
w = 1/np.sqrt(w) # since residual is squared, use sqrt
else:
w = None
popt, pcov = sp_opt.curve_fit(constrained_func, x, y, p0,
bounds=(bounds_low, bounds_high),
sigma=w)
b = np.zeros(n_exp, dtype=float)
b[:-1] = popt[:n_exp-1]
b[-1] = -1 - b.sum()
return b, popt[n_exp-1:]
def _msd_from_cdf(square_disp, n_components, method, n_boot, random_state=None,
poly_order=30):
r"""Fit the CDF of square displacements to an exponential model
The cumulative density function (CDF) of square displacements is the
sum
.. math:: 1 - \sum_{i=1}^n \beta_i e^{-\frac{r^2}{4 D \Delta t_i}},
where :math:`n` is the number of diffusing components, :math:`\beta_i` the
fraction of th i-th component, :math:`r^2` the square displacement and
:math:`D` the diffusion coefficient. By fitting to the measured CDF, these
parameters can be extracted.
Parameters
----------
square_disp : list of numpy.ndarray
The i-th array contains square deviations for the i-th lag time.
In other words, `square_disp` is one value from the result of
:py:func:`msd_base._all_square_displacements`.
n_components : int
The number of components
fit_method : {"prony", "lsq", "weighted-lsq"}
Method to use for fitting CDFs. "prony" is a modified Prony's method,
"lsq" is least squares fitting, and "weighted-lsq" is weighted least
squares fitting to account for the fact that the CDF data are
concentrated at x=0.
n_boot : int
Number of bootstrapping iterations for calculation of errors.
Set to 0 to turn off bootstrapping for performance gain, in which
case there will be no errors on the fit results.
random_state : numpy.random.RandomState
:py:class:`numpy.random.RandomState` instance to use for
random sampling for bootstrapping. Only needed for `n_boot` > 1.
Returns
-------
msds : numpy.ndarray, shape(n_frac, len(square_disp), n_boot)
Mean square displacements. First index is for the component, second for
the lag time, third for bootstrap run.
weights : numpy.ndarray, shape(n_frac, len(square_disp), n_boot)
Component weights. First index is for the component, second for
the lag time, third for bootstrap run.
Other parameters
----------------
poly_order : int, optional
For the "prony" method, the sum of exponentials is approximated by a
polynomial. This parameter gives the degree of the polynomial.
Defaults to 30.
"""
n_boot = max(n_boot, 1)
weights = np.empty((n_components, len(square_disp), n_boot))
msds = np.empty((n_components, len(square_disp), n_boot))
for i, cur_orig_sd in enumerate(square_disp):
y = np.linspace(0, 1, len(cur_orig_sd), endpoint=True)
for j in range(n_boot):
if n_boot > 1:
cur_sd = random_state.choice(cur_orig_sd, len(cur_orig_sd),
replace=True)
else:
cur_sd = cur_orig_sd
cur_sd = np.sort(cur_sd)
if method == "prony":
beta, lam = _fit_cdf_prony(cur_sd, y, n_components, poly_order)
elif method == "lsq":
beta, lam = _fit_cdf_lsq(cur_sd, y, n_components,
weighted=False)
elif method == "weighted-lsq":
beta, lam = _fit_cdf_lsq(cur_sd, y, n_components,
weighted=True)
else:
raise ValueError("Unknown method")
weights[:, i, j] = -beta
msds[:, i, j] = -1. / lam
return msds, weights
def _assign_components(msds, weights, how):
"""Assign MSDs and weights to components
Which of the MSDs / weights gets assigned to which component in
:py:func:`_msd_from_cdf` depends on the order the fitting function
returns them, which is not predictable.
E.g., the slow MSD could be assigned sometimes to component 1, sometimes
to component 2, etc.
This function tries to resolve the problem.
Parameters
----------
msds, weights : numpy.ndarray
Arrays as returned by :py:func:`_msd_from_cdf`
how : {"msd", "weight"}
If "msd", components are assigned according to increasing MSDs. If
"weight", components are assigned according to increasing weights.
Returns
-------
msds_assigned : numpy.ndarray
MSDs assigned to components. ``msds_assigned[i, j, k]`` is the
MSD of the i-th component, the j-th lag time, and the k-th bootstrap
run.
weights_assigned : numpy.ndarray
Weights assigned to components. ``weights_assigned[i, j, k]`` is the
weight of the i-th component, the j-th lag time, and the k-th bootstrap
run.
"""
if how == "msd":
labels = np.argsort(msds, axis=0)
elif how == "weight":
labels = np.argsort(weights, axis=0)
else:
raise ValueError("Unknown assignment method: {}".format(how))
# Assign
idx = np.indices(msds.shape)
idx[0] = labels
idx = tuple(idx)
msds_assigned = msds[idx]
weights_assigned = weights[idx]
return msds_assigned, weights_assigned
[docs]class MsdDist:
"""Calculate and analyze mean square displacement (MSD) distribution
from single moleclue tracking data. This differs from
:py:class:`sdt.motion.Msd` by the fact that the distributions
are analyzed, which allows for detection of multiple
components with different diffusion coefficients [Schu1997]_.
Only valid for 2D data.
"""
@config.set_columns
def __init__(self, data, frame_rate, n_components=2, n_lag=10, n_boot=0,
ensemble=True, fit_method="lsq", poly_order=30,
e_name="ensemble", random_state=None, pixel_size=1,
assign_method="msd", columns={}):
"""Parameters
----------
data : pandas.DataFrame or iterable of pandas.DataFrame
Tracking data. Either a single DataFrame or a collection of
DataFrames.
frame_rate : float
Frame rate
n_components : int, optional
Number of diffusing components. Defaults to 2.
n_lag : int or inf, optional
Number of lag times (time steps) to consider at most. Defaults to
10.
n_boot : int, optional
Number of bootstrapping iterations for calculation of errors.
Set to 0 to turn off bootstrapping for performance gain, in which
case there will be no errors on the fit results. Defaults to 0.
ensemble : bool, optional
Whether to calculate the MSDs for the whole data set or for each
trajectory individually. Defaults to True.
fit_method : {"prony", "lsq", "weighted-lsq"}, optional
Method to use for cumulative distribution function (CDF) fitting.
"prony" is a modified Prony's method, "lsq" is least squares
fitting, and "weighted-lsq" is weighted least squares fitting to
account for the fact that the CDF data are concentrated at x=0.
Defaults to "lsq".
pixel_size : float, optional
Pixel size; multiply coordinates by this factor. Defaults to 1
(no scaling).
assign_method : {"msd", "weight"}, optional
If "msd", components are assigned according to increasing MSDs,
i.e., lowest MSD will be component 1, next will be component 2,
and so on. If "weight", components are assigned according to
increasing weights. Defaults to "msd".
Other parameters
----------------
e_name : str, optional
If the `ensemble` parameter is `True`, use this as the name for
the dataset. It shows up in the MSD DataFrame (see
:py:meth:`get_msd`) and in plots. Defaults to "ensemble".
random_state : numpy.random.RandomState or None, optional
:py:class:`numpy.random.RandomState` instance to use for
random sampling for bootstrapping. If `None`, use create a new
instance with ``seed=None``. Defaults to `None`.
columns : dict, optional
Override default column names as defined in
:py:attr:`config.columns`. Relevant names are `coords`, `particle`,
and `time`. This means, if your DataFrame has coordinate columns
"x" and "z" and the time column "alt_frame", set
``columns={"coords": ["x", "z"], "time": "alt_frame"}``.
"""
square_disp = msd_base._all_square_displacements(
data, n_lag, ensemble, e_name, pixel_size, columns)
if n_boot > 1 and random_state is None:
random_state = np.random.RandomState()
msd_set = OrderedDict()
weight_set = OrderedDict()
for p_id, p_sds in square_disp.items():
msds, weights = _msd_from_cdf(p_sds, n_components, fit_method,
n_boot, random_state, poly_order)
msds_ass, weights_ass = _assign_components(msds, weights,
assign_method)
msd_set[p_id] = msds_ass
weight_set[p_id] = weights_ass
self._msd_data = []
self._weight_data = []
for n in range(n_components):
for src, dst in [(msd_set, self._msd_data),
(weight_set, self._weight_data)]:
d = msd_base.MsdData(
frame_rate,
OrderedDict([(k, v[n, :, :]) for k, v in src.items()]))
dst.append(d)
Result = namedtuple("Result",
["msd", "msd_err", "weight", "weight_err"])
[docs] def get_msd(self, series=True):
"""Get MSD and error DataFrames
The columns contain data for different lag times. Each row corresponds
to one trajectory. The row index is either the particle number if a
single DataFrame was passed to :py:meth:`__init__` or a tuple
identifying both the DataFrame and the particle. If
:py:meth:`__init__` was called with ``ensemble=True``, there will only
be one row with index `e_name`.
If there is only one row, the `series` parameter controls whether to
return a :py:class:`pandas.DataFrame` or a :py:class:`pandas.Series`.
Parameters
----------
series : bool, optional
If `True` and and there is only one entry (e.g., due to
``ensemble=True`` in the constructor), :py:class:`pandas.Series`
objects will be returned. Otherwise, return
:py:class:`pandas.DataFrame`. Defaults to `True`.
Returns
-------
list of namedTuple(["msd", "msd_err", "weight", "weight_err"])
Each tuple describes one component. The tuple members are
:py:class:`pandas.DataFrame` or :py:class:`pandas.Series`
(depending on the `series` parameter), where `msd` holds the mean
square displacements, `msd_err` contains standard errors of the
MSDs, `weight` hold the relative weight of the component calculated
for the different lag times, and `weight_err` contains standard
errors of the weights. If no bootstrapping was performed, all
errors are `NaN`.
"""
ret = []
for md, fd in zip(self._msd_data, self._weight_data):
data = self.Result(
md.get_data("means", series),
md.get_data("errors", series),
fd.get_data("means", series),
fd.get_data("errors", series))
ret.append(data)
return ret
[docs] def fit(self, model="brownian", *args, **kwargs):
"""Fit a model function to the MSD data
Parameters
----------
model : {"anomalous", "brownian"}
Type of model to fit
n_lag : int or inf, optional
Number of lag times to use for fitting. Defaults to 2 for the
Brownian model and `inf` for anomalous diffusion.
exposure_time : float, optional
Exposure time. Defaults to 0, i.e. no exposure time compensation
initial : tuple of float, len 3, optional
Initial guess for fitting anomalous diffusion. The tuple entries
are diffusion coefficient, positional accuracy, and alpha.
"""
return MsdDistFit(self._msd_data, model, self._weight_data,
**kwargs)
class MsdDistFit:
"""Fit diffusion parameters to MSDs from square displacement dists"""
def __init__(self, msd_data, model, weight_data, **kwargs):
"""Parameters
----------
msd_data : list of msd_base.MsdData
Each MsdData instance should contain MSD data for one component
model : {"brownian", "anomalous", model class}
Model to fit. If "brownian", use :py:class:`msd.BrownianMotion`.
If "anomalous", use :py:class:`msd.AnomalousDiffusion`. If a class,
use that. It has to be modelled like the aforemntioned classes,
i.e., ``__init__`` should accept a :py:class:`msd_base.MsdData`
instance and keyword args. Fitting should be done in ``__init__``.
A ``get_results`` method should return two DataFrames, the first
containing in each row the fit results for one particle; the second
should contain the fit errors per particle.
weight_data : list of msd_base.MsdData
Each MsdData instance should contain weight data for one component
**kwargs
Keyword arguments passed to model class ``__init__``. This could be
``exposure_time`` for exposure time correction in
:py:class:`msd.BrownianMotion`or :py:class:`msd.AnomalousDiffusion`
models or a ``"initial"`` triple of inital guesses for `D`,
uncertainty, and α for the :py:class:`msd.AnomalousDiffusion`
model.
"""
if isinstance(model, str):
model = model.lower()
if model.startswith("brownian"):
model = msd.BrownianMotion
elif model.startswith("anomalous"):
model = msd.AnomalousDiffusion
else:
raise ValueError("Unknown model: " + model)
self.msd_fits = [model(md, **kwargs) for md in msd_data]
self.weights = Weights(weight_data)
Result = namedtuple("Result", ["fit", "fit_err"])
def get_results(self, series=True):
"""Get fit results
The columns contain fitted parameters. Each row corresponds
to one trajectory. The row index is either the particle number or a
tuple identifying both the DataFrame and the particle; c.f.
:py:class:`MsdDist`.
If there is only one row, the `series` parameter controls whether to
return a :py:class:`pandas.DataFrame` or a :py:class:`pandas.Series`.
Parameters
----------
series : bool, optional
If `True` and and there is only one entry (e.g., due to
``ensemble=True`` in the :py:class:`MsdDist` constructor),
:py:class:`pandas.Series` objects will be returned. Otherwise,
return :py:class:`pandas.DataFrame`. Defaults to `True`.
Returns
-------
list of namedTuple(["fit", "fit_err"])
Each entry describes one component. The tuple members are
:py:class:`pandas.DataFrame` or :py:class`pandas.Series`,
where ``fit`` holds the fit result as well as the mean weight and
``fit_err`` contains the corresponding standard errors. If no
bootstrapping was performed, fit errors are `NaN`.
"""
ret = []
weight, weight_err = self.weights.get_results(series)
for comp, msd_fit in enumerate(self.msd_fits):
fit, err = msd_fit.get_results(series)
fit["weight"] = weight[comp]
err["weight"] = weight_err[comp]
data = self.Result(fit, err)
ret.append(data)
return ret
def plot(self, fig=None, weight_ax=None, msd_ax=None, show_legend=True):
"""Plot MSDs and fit results
First plot will show weights of the components, other plots will
show MSDs and fits of each component.
Parameters
----------
fig : matplotlib.figure.Figure or None, optional
Figure object to use in case weight_ax or msd_ax are not given.
If `None`, use ``matplotlib.pyplot.gcf()``. Defaults to `None`.
weight_ax : matplotlib.axes.Axes or None, optional
Axes object for plotting the weights of the components. If `None`,
use ``fig`` to create it. Defaults to `None`.
msd_ax : list-like of matplotlib.axes.Axes or None, optional
Axes objects for plotting the MSDs of each component. If `None`,
use ``fig`` to create them. Defaults to `None`.
show_legend : bool, optional
Whether to show numerical fit results in the plot legend. Defaults
to `True`.
Returns
-------
fig : matplotlib.figure.Figure
Figure object used.
weight_ax : matplotlib.axes.Axes
Axes object for plotting the weights of the components
msd_ax : list of matplotlib.axes.Axes
Axes objects for plotting the MSDs
"""
import matplotlib.pyplot as plt
if weight_ax is None or msd_ax is None:
if fig is None:
fig = plt.gcf()
fig.set_constrained_layout(True)
n_sub = len(self.msd_fits) + 1
ax = fig.subplots(1, n_sub)
weight_ax = ax[0]
msd_ax = ax[1:]
self.weights.plot(ax=weight_ax, show_legend=show_legend)
for i, (f, a) in enumerate(zip(self.msd_fits, msd_ax)):
f.plot(ax=a, show_legend=show_legend)
a.set_title("Component {}".format(i+1))
return fig, weight_ax, msd_ax
class Weights:
"""Handle weight data for a single component from :py:class:`MsdDist`
This is similar to what e.g. :py:class:`msd.BrownianMotion` does with
MSD data.
"""
def __init__(self, weight_data):
"""Parameters
----------
weight_data : list of msd_base.MsdData
Weight data for each component
"""
n_components = len(weight_data)
if not n_components:
raise ValueError("`weight_data` is empty.")
self._results = OrderedDict()
self._err = OrderedDict()
for p_id in weight_data[0].means.keys():
self._results[p_id] = [np.mean(weight_data[i].means[p_id])
for i in range(n_components)]
self._err[p_id] = [np.std(weight_data[i].means[p_id], ddof=1)
for i in range(n_components)]
self._weight_data = weight_data
def get_results(self, series=True):
"""Get results
Parameters
----------
series : bool, optional
If `True` and and there is only one entry (e.g., due to
``ensemble=True`` in the :py:class:`MsdDist` constructor),
:py:class:`pandas.Series` objects will be returned. Otherwise,
return :py:class:`pandas.DataFrame`. Defaults to `True`.
Returns
-------
results : pandas.DataFrame or pandas.Series
Columns are the weight for each component. Each row represents
one particle.
errors : pandas.DataFrame or pandas.Series
Standard errors.
"""
if len(self._results) == 1 and series:
name, res = next(iter(self._results.items()))
res = pd.Series(res, name=name)
if self._err:
err = pd.Series(next(iter(self._err.values())), name=name)
else:
err = pd.Series(name=name)
return res, err
res_df = pd.DataFrame(self._results).T
err_df = pd.DataFrame(self._err).T
for d in (res_df, err_df):
d.columns.name = "component"
if isinstance(d.index, pd.MultiIndex):
d.index.names = ("file", "particle")
else:
d.index.name = "particle"
return res_df, err_df
def plot(self, show_legend=True, ax=None):
"""Plot lag time vs. weight
Parameters
----------
show_legend : bool, optional
Whether to add a legend to the plot. Defaults to `True`.
ax : matplotlib.axes.Axes or None, optional
Axes to use for plotting. If `None`, use ``pyplot.gca()``.
Defaults to `None`.
Returns
-------
matplotlib.axes.Axes
Axes used for plotting
"""
import matplotlib.pyplot as plt
if ax is None:
ax = plt.gca()
ax.set_xlabel("lag time [s]")
ax.set_ylabel("weight")
for p_id in self._results:
marker = itertools.cycle((".", "x", "+", "o", "*", "D", "v", "^"))
linestyles = itertools.cycle(("-", "-.", "--", ":"))
color = None
if isinstance(p_id, tuple):
name = "/".join(str(p) for p in p_id)
else:
name = str(p_id)
legend = []
if name:
legend.append(name)
weight_str = []
for r, e in zip(self._results[p_id], self._err[p_id]):
if math.isfinite(e):
weight_str.append(f"{r:.2g} ± {e:.2g}")
else:
weight_str.append(f"{r:2.g}")
legend.append("weights:" + ", ".join(weight_str))
legend = "\n".join(legend)
loop_vars = zip(self._weight_data, marker, linestyles)
for i, (w, mark, lstyle) in enumerate(loop_vars):
m = w.means[p_id]
e = w.errors[p_id]
lt = w.get_lagtimes(len(m))
line = ax.plot(lt, [self._results[p_id][i]] * len(lt),
ls=lstyle, c=color, label=legend)
color = line[0].get_color()
ax.errorbar(lt, m, yerr=e, linestyle="none", marker=mark,
markerfacecolor="none", c=color)
legend = None # only add for the first line
if show_legend:
ax.legend(loc=0)
return ax
# Old API
@config.set_columns
def emsd_cdf(data, pixel_size, fps, num_frac=2, max_lagtime=10, method="lsq",
poly_order=30, columns={}):
r"""Calculate ensemble mean square displacements from tracking data CDF
.. deperecated:: 14.0
Use :py:class:`MsdDist` instead.
Fit the model cumulative density function to the measured CDF of tracking
data. For details, see the documentation of :py:class:`MsdDist`.
Parameters
----------
data : list of pandas.DataFrames or pandas.DataFrame
Tracking data
pixel_size : float
width of a pixel in micrometers.
fps : float
Frames per second
num_frac : int, optional
The number of diffusing species. Defaults to 2
max_lagtime : int, optional
Maximum number of time lags to consider. Defaults to 10.
method : {"prony", "lsq", "weighted-lsq"}, optional
Which fit method to use. "prony" is a modified Prony's method, "lsq"
is least squares fitting, and "weighted-lsq" is weighted least squares
fitting to account for the fact that the CDF data are concentrated
at x=0. Defaults to "prony".
Returns
-------
list of pandas.DataFrames([lagt, msd, fraction])
For each species, the DataFrame contains for each lag time the msd,
the fraction.
Other parameters
----------------
poly_order : int, optional
For the "prony" method, the sum of exponentials is approximated by a
polynomial. This parameter gives the degree of the polynomial.
Defaults to 30.
columns : dict, optional
Override default column names as defined in :py:attr:`config.columns`.
Relevant names are `coords`, `particle`, and `time`.
This means, if your DataFrame has coordinate columns "x" and "z" and
the time column "alt_frame", set ``columns={"coords": ["x", "z"],
"time": "alt_frame"}``.
"""
warnings.warn("`emsd_cdf` is deprecated. Use the `Msd` class instead.",
np.VisibleDeprecationWarning)
msd_cls = MsdDist(data, fps, n_components=num_frac, n_lag=max_lagtime,
fit_method=method, poly_order=poly_order,
pixel_size=pixel_size, columns=columns)
ret = []
for m in msd_cls.get_msd():
r = pd.DataFrame.from_dict(
OrderedDict([("lagt", m.msd.index), ("msd", m.msd),
("fraction", m.weight)]))
r.index.name = None
r.sort_values("lagt", inplace=True)
ret.append(r)
return ret