Source code for sdt.changepoint.pelt

# SPDX-FileCopyrightText: 2020 Lukas Schrangl <lukas.schrangl@tuwien.ac.at>
#
# SPDX-License-Identifier: BSD-3-Clause

"""PELT changepoint detection"""
import math

import numpy as np

from ..helper import numba


[docs]class CostL1: r"""L1 norm cost The cost is :math:`\sum_i |y_i - \operatorname{median}(y)|`. Attributes ---------- min_size : int Minimum size of a segment that works with this cost function """ def __init__(self): self.min_size = 2 self._data = np.empty((0, 0))
[docs] def set_data(self, data): """Set data for cost function Parameters ---------- data : numpy.ndarray, shape(n, m) m datasets of n data points """ self._data = data
[docs] def cost(self, t, s): """Calculate cost from time `t` to time `s` Parameters ---------- t, s : int Start and end point Returns ------- float Cost """ if s - t < self.min_size: raise ValueError("t - s less than min_size") sub = self._data[t:s] # Cannot use axis=0 argument in numba med = np.empty(sub.shape[1]) for i in range(sub.shape[1]): med[i] = np.median(sub[:, i]) return np.abs(sub - med).sum()
CostL1Numba = numba.jitclass( [("min_size", numba.int64), ("_data", numba.float64[:, :])])( CostL1)
[docs]class CostL2: r"""L2 norm cost The cost is :math:`\operatorname{var}(y) Δt`, where :math:`Δt` is the duration of the segment. Attributes ---------- min_size : int Minimum size of a segment that works with this cost function """ def __init__(self): self.min_size = 2 self._data = np.empty((0, 0))
[docs] def set_data(self, data): """Set data for cost function Parameters ---------- data : numpy.ndarray, shape(n, m) m datasets of n data points """ self._data = data
[docs] def cost(self, t, s): """Calculate cost from time `t` to time `s` Parameters ---------- t, s : int Start and end point Returns ------- float Cost """ if s - t < self.min_size: raise ValueError("t - s less than min_size") sub = self._data[t:s] # Cannot use axis=0 argument in numba var = np.empty(sub.shape[1]) for i in range(sub.shape[1]): var[i] = np.var(sub[:, i]) return var.sum() * (s - t)
CostL2Numba = numba.jitclass( [("min_size", numba.int64), ("_data", numba.float64[:, :])])( CostL2) def segmentation(cost, min_size, jump, penalty, max_exp_cp): """PELT changepoint detection Implementation of the PELT algorithm [1]_. It is compatible with the implementation from `ruptures <https://github.com/deepcharles/ruptures>`_. Usually, this function will not be used directly. Users may find the :py:class:`Pelt` class more convenient. Parameters ---------- cost : cost class instance This needs a `data` attribute with the data to find changepoints in and a `cost` function that computes the cost of a data segment. See :py:class:`CostL1` and :py:class:`CostL2` for details. min_size : int Minimum length of segments between change points jump : int Consider only every `jump`-th data point to speed up calculation. penalty : float Penalty of creating a new changepoint max_exp_cp : int Expected maximum number of changepoints. Memory for at least this many changepoints will be pre-allocated. If there are more, a larger array will be allocated, but this incurs some overhead. Returns ------- numpy.ndarray, shape(n) Array of changepoints References ---------- .. [1] Killick et al.: "Optimal Detection of Changepoints With a Linear Computational Cost", Journal of the American Statistical Association, Informa UK Limited, 2012, 107, 1590–1598 """ n_samples = len(cost._data) times = np.arange(0, n_samples + jump, jump) times[-1] = n_samples min_idx_diff = math.ceil(min_size/jump) if len(times) <= min_idx_diff: return np.empty(0, dtype=np.int64) costs = np.full(len(times), np.inf) costs[0] = 0 le = len(times) - min_idx_diff partitions = np.empty(le * max_exp_cp, dtype=np.int64) partition_starts = np.zeros(len(times) + 1, dtype=np.int64) start_idx = np.zeros(1, dtype=np.int64) for new_start, end_idx in enumerate(range(min_idx_diff, len(times))): new_costs = np.empty_like(start_idx, dtype=np.float64) for j, s in enumerate(start_idx): new_costs[j] = cost.cost(times[s], times[end_idx]) + penalty new_costs += costs[start_idx] best_idx = np.argmin(new_costs) best_cost = new_costs[best_idx] best_real_idx = start_idx[best_idx] if best_real_idx == 0: best_part = np.empty(0, dtype=np.int64) else: best_part_start = partition_starts[best_real_idx] best_part_end = partition_starts[best_real_idx+1] best_part = partitions[best_part_start:best_part_end] if end_idx == len(times) - 1: return times[best_part] new_partition = np.empty(len(best_part)+1, dtype=np.int64) new_partition[:-1] = best_part new_partition[-1] = end_idx new_part_start = partition_starts[end_idx] new_part_end = new_part_start + len(new_partition) while new_part_end > partitions.size: old_part = partitions partitions = np.empty(2 * old_part.size, dtype=np.int64) partitions[:old_part.size] = old_part partitions[new_part_start:new_part_end] = new_partition partition_starts[end_idx+1] = new_part_end costs[end_idx] = best_cost s2 = start_idx[new_costs <= best_cost + penalty] start_idx = np.empty(len(s2) + 1, dtype=np.int64) start_idx[:-1] = s2 start_idx[-1] = new_start + 1 segmentation_numba = numba.jit(nopython=True, nogil=True)(segmentation)
[docs]class Pelt: """PELT changepoint detection Implementation of the PELT algorithm [Kill2012]_. It is compatible with the implementation from `ruptures <https://github.com/deepcharles/ruptures>`_. Examples -------- >>> det = Pelt(cost="l2", min_size=1, jump=1) >>> # Make some data with a changepoint at t = 10 >>> data = np.concatenate([np.ones(10), np.zeros(10)]) >>> det.find_changepoints(data, 1) array([10]) """ cost_map = dict(l1=(CostL1, CostL1Numba), l2=(CostL2, CostL2Numba)) def __init__(self, cost="l2", min_size=2, jump=1, cost_params={}, engine="numba"): """Parameters ---------- cost : cost class or cost class instance or str, optional If "l1", use :py:class:`CostL1`, "l2", use :py:class:`CostL2`. A cost class type or instance can be passed directly. min_size : int, optional Minimum length of segments between change points. Defaults to 2. jump : int, optional Consider only every `jump`-th data point to speed up calculation. Defaults to 5. cost_params : dict, optional Parameters to pass to the cost class's `__init__`. Only relevant if `cost` is not a class instance. Defaults to {}. engine : {"python", "numba"}, optional If "numba", use the numba-accelerated implementation. Defaults to "numba". """ use_numba = (engine == "numba") and numba.numba_available if isinstance(cost, str): cost = self.cost_map[cost][int(use_numba)] if isinstance(cost, type): cost = cost(**cost_params) self.cost = cost self.segmentation = segmentation_numba if use_numba else segmentation self._min_size = max(min_size, self.cost.min_size) self._jump = jump
[docs] def find_changepoints(self, data, penalty, max_exp_cp=10): """Do the changpoint detection on `data` Parameters ---------- data : numpy.ndarray, shape(n, m) m datasets of n data points penalty : float Penalty of creating a new changepoint max_exp_cp : int, optional Expected maximum number of changepoints. Memory for at least this many changepoints will be pre-allocated. If there are more, a larger array will be allocated, but this incurs some overhead. Defaults to 10. Returns ------- numpy.ndarray, shape(n) Array of changepoints """ if data.ndim == 1: data = data.reshape((-1, 1)) self.cost.set_data(data) return self.segmentation(self.cost, self._min_size, self._jump, penalty, max_exp_cp)