# Source code for sdt.multicolor.coloc

# SPDX-FileCopyrightText: 2020 Lukas Schrangl <lukas.schrangl@tuwien.ac.at>
#
# SPDX-License-Identifier: BSD-3-Clause

import collections
from typing import Mapping, Optional, Sequence, Tuple, Union
import warnings

import pandas as pd
import numpy as np
from scipy.spatial import cKDTree

from .. import config


# Default values. If changed, also change doc strings
_channel_names = ["channel1", "channel2"]


def find_closest_pairs(coords1, coords2, max_dist):
    """Find best matches between coordinates

    Given two coordinate arrays (`coords1` and `coords2`), find pairs of
    points with the minimum distance between them. Each point will be in
    at most one pair (in contrast to `usual` KD-tree queries).

    Parameters
    ----------
    coords1, coords2 : array-like, shape(n, m)
        Arrays of m-dimensional coordinate tuples. The dimension (m) has to
        be the same in both arrays while the number of points (n) can
        differ.
    max_dist : float
        Maximum distance for two points to be considered a pair

    Returns
    -------
    numpy.ndarray, shape(n, 2), dtype(int)
        Each row describes one match. The first entry is the index of a
        point in `coords1`. The second entry is the index of its match in
        `coords2`.
    """
    # KD-trees efficiently enumerate all candidate pairs within `max_dist`
    tree_a = cKDTree(coords1)
    tree_b = cKDTree(coords2)
    cand = tree_a.sparse_distance_matrix(tree_b, max_dist,
                                         output_type="ndarray")

    # Visit candidates in order of increasing distance so that the closest
    # pairs are accepted first
    order = np.argsort(cand["v"])
    idx_a = cand["i"][order]
    idx_b = cand["j"][order]

    # Flags marking points that already belong to an accepted pair
    used_a = np.zeros(len(coords1), dtype=bool)
    used_b = np.zeros(len(coords2), dtype=bool)

    matches = []
    for a, b in zip(idx_a, idx_b):
        # Accept a candidate only if neither endpoint is matched yet
        if used_a[a] or used_b[b]:
            continue
        matches.append((a, b))
        used_a[a] = True
        used_b[b] = True

    # reshape ensures a (0, 2) result even when there are no matches
    return np.array(matches, dtype=int).reshape((-1, 2))
@config.set_columns
def find_colocalizations(features1: pd.DataFrame,
                         features2: pd.DataFrame,
                         max_dist: float = 2.0,
                         keep_unmatched: bool = False,
                         channel_names: Sequence[str] = _channel_names,
                         columns: Mapping = {},
                         **kwargs) -> pd.DataFrame:
    """Match localizations in one channel to localizations in another

    For every localization in `features1` find localizations in
    `features2` (in the same frame) that are in a circle with radius
    `max_dist` around it, then pick the closest one.

    Parameters
    ----------
    features1, features2
        Localization data
    max_dist
        Maximum distance between features to still be considered
        colocalizing.
    keep_unmatched
        If True, also keep non-colocalized features in the result
        DataFrame. Non-colocalized features have NaNs in the columns of the
        channel they don't appear in.
    channel_names
        Names of the two channels.

    Returns
    -------
    The DataFrame has a multi-index for columns with the top level given by
    the `channel_names` parameter. Each line of DataFrame corresponds to
    one pair of colocalizing particles.

    Other parameters
    ----------------
    columns
        Override default column names as defined in
        :py:attr:`config.columns`. Relevant names are `coords` and `time`.
        This means, if your DataFrame has coordinate columns "x" and "z"
        and the time column "alt_frame", set
        ``columns={"coords": ["x", "z"], "time": "alt_frame"}``.
    keep_non_coloc
        Deprecated alias for `keep_unmatched`
    """
    if "keep_non_coloc" in kwargs:
        warnings.warn("`keep_non_coloc` keyword argument is deprecated. "
                      "Use `keep_unmatched` instead.", DeprecationWarning)
        keep_unmatched = kwargs.pop("keep_non_coloc")
    if kwargs:
        raise TypeError("got an unexpected keyword argument.")

    cols = columns["coords"] + [columns["time"]]
    p1_mat = features1[cols].values
    p2_mat = features2[cols].values

    pairs1_idx = []
    pairs2_idx = []
    for frame_no in np.unique(p1_mat[:, -1]):
        # indices of features in current frame
        p1_idx = np.nonzero(p1_mat[:, -1] == frame_no)[0]
        p2_idx = np.nonzero(p2_mat[:, -1] == frame_no)[0]
        # current frame positions with the frame column excluded
        p1_f = p1_mat[p1_idx, :-1]
        p2_f = p2_mat[p2_idx, :-1]

        if not (p1_f.size and p2_f.size):
            # Nothing to match in this frame
            continue

        pair_idx = find_closest_pairs(p1_f, p2_f, max_dist)
        pairs1_idx.append(p1_idx[pair_idx[:, 0]])
        pairs2_idx.append(p2_idx[pair_idx[:, 1]])

    pairs1_idx = (np.concatenate(pairs1_idx) if pairs1_idx
                  else np.empty(0, dtype=int))
    pairs2_idx = (np.concatenate(pairs2_idx) if pairs2_idx
                  else np.empty(0, dtype=int))

    pairs = [pd.concat([features1.iloc[pairs1_idx].reset_index(drop=True),
                        features2.iloc[pairs2_idx].reset_index(drop=True)],
                       keys=channel_names, axis=1)]

    if keep_unmatched:
        # BUGFIX: compare the number of *matched indices* to the number of
        # features; the old code compared `len(pairs)` (always 1 here),
        # which dropped unmatched entries when a channel had exactly one
        # feature.
        if len(pairs1_idx) != len(features1):
            non_coloc_mask1 = np.ones(len(features1), dtype=bool)
            non_coloc_mask1[pairs1_idx] = False
            non_coloc1 = pd.concat([features1[non_coloc_mask1],
                                    features2.iloc[:0]],
                                   keys=channel_names, axis=1)
            # Copy the frame number so the time column is valid in both
            # channels
            non_coloc1[channel_names[1], columns["time"]] = \
                non_coloc1[channel_names[0], columns["time"]]
            pairs.append(non_coloc1)
        if len(pairs2_idx) != len(features2):
            non_coloc_mask2 = np.ones(len(features2), dtype=bool)
            non_coloc_mask2[pairs2_idx] = False
            non_coloc2 = pd.concat([features1.iloc[:0],
                                    features2[non_coloc_mask2]],
                                   keys=channel_names, axis=1)
            non_coloc2[channel_names[0], columns["time"]] = \
                non_coloc2[channel_names[1], columns["time"]]
            pairs.append(non_coloc2)

    return pd.concat(pairs, ignore_index=True, copy=False)
