Source code for sdt.motion.immobilization

# SPDX-FileCopyrightText: 2020 Lukas Schrangl <lukas.schrangl@tuwien.ac.at>
#
# SPDX-License-Identifier: BSD-3-Clause

"""Tools for finding immobilizations in tracking data"""
import numpy as np
import pandas as pd
from typing import Mapping, Literal

from .. import config, helper
from ..helper import numba


try:
    import numexpr
except ImportError:
    numexpr_usable = False
else:
    numexpr_usable = True


[docs]@config.set_columns def find_immobilizations(tracks, max_dist, min_duration, label_mobile=True, longest_only=False, rtol=0., atol=0, engine="numba", columns={}): """Find immobilizations in particle trajectories Analyze trajectories and mark parts where all localizations of a particle stay within a circle of radius `max_dist` from their center of mass for at least a certain number (`min_length`) of frames as an immobilization. The `tracks` DataFrame gets a new column ("immob") appended (or overwritten), where every immobilzation is assigned a different number greater or equal than 0. Parts of trajectories that are not immobilized are assigned -1 (if `label_mobile` is `False`) or one negative number per mobile section starting at -2 (if label_mobile is `True`). :py:func:`find_immobilizations_int` uses a slightly different criterion for finding immobilizations. Parameters ---------- tracks : pandas.DataFrame Tracking data max_dist : float Maximum radius within a particle may move while still being considered immobilized min_duration : int Minimum duration the particle has to stay at the same place for a part of a trajectory to be considered immobilized. Duration is the difference of the maximum frame number and the minimum frame number. E. g. if the frames 1, 2, and 4 would lead to a duration of 3. label_mobile : bool, optional Whether to give each mobile track section a distinct label (a negative number starting at -2). Defaults to True. longest_only : bool, optional If True, search only for the longest immobilzation in each track to speed up the process. Defaults to False. rtol : float, optional The fraction of localizations that may not be within `max_dist` of the center of mass while still recognizing the sub-track as immobile. One also has to set the `a_tol` parameter to something non-zero if using this since a sub-track will not be considered immobile if either `r_tol` or `a_tol` are exceeded. Defaults to 0. atol : int, optional The number of localizations that may not be within `max_dist` of the center of mass while still recognizing the sub-track as immobile. One also has to set the `r_tol` parameter to something non-zero if using this since a sub-track will not be considered immobile if either `r_tol` or `a_tol` are exceeded. Defaults to 0. Returns ------- pandas.DataFrame Although the `tracks` DataFrame is modified in place, it is also returned for convenience. See also -------- find_immobilizations_int : Uses a slightly different criterion for finding immobilizations Other parameters ---------------- engine : {"numba", "python"}, optional If `engine` is "numba" and the `numba` package is installed, use the much faster numba-accelerated alogrithm. Otherwise, fall back to a pure python one. Defaults to "numba" columns : dict, optional Override default column names as defined in :py:attr:`config.columns`. Relevant names are `coords`, `time`, and `particle`. This means, if your DataFrame has coordinate columns "x" and "z" and the time column "alt_frame", set ``columns={"coords": ["x", "z"], "time": "alt_frame"}``. """ immob_counter = 0 mob_counter = -2 # New column for `tracks` that holds the immobilization number immob_column = [] # corresponding indices immob_index = [] if engine == "numba" and numba.numba_available: count_immob_func = _count_immob_numba label_mob_func = _label_mob_numba else: count_immob_func = _count_immob_python label_mob_func = _label_mob_python t_sorted = tracks.sort_values([columns["particle"], columns["time"]]) t_split = helper.split_dataframe( t_sorted, columns["particle"], columns["coords"] + [columns["time"]], type="array_list", sort=False, keep_index=True) for pn, t in t_split: coords = np.array(t[1:-1]) # coordinates frames = t[-1].astype(int) # To be appended to immob_column icol = np.full(len(frames), -1, dtype=np.intp) # Count how many localizations are within max_dist to sub-track's # center of mass. # This is computationally expensive. count = count_immob_func(coords, max_dist) # We will divide by zero below with np.errstate(invalid="ignore"): # Number of localizations in sub-track (only valid for upper # triangle) num_locs = (np.arange(1, coords.shape[1]+1)[np.newaxis, :] - np.arange(coords.shape[1])[:, np.newaxis]) # Find immobilizations within tolerance limits immob = ((1 - count/num_locs <= rtol) & (num_locs - count <= atol)) # Get longest possible immobilizations duration = frames[np.newaxis, :] - frames[:, np.newaxis] duration[~immob] = -1 while True: mobile_idx = np.nonzero(icol < 0)[0] if not len(mobile_idx): # All localizations are part of immobilizations. # We are done. break # Durations stripped of all sub-tracks that overlap with # immobilizations that have already been found cur_dur = duration[mobile_idx[:, np.newaxis], mobile_idx[np.newaxis, :]] longest = np.argmax(cur_dur) i = longest // cur_dur.shape[1] j = longest % cur_dur.shape[0] if cur_dur[i, j] < min_duration: # The longest duration is already below the threshold. # We are done. break icol[mobile_idx[i:j+1]] = immob_counter immob_counter += 1 if longest_only: # We are not interested in shorter immobilizations. break if label_mobile: mob_counter = label_mob_func(icol, mob_counter) immob_column.append(icol) immob_index.append(t[0]) tracks["immob"] = pd.Series(np.concatenate(immob_column), index=np.concatenate(immob_index)) return tracks
def _count_immob_python(coords, max_dist): """Count immobilizations in sub-tracks For each sub-track starting at one frame and ending on anther, count how many localizations are within `max_dist` of the sub-track's center of mass Parameters ---------- coords : numpy.ndarray, shape=(ndim, nloc) Coordinate array. Each column is the set of coordinates for one localization frames : numpy.ndarray, shape=(nloc,) Frame number for each localization. This has to be in ascending order. max_dist : float Maximum distance that a localization may have from the sub-track's center of mass to be counted. Returns ------- numpy.ndarray, shape=(nloc, nloc) The [i, j]-th entry of the array holds the number of localizations within `max_dist` of the center of mass of the sub-track starting at the i-th localization and ending at the j-th (inclusive). Only localizations between the i-th and the j-th may be counted (e. g. if the (j+1)-th is still close enough to the center of mass, it will be discarded anyways.) """ # Calculate centers of mass, i. e. means of coordinates of all # sub-tracks sums = np.cumsum(coords, axis=1) # sum coordinates sums2 = np.hstack((np.zeros((coords.shape[0], 1)), sums[:, :-1])) # cm[i, j, k] will be the cm coordinate i of track starting at index j # and ending at k (provided j <= k) # # For fixed i, subtract from row j (where the k-th entry is the sum of # coordinates 0 to k) subtract the sum of coordinates 0 to j-1, # therefore calculating the sum of coordinates j to k. cm = sums[:, np.newaxis, :] - sums2[:, :, np.newaxis] # Save memory del sums, sums2 # Divide sum by number of localizations in track to get center of mass num_locs = (np.arange(1, coords.shape[1]+1)[np.newaxis, :] - np.arange(coords.shape[1])[:, np.newaxis]) cm /= num_locs[np.newaxis, ...] # dist_sq[i, j, k] is the distance of the k-th localization of the # track from center of mass of subtrack from localization i through j cm1 = cm[:, :, :, np.newaxis] co1 = coords[:, np.newaxis, np.newaxis, :] if numexpr_usable: # numexpr is a lot faster in this case dist_sq = numexpr.evaluate("sum((cm1 - co1)**2, axis=0)") else: dist_sq = np.sum((cm1 - co1)**2, axis=0) # Save some memory del cm, cm1, co1 # Select only relevant (i. e. i <= k <= j) from dist_sq i = np.arange(coords.shape[1]) m = ((i[:, np.newaxis, np.newaxis] > i[np.newaxis, np.newaxis, :]) | (i[np.newaxis, np.newaxis, :] > i[np.newaxis, :, np.newaxis])) # Set irrelevant entries to NaN. That way, we can sum over all True # values below to get the number of matches in the relevant range dist_sq[m] = np.nan # count[i, j] gives the number of localizations within max_dist of the # center of mass of the sub-track starting at i and ending at j return np.nansum(dist_sq <= max_dist**2, axis=2) @numba.jit(nopython=True, cache=True) def _count_immob_numba(coords, max_dist): """Count immobilizations in sub-tracks - numba-accellerated Numba-accellerated implementation of :py:func:`_immob_count_python` functionality. A lot faster and less memory-hungry """ max_dist_sq = max_dist**2 ndim = coords.shape[0] nloc = coords.shape[1] count = np.zeros((nloc, nloc), dtype=np.int64) for j in range(nloc): # Sub-track start index s = np.zeros(ndim) # Cum. sum of coordinates for k in range(j, nloc): # Sub-track end index cm = np.zeros(ndim) # Center of mass for sub-track [j, k] for i in range(ndim): s[i] += coords[i, k] # Add to cum. sum cm[i] = s[i] / (k - j + 1) # Divide by number of locs for m in range(j, k+1): # for each loc of sub-track [j, k] dist = 0. for i in range(ndim): # sum up coord dist**2 to center of mass dist += (cm[i] - coords[i, m])**2 if dist <= max_dist_sq: count[j, k] += 1 # and count if within max_dist return count
[docs]@config.set_columns def find_immobilizations_int(tracks, max_dist, min_duration, label_mobile=True, longest_only=False, columns={}): """Find immobilizations in particle trajectories Analyze trajectories and mark parts where all localizations of a particle stay within a circle of radius `max_dist` for at least a certain number (`min_length`) of frames as an immobilization. In other words: If consecutive localizations stay within the intersection of circles of radius `max_dist` and coordinates of the localizations as centers, they are considered immobilized. This is different from :py:func:`find_immobilizations`. The `tracks` DataFrame gets a new column ("immob") appended (or overwritten), where every immobilzation is assigned a different number greater or equal than 0. Parts of trajectories that are not immobilized are assigned -1 (if `label_mobile` is `False`) or one negative number per mobile section starting at -2 (if label_mobile is `True`). Parameters ---------- tracks : pandas.DataFrame Tracking data max_dist : float Maximum radius within a particle may move while still being considered immobilized min_duration : int Minimum duration the particle has to stay at the same place for a part of a trajectory to be considered immobilized. Duration is the difference of the maximum frame number and the minimum frame number. E. g. if the frames 1, 2, and 4 would lead to a duration of 3. label_mobile : bool, optional Whether to give each mobile track section a distinct label (a negative number starting at -2). Defaults to True. longest_only : bool, optional If True, search only for the longest immobilzation in each track to speed up the process. Defaults to False. Returns ------- pandas.DataFrame Although the `tracks` DataFrame is modified in place, it is also returned for convenience. See also -------- find_immobilizations : Uses a slightly different criterion for finding immobilizations Other parameters ---------------- columns : dict, optional Override default column names as defined in :py:attr:`config.columns`. Relevant names are `coords`, `time`, and `particle`. This means, if your DataFrame has coordinate columns "x" and "z" and the time column "alt_frame", set ``columns={"coords": ["x", "z"], "time": "alt_frame"}``. """ max_dist_sq = max_dist**2 immob_counter = 0 mob_counter = -2 # new column for `tracks` that holds the immobilization number immob_column = [] # corresponding indices immob_index = [] t_sorted = tracks.sort_values([columns["particle"], columns["time"]]) t_split = helper.split_dataframe( t_sorted, columns["particle"], columns["coords"] + [columns["time"]], type="array_list", sort=False, keep_index=True) for pn, t in t_split: pos = np.array(t[1:-1]).T # coordinates frames = t[-1].astype(int) # To be appended to immob_column icol = np.full(len(frames), -1, dtype=np.intp) # d[i, j, k] is the difference of the k-th coordinate of # loc `i` and loc `j` in the current track d = pos[:, np.newaxis, :] - pos[np.newaxis, :, :] # Euclidian distance squared d = np.sum(d**2, axis=2) close_enough = (d <= max_dist_sq) # A block of `True` around the diagonal means that for each # localization of a track all other localizations in temporal # proximity are within `max_dist` start, end = _find_diag_blocks(close_enough) first = frames[start] last = frames[end] duration = last - first # array that has start index, end index (+1 to include end in slices), # duration as columns immob = np.column_stack((start, end+1, duration)) # take only those that are long enough immob = immob[duration >= min_duration] if immob.size: # sort by duration immob = immob[immob[:, 2].argsort()[::-1]] cur_row = 0 while cur_row < len(immob): s, e, d = immob[cur_row] is_overlap = icol[s:e] != -1 if is_overlap.any(): # if it overlaps with a longer immobilization, strip # overlapping frames if is_overlap.all(): # here argmin won't work e = s d = 0 else: s += np.argmin(is_overlap) e -= np.argmin(is_overlap[::-1]) d = frames[e-1] - frames[s] # re-sort remaining part of immob immob[cur_row] = (s, e, d) immob = immob[cur_row:] immob = immob[immob[:, 2] >= min_duration] immob = immob[immob[:, 2].argsort()[::-1]] cur_row = 0 continue icol[s:e] = immob_counter immob_counter += 1 cur_row += 1 if longest_only: # break after first iteration break if label_mobile: mob_counter = _label_mob_python(icol, mob_counter) immob_column.append(icol) immob_index.append(t[0]) tracks["immob"] = pd.Series(np.concatenate(immob_column), index=np.concatenate(immob_index)) return tracks
def _find_diag_blocks(a): """Find diagonal blocks of in a boolean matrix Find all square blocks of value `True` on the diagonal of a symmetric array. Parameters ---------- a : numpy.ndarray Boolean array, which has to be symmetric. Returns ------- start, end : numpy.ndarray 1D arrays containing start and end row numbers for each block Examples -------- >>> a = numpy.array([[1, 1, 0], [1, 1, 1], [0, 1, 1]]) >>> _find_diag_blocks(a) (array([0, 1]), array([1, 2])) """ a = a.copy() # Set lower triangle to True so that only entries in upper triangle # are found when searching for False a[np.tri(*a.shape, -1, dtype=bool)] = True # Find first False entry in every row. If (and only if) there is none, # min_col will be 0 for that column, thus set it to a.shape[1] min_col = np.argmin(a, axis=1) min_col[min_col == 0] = a.shape[1] # If the difference of two consecutive indices is larger than 0, a new # block starts # e. g. for [[1, 1, 0], [1, 1, 1], [0, 1, 1]], min_col is [2, 3, 3], # the diff is [1, 0] while True: col_diff = np.diff(min_col) # if diff is < 0 somewhere, this may lead to false positives for the # preceding rows. E. g. if diff is -3 here and was 4 for the previous # row, the "net diff" is 1. neg_idx = np.nonzero(col_diff < 0)[0] if not len(neg_idx): break # overwrite the preceding value so that the diff is 0 and retry # this is fine since we are only interested in positive diffs below min_col[neg_idx] = min_col[neg_idx + 1] is_start = np.hstack(([True], col_diff > 0)) # first row is always start # To determine where blocks end, one has to basically do the same as for # starts, only with rows in reversed order and look for diff < 0 min_row = np.argmin(a[::-1, :], axis=0) min_row[min_row == 0] = a.shape[0] while True: row_diff = np.diff(min_row) pos_idx = np.nonzero(row_diff > 0)[0] if not len(pos_idx): break min_row[pos_idx + 1] = min_row[pos_idx] is_end = np.hstack((row_diff < 0, [True])) return is_start.nonzero()[0], is_end.nonzero()[0] def _label_mob_python(immob_col, start): """Give each mobile section of a track a label (unique number) Parameters ---------- immob_col : numpy.ndarray The immobilization number column for one particle. Elements which are -1 are considered unlabeled mobile localizations and will be assigned a label. start : int Start label. Should be a negative number as this gets decreased for each mobile section. Returns ------- int The last label decreased by one. This can be used as `start` for the next particle. """ mob = (immob_col == -1).astype(int) d = np.diff(mob, 1) begin = np.nonzero(d == 1)[0] + 1 end = np.nonzero(d == -1)[0] + 1 if mob[0] == 1: begin = np.hstack(([0], begin)) if mob[-1] == 1: end = np.hstack((end, [len(mob)])) for b, e in zip(begin, end): immob_col[b:e] = start start -= 1 return start @numba.jit(nopython=True, cache=True) def _label_mob_numba(immob_col, start): """numba-accelerated version of :py:func:`_label_mob_python`""" if not immob_col.size: return start is_mob = False for i in range(len(immob_col)): if immob_col[i] == -1: immob_col[i] = start is_mob = True elif is_mob: # is_mob is True, so last entry was still mobile; decrease label start -= 1 is_mob = False if is_mob: start -= 1 return start
[docs]@config.set_columns def label_mobile(tracks: pd.DataFrame, engine: Literal["numba", "python"] = "numba", columns: Mapping = {}) -> pd.DataFrame: """Give each mobile section of a track a label (unique number) When calling :py:func:`find_immobilizations` with ``label_mobile=False``, all mobile sections of tracks will have `-1` in their `immob` column. This function assigns a unique (negative) number to each section, starting at `-2`. The `data` DataFrame will be modified in place, but also returned. Parameters ---------- tracks Tracking data with an `immob` column where all mobile sections of tracks are assigned `-1`. Returns ------- `tracks` with updated ``"immob"`` column. Other parameters ---------------- engine If `engine` is "numba" and the `numba` package is installed, use the much faster numba-accelerated alogrithm. Otherwise, fall back to a pure python one. Defaults to "numba" columns Override default column names as defined in :py:attr:`config.columns`. The only relevant name is `particle`. """ if engine == "numba" and numba.numba_available: label_mob_func = _label_mob_numba else: label_mob_func = _label_mob_python new_immob_col = [] new_immob_index = [] t_sorted = tracks.sort_values([columns["particle"], columns["time"]]) t_split = helper.split_dataframe( t_sorted, columns["particle"], ["immob"], type="array_list", sort=False, keep_index=True) counter = -2 for pn, t in t_split: icol = t[1] counter = label_mob_func(icol, counter) new_immob_col.append(icol) new_immob_index.append(t[0]) tracks["immob"] = pd.Series(np.concatenate(new_immob_col), index=np.concatenate(new_immob_index)) return tracks