@config.set_columns
def merge_channels(features1, features2, max_dist=2., mean_pos=False,
                   return_data="data", columns={}):
    """Merge features of `features1` and `features2`

    Concatenate all of `features1` and those entries of `features2` that
    do not colocalize with any of `features1` (in the same frame).

    Parameters
    ----------
    features1, features2 : pandas.DataFrame
        Localization data
    max_dist : float, optional
        Maximum distance between features to still be considered
        colocalizing. Defaults to 2.
    mean_pos : bool, optional
        When entries are merged (i. e., if an entry in `features1` is close
        enough to one in `features2`), calculate the center of mass of the
        two channel's entries. If `False`, use the position in `features1`.
        All other (non-coordinate) columns are taken from `features1` in
        any case. Defaults to `False`.
    return_data : {"data", "index", "both"}, optional
        Whether to return the full data of merged features, only indices
        (that is, the DataFrame's indices) of features in `features2` that
        have no counterpart in `features1`, or both. Defaults to "data".

    Returns
    -------
    data : pandas.DataFrame
        DataFrame of merged features. Returned if `return_data` is "data"
        or "both".
    feat2_index : pandas.Index
        Indices of features out of `features2` that have no counterpart in
        `features1`. One can construct the DataFrame (as returned if
        `return_data` is "data" or "both") by
        ``pandas.concat([features1, features2.loc[feat2_index]],
        keys=channel_names, ignore_index=True)``. Returned if `return_data`
        is "index" or "both".

    Other parameters
    ----------------
    columns : dict, optional
        Override default column names as defined in
        :py:attr:`config.columns`. Relevant names are `coords` and `time`.
        This means, if your DataFrame has coordinate columns "x" and "z"
        and the time column "alt_frame", set
        ``columns={"coords": ["x", "z"], "time": "alt_frame"}``.
    """
    # Validate up front instead of after all the work has been done
    if return_data not in ("data", "index", "both"):
        raise ValueError("`return_data` has to be one of 'data', 'index', "
                         "or 'both'.")

    cols = columns["coords"] + [columns["time"]]
    f1_mat = features1[cols].values
    f2_mat = features2[cols].values

    coloc_idx_1 = []
    coloc_idx_2 = []
    for frame_no in np.union1d(f1_mat[:, -1], f2_mat[:, -1]):
        # indices of features in current frame
        f1_idx = np.nonzero(f1_mat[:, -1] == frame_no)[0]
        f2_idx = np.nonzero(f2_mat[:, -1] == frame_no)[0]

        if not (f1_idx.size and f2_idx.size):
            # One channel does not have any features in this frame
            continue

        # current frame positions with the frame column excluded
        f1_f = f1_mat[f1_idx, :-1]
        f2_f = f2_mat[f2_idx, :-1]

        pair_idx = find_closest_pairs(f1_f, f2_f, max_dist)
        coloc_idx_1.append(f1_idx[pair_idx[:, 0]])
        coloc_idx_2.append(f2_idx[pair_idx[:, 1]])

    # ROBUSTNESS: `np.concatenate` raises on an empty list, which happened
    # when no frame contained features in both channels
    coloc_idx_1 = (np.concatenate(coloc_idx_1) if coloc_idx_1
                   else np.empty(0, dtype=int))
    coloc_idx_2 = (np.concatenate(coloc_idx_2) if coloc_idx_2
                   else np.empty(0, dtype=int))

    # Indices of features of `features2` that don't colocalize with
    # anything in `features1`
    f2_non_coloc = np.ones(len(f2_mat), dtype=bool)
    f2_non_coloc[coloc_idx_2] = False
    f2_non_coloc_idx = features2.index[f2_non_coloc]

    if return_data == "index":
        return f2_non_coloc_idx

    ret = features1.copy()
    if mean_pos:
        p1 = f1_mat[coloc_idx_1, :-1]
        p2 = f2_mat[coloc_idx_2, :-1]
        ret.loc[ret.index[coloc_idx_1], columns["coords"]] = (p1 + p2) / 2
    if coloc_idx_2.size != f2_mat.shape[0]:
        # Append non-merged features of channel 2 to `ret`; reuse the mask
        # computed above instead of recomputing it
        ret = pd.concat((ret, features2.loc[f2_non_coloc_idx]),
                        ignore_index=True)

    if return_data == "data":
        return ret
    # only remaining possibility after the validation above
    return ret, f2_non_coloc_idx
@config.set_columns
def find_codiffusion(tracks1: pd.DataFrame,
                     # BUGFIX: annotation was `pd.DataFrame()`, which
                     # constructed an empty DataFrame at definition time
                     tracks2: pd.DataFrame,
                     abs_threshold: int = 3,
                     rel_threshold: float = 0.75,
                     return_data: str = "data",
                     feature_pairs: Optional[pd.DataFrame] = None,
                     max_dist: float = 2.0,
                     keep_unmatched: str = "gaps",
                     channel_names: Sequence[str] = _channel_names,
                     columns: Mapping = {}
                     ) -> Union[pd.DataFrame, np.ndarray,
                                Tuple[pd.DataFrame, np.ndarray]]:
    """Find codiffusion in tracking data

    First, find pairs of localizations, then look up which tracks they
    belong to and thus match tracks.

    Parameters
    ----------
    tracks1, tracks2
        Tracking data. In addition to what is required by
        :py:func:`find_colocalizations`, a "particle" column has to be
        present.
    abs_threshold
        Minimum number of matched localizations per track pair.
    rel_threshold : float, optional
        Minimum fraction of matched localizations that have to belong to
        the same pair of tracks. E. g., if localizations of a particle in
        the first channel match five localizations of a particle in the
        second channel, and there are eight frames between the first and
        the last match, that fraction would be 5/8.
    return_data
        Whether to return the full data of the codiffusing particles
        (``"data"``), only their particle numbers (``"numbers"``), or both
        (``"both"``).
    feature_pairs
        If :py:func:`find_colocalizations` has already been called on the
        data, the result can be passed to save re-computation. If `None`,
        :py:func:`find_colocalizations` is called in this function.
    max_dist
        `max_dist` parameter for :py:func:`find_colocalizations` call.
    keep_unmatched
        If ``"all"``, also keep all non-colocalized features in the result
        DataFrame. Non-colocalized features have NaNs in the columns of the
        channel they don't appear in. If ``"gaps"``, only keep
        non-colocalized features within codiffusing parts of tracks. If
        ``"none"``, remove all non-colocalized entries.
    channel_names
        Names of the two channels.

    Returns
    -------
    If `return_data` is ``"data"`` or ``"both"``, a DataFrame with full
    data (from `tracks1` and `tracks2`) of the codiffusing particles is
    returned. The DataFrame has a multi-index for columns with the top
    level being the two channels. Each line of DataFrame corresponds to one
    pair of colocalizing particles.

    If `return_data` is ``"numbers"`` or ``"both"``, an array with four
    columns is returned. Each row's first entry is a particle number in the
    first channel. The second entry is the matching particle number in the
    second channel. Third and fourth columns are start and end frame,
    respectively.

    Other parameters
    ----------------
    columns
        Override default column names as defined in
        :py:attr:`config.columns`. Relevant names are `coords`, `particle`,
        and `time`. This means, if your DataFrame has coordinate columns
        "x" and "z" and the time column "alt_frame", set
        ``columns={"coords": ["x", "z"], "time": "alt_frame"}``.
    """
    keep_unmatched = keep_unmatched.lower()
    if keep_unmatched not in ("none", "gaps", "all"):
        raise ValueError("`keep_unmatched` must be one of 'none', 'gaps', "
                         "'all'")

    if feature_pairs is None:
        feature_pairs = find_colocalizations(
            tracks1, tracks2, max_dist, channel_names=channel_names,
            keep_unmatched=keep_unmatched != "none", columns=columns)

    p_col = columns["particle"]
    t_col = columns["time"]

    ch1_pairs = feature_pairs[channel_names[0]]
    ch2_pairs = feature_pairs[channel_names[1]]

    matches = []
    for pn, data1 in ch1_pairs.groupby(p_col):
        # get channel 2 pair data corresponding to particle `pn` in
        # channel 1 (only "particle" and "frame" columns)
        data2 = ch2_pairs.loc[data1.index, [p_col, t_col]].values
        # count how often which channel 2 particle appears
        count = collections.Counter(data2[:, 0])
        # turn into array where each row is
        # (channel2_particle_number, count)
        count_arr = np.array([(k, v) for k, v in count.items()])
        # only take those which appear >= abs_threshold times
        candidate_pns = count_arr[count_arr[:, 1] >= abs_threshold]

        for c_pn, c_cnt in candidate_pns:
            # for each, get frame number for first and last match
            c_data2 = data2[data2[:, 0] == c_pn, 1]
            start = c_data2.min()
            end = c_data2.max()
            if c_cnt / (end - start + 1) >= rel_threshold:
                # if greater or equal to threshold, record as valid
                matches.append((pn, c_pn, start, end))
    matches = np.array(matches)

    if return_data == "numbers":
        return matches

    feature_pairs["codiff", "particle"] = -1
    pn1_list = ch1_pairs[p_col].to_numpy()
    pn1_nan = np.isnan(pn1_list)
    pn2_list = ch2_pairs[p_col].to_numpy()
    pn2_nan = np.isnan(pn2_list)
    fn_list = ch1_pairs[t_col].to_numpy()
    for new_pn, (pn1, pn2, start, end) in enumerate(matches):
        # particle numbers can be NaNs if unmatched
        # FIXME: maybe this can be made faster by sorting
        is_pn1 = (pn1_list == pn1) | pn1_nan
        is_pn2 = (pn2_list == pn2) | pn2_nan
        is_fn = (round(start) <= fn_list) & (fn_list <= round(end))
        sel = is_pn1 & is_pn2 & is_fn
        feature_pairs.loc[sel, ("codiff", "particle")] = new_pn
        # overwrite NaNs in unmatched entries with particle numbers.
        # CONSISTENCY: use the configured particle column name (`p_col`)
        # instead of the literal "particle" used before, which broke when
        # `columns["particle"]` was overridden.
        feature_pairs.loc[sel, (channel_names[0], p_col)] = pn1
        feature_pairs.loc[sel, (channel_names[1], p_col)] = pn2

    if keep_unmatched == "gaps":
        # drop entries that do not belong to any codiffusing track pair
        idx = feature_pairs.index[feature_pairs["codiff", "particle"] < 0]
        feature_pairs.drop(index=idx, inplace=True)

    if return_data == "data":
        return feature_pairs
    elif return_data == "both":
        return feature_pairs, matches
    else:
        raise ValueError("`return_data` has to be one of 'data', "
                         "'numbers', or 'both'.")
@config.set_columns
def calc_pair_distance(data, channel_names=None, columns={}):
    """Calculate distances between colocalized features

    Parameters
    ----------
    data : pandas.DataFrame
        Colocalized feature data, e.g. output of
        :py:func:`find_colocalizations`.

    Returns
    -------
    pandas.Series
        Distances between colocalized features. The series' index is the
        same as in the input DataFrame.

    Other parameters
    ----------------
    channel_names : list of str or None, optional
        Names of the channels. If None, use the first two entries of the
        top level of `data`'s MultiIndex. Defaults to None.
    columns : dict, optional
        Override default column names as defined in
        :py:attr:`config.columns`. The only relevant name is `coords`.
        This means, if your DataFrame has coordinate columns "x" and "z",
        set ``columns={"coords": ["x", "z"]}``.
    """
    if channel_names is None:
        # default to the first two channels present in the column index
        channel_names = data.columns.remove_unused_levels().levels[0][:2]

    # Euclidean distance: sqrt of the summed squared coordinate deltas
    d = (data[channel_names[0]][columns["coords"]] -
         data[channel_names[1]][columns["coords"]])**2
    return np.sqrt(np.sum(d, axis=1))
@config.set_columns
def plot_codiffusion(data, particle, ax=None, cmap=None, show_legend=True,
                     legend_loc=0, linestyles=("-", "--", ":", "-."),
                     channel_names=None, columns={}):
    """Plot trajectories of codiffusing particles

    Each step is colored differently so that by comparing colors one can
    figure out which steps in one channel correspond to which steps in the
    other channel.

    Parameters
    ----------
    data : pandas.DataFrame or tuple of pandas.DataFrames
        Tracking data of codiffusing particles. This can be a
        :py:class:`pandas.DataFrame` with a MultiIndex for columns as e. g.
        returned by :py:func:`find_codiffusion` (i. e. matching indices in
        the DataFrames correspond to matching localizations) or a tuple of
        DataFrames, one for each channel.
    particle : int or tuple of int
        Specify particle ID. In case `data` is a list of DataFrames and the
        particles have different IDs, one can pass the tuple of IDs.
    ax : matplotlib.axes.Axes
        To be used for plotting. If `None`, ``matplotlib.pyplot.gca()``
        will be used. Defaults to `None`.
    cmap : matplotlib.colors.Colormap, optional
        To be used for coloring steps. Defaults to the "Paired" map of
        `matplotlib`.
    show_legend : bool, optional
        Whether to print a legend or not. Defaults to True.
    legend_loc : int or str
        Is passed as the `loc` parameter to matplotlib's axes' `legend`
        method. Defaults to 0.
    linestyles : sequence of str, optional
        Line styles used for the individual channels. Defaults to
        ``("-", "--", ":", "-.")``.
    channel_names : list of str or None, optional
        Names of the channels. If None, use the first two entries of the
        top level of `data`'s MultiIndex if it has one (i. e. if it is a
        DataFrame as created by :py:func:`find_codiffusion`), otherwise use
        ["channel1", "channel2"]. Defaults to None.

    Other parameters
    ----------------
    columns : dict, optional
        Override default column names as defined in
        :py:attr:`config.columns`. Relevant names are `coords`, `particle`,
        and `time`. This means, if your DataFrame has coordinate columns
        "x" and "z" and the time column "alt_frame", set
        ``columns={"coords": ["x", "z"], "time": "alt_frame"}``.
    """
    # matplotlib is imported lazily so the module does not require it
    import matplotlib as mpl
    import matplotlib.pyplot as plt

    if ax is None:
        ax = plt.gca()
    if cmap is None:
        cmap = plt.get_cmap("Paired")

    ax.set_aspect(1.)

    col = getattr(data, "columns", None)
    if channel_names is None:
        if isinstance(col, pd.MultiIndex):
            channel_names = col.remove_unused_levels().levels[0][:2]
        else:
            channel_names = _channel_names

    if isinstance(data, pd.DataFrame):
        # single DataFrame with channel top-level columns
        d_iter = (data.loc[data[c, columns["particle"]] == particle, c]
                  for c in channel_names)
    else:
        # one DataFrame per channel; broadcast a scalar particle ID
        if not isinstance(particle, collections.abc.Iterable):
            particle = (particle,) * len(data)
        d_iter = (d[d[columns["particle"]] == p]
                  for d, p in zip(data, particle))

    legend = []
    for d, ls in zip(d_iter, linestyles):
        # the following two lines create a 3D array s. t. the i-th entry
        # is the matrix [[start_x, start_y], [end_x, end_y]]
        xy = d.sort_values(columns["time"])[columns["coords"]]
        xy = xy.values[:, np.newaxis, :]
        segments = np.concatenate([xy[:-1], xy[1:]], axis=1)

        lc = mpl.collections.LineCollection(
            segments, cmap=cmap, array=np.linspace(0., 1., len(d)),
            linestyles=ls)
        ax.add_collection(lc, autolim=True)

        legend.append(mpl.lines.Line2D([0, 1], [0, 1], ls=ls, c="black"))

    ax.autoscale_view()
    if show_legend:
        ax.legend(legend, channel_names[:len(legend)], loc=legend_loc)