Source code for sdt.changepoint.bayes_offline

# Copyright (c) 2014 Johannes Kulick
# SPDX-FileCopyrightText: 2020 Lukas Schrangl <lukas.schrangl@tuwien.ac.at>
#
# SPDX-License-Identifier: BSD-3-Clause
# SPDX-License-Identifier: MIT
#
# Based on https://github.com/hildensia/bayesian_changepoint_detection
# (MIT licensed), adapted under BSD-3-Clause as part of the sdt-python
# package

"""Tools for performing offline Bayesian changepoint detection"""
import math

import numpy as np
import scipy.special
import scipy.signal

from ..helper import numba


_log_pi = math.log(np.pi)


class ConstPrior:
    """Class implementing a constant prior

    :math:`P(t) = 1 / (\\text{len}(data) + 1)`
    """
    def __init__(self):
        self._data = np.empty((0, 0))
        self._prior = np.nan

    def set_data(self, data):
        """Set data for calculation of the prior

        Parameters
        ----------
        data : numpy.ndarray, shape(n, m)
            m datasets of n data points
        """
        self._data = data
        self._prior = 1 / (len(data) + 1)

    def prior(self, t):
        """Get prior at time `t`.

        Parameters
        ----------
        t : int
            Time point

        Returns
        -------
        float
            Prior probability for time point `t`
        """
        return self._prior


ConstPriorNumba = numba.jitclass(
    [("_data", numba.float64[:, :]), ("_prior", numba.float64)])(ConstPrior)
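
# Illustrative note (not part of the module): the ``*Numba`` variants are the
# same classes compiled via :py:func:`numba.jitclass`. They behave like their
# pure-Python counterparts but, per the jitclass spec above, only accept
# two-dimensional float64 data, e.g.:
#
#     cp = ConstPriorNumba()
#     cp.set_data(np.zeros((10, 1)))  # must be a 2D float64 array
#     cp.prior(0)                     # 1 / (10 + 1), same as ConstPrior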


class GeometricPrior:
    """Class implementing a geometrically distributed prior

    :math:`P(t) = p (1 - p)^{t - 1}`
    """
    def __init__(self, p):
        """Parameters
        ----------
        p : float
            `p` in the formula above
        """
        self._data = np.empty((0, 0))
        self.p = p

    def set_data(self, data):
        """Set data for calculation of the prior

        Parameters
        ----------
        data : numpy.ndarray, shape(n, m)
            m datasets of n data points
        """
        self._data = data

    def prior(self, t):
        """Get prior at time `t`.

        Parameters
        ----------
        t : int
            Time point

        Returns
        -------
        float
            Prior probability for time point `t`
        """
        return self.p * (1 - self.p)**(t - 1)


GeometricPriorNumba = numba.jitclass(
    [("_data", numba.float64[:, :]), ("p", numba.float64)])(GeometricPrior)


class NegBinomialPrior:
    r"""Class implementing a negative-binomially distributed prior

    :math:`P(t) = {{t - k}\choose{k - 1}} p^k (1 - p)^{t - k}`
    """
    def __init__(self, k, p):
        """Parameters
        ----------
        k : int
            `k` in the formula above
        p : float
            `p` in the formula above
        """
        self._data = np.empty((0, 0))
        self.p = p
        self.k = k

    def set_data(self, data):
        """Set data for calculation of the prior

        Parameters
        ----------
        data : numpy.ndarray, shape(n, m)
            m datasets of n data points
        """
        self._data = data

    def prior(self, t):
        """Get prior at time `t`.

        Parameters
        ----------
        t : int
            Time point

        Returns
        -------
        float
            Prior probability for time point `t`
        """
        return (scipy.special.comb(t - self.k, self.k - 1) *
                self.p**self.k * (1 - self.p)**(t - self.k))
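
# Usage sketch (illustration only, not part of the module): all prior classes
# share one interface -- call ``set_data`` once, then evaluate ``prior(t)``.
# Assuming a 10-point, single-column dataset:
#
#     d = np.arange(10.0).reshape((-1, 1))
#     cp = ConstPrior()
#     cp.set_data(d)
#     cp.prior(3)   # 1 / (len(d) + 1) = 1 / 11, independent of t
#
#     gp = GeometricPrior(0.1)
#     gp.set_data(d)
#     gp.prior(3)   # 0.1 * 0.9**(3 - 1) = 0.081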


class _DynPLikelihood:
    """Base class for caching observation likelihood results"""
    def __init__(self):
        self._cache = {}

    def set_data(self, data):
        """Set data for calculation of the likelihood

        Parameters
        ----------
        data : numpy.ndarray, shape(n, m)
            m datasets of n data points
        """
        self._cache = {}
        self._data = data

    def likelihood(self, t, s):
        """Get likelihood

        Parameters
        ----------
        t, s : int
            First and last time point to consider

        Returns
        -------
        float
            likelihood
        """
        if (t, s) not in self._cache:
            self._cache[(t, s)] = self._likelihood(t, s)
        return self._cache[(t, s)]


class _GaussianObsLikelihoodBase:
    """Gaussian observation likelihood"""
    def __init__(self):
        self._data = np.empty((0, 0))

    def set_data(self, data):
        """Set data for calculation of the likelihood

        Parameters
        ----------
        data : numpy.ndarray, shape(n, m)
            m datasets of n data points
        """
        self._data = data

    def likelihood(self, t, s):
        """Get likelihood

        Parameters
        ----------
        t, s : int
            First and last time point to consider

        Returns
        -------
        float
            likelihood
        """
        return self._likelihood(t, s)

    def _likelihood(self, t, s):
        """Actual implementation of the `likelihood` method"""
        s += 1
        n = s - t
        mean = self._data[t:s].sum(0) / n

        muT = n * mean / (1 + n)
        nuT = 1 + n
        alphaT = 1 + n / 2
        betaT = (1 + 0.5 * ((self._data[t:s] - mean)**2).sum(0) +
                 n / (1 + n) * mean**2 / 2)
        scale = betaT * (nuT + 1) / (alphaT * nuT)

        prob = np.sum(np.log(1 + (self._data[t:s] - muT)**2 / (nuT * scale)))
        lgA = (math.lgamma((nuT + 1) / 2) -
               np.log(np.sqrt(np.pi * nuT * scale)) -
               math.lgamma(nuT / 2))

        return np.sum(n * lgA - (nuT + 1) / 2 * prob)


class GaussianObsLikelihood(_DynPLikelihood, _GaussianObsLikelihoodBase):
    """Gaussian observation likelihood"""
    pass


GaussianObsLikelihoodNumba = numba.jitclass(
    [("_data", numba.float64[:, :])])(_GaussianObsLikelihoodBase)


class _IfmObsLikelihoodBase:
    """Independent features model from Xuan et al.

    See *Xuan Xiang, Kevin Murphy: "Modeling Changing Dependency Structure in
    Multivariate Time Series", ICML (2007), pp. 1055--1062*.
    """
    def __init__(self):
        self._data = np.empty((0, 0))

    def set_data(self, data):
        """Set data for calculation of the likelihood

        Parameters
        ----------
        data : numpy.ndarray, shape(n, m)
            m datasets of n data points
        """
        self._data = data

    def likelihood(self, t, s):
        """Get likelihood

        Parameters
        ----------
        t, s : int
            First and last time point to consider

        Returns
        -------
        float
            likelihood
        """
        return self._likelihood(t, s)

    def _likelihood(self, t, s):
        """Actual implementation of the `likelihood` method"""
        s += 1
        n = s - t
        x = self._data[t:s]
        d = x.shape[1]

        N0 = d  # Weakest prior we can use to retain proper prior
        V0 = np.var(x)
        Vn = V0 + (x**2).sum(0)

        # Sum over dimension and return (section 3.1 from Xuan paper):
        return (d * (-(n / 2) * _log_pi + (N0 / 2) * np.log(V0) -
                     math.lgamma(N0 / 2) + math.lgamma((N0 + n) / 2)) -
                np.sum(((N0 + n) / 2) * np.log(Vn), axis=0))


class IfmObsLikelihood(_DynPLikelihood, _IfmObsLikelihoodBase):
    """Independent features model from Xuan et al.

    See *Xuan Xiang, Kevin Murphy: "Modeling Changing Dependency Structure in
    Multivariate Time Series", ICML (2007), pp. 1055--1062*.
    """
    pass


IfmObsLikelihoodNumba = numba.jitclass(
    [("_data", numba.float64[:, :])])(_IfmObsLikelihoodBase)


class FullCovObsLikelihood(_DynPLikelihood):
    """Full covariance model from Xuan et al.

    See *Xuan Xiang, Kevin Murphy: "Modeling Changing Dependency Structure in
    Multivariate Time Series", ICML (2007), pp. 1055--1062*.
    """
    def __init__(self):
        super().__init__()
        self._data = np.empty((0, 0))

    def _likelihood(self, t, s):
        s += 1
        n = s - t
        x = self._data[t:s]
        dim = x.shape[1]

        N0 = dim  # weakest prior we can use to retain proper prior
        V0 = np.var(x) * np.eye(dim)
        Vn = V0 + np.einsum("ij, ik -> jk", x, x)

        # section 3.2 from Xuan paper:
        return (-(dim * n / 2) * _log_pi +
                N0 / 2 * np.linalg.slogdet(V0)[1] -
                scipy.special.multigammaln(N0 / 2, dim) +
                scipy.special.multigammaln((N0 + n) / 2, dim) -
                (N0 + n) / 2 * np.linalg.slogdet(Vn)[1])
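
# Illustrative sketch (not part of the module): each likelihood class returns
# the log marginal likelihood of the data between two time points under its
# model; `segmentation` below compares these values across candidate
# segmentations. Thanks to the `_DynPLikelihood` cache, repeated queries for
# the same ``(t, s)`` pair are cheap:
#
#     d = np.random.standard_normal((100, 2))
#     lik = FullCovObsLikelihood()
#     lik.set_data(d)
#     lik.likelihood(0, 49)   # computed via _likelihood and cached
#     lik.likelihood(0, 49)   # served from self._cache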


@numba.jitclass([("_data", numba.float64[:, :])])
class FullCovObsLikelihoodNumba:
    """Full covariance model from Xuan et al.

    See *Xuan Xiang, Kevin Murphy: "Modeling Changing Dependency Structure in
    Multivariate Time Series", ICML (2007), pp. 1055--1062*.
    """
    def __init__(self):
        self._data = np.empty((0, 0))

    def set_data(self, data):
        """Set data for calculation of the likelihood

        Parameters
        ----------
        data : numpy.ndarray, shape(n, m)
            m datasets of n data points
        """
        self._data = data

    def likelihood(self, t, s):
        """Get likelihood

        Parameters
        ----------
        t, s : int
            First and last time point to consider

        Returns
        -------
        float
            likelihood
        """
        s += 1
        n = s - t
        x = self._data[t:s]
        dim = x.shape[1]

        N0 = dim  # weakest prior we can use to retain proper prior
        V0 = np.var(x) * np.eye(dim)

        # numba does not support np.einsum; compute
        # np.einsum("ij, ik -> jk", x, x) with explicit loops instead
        einsum = np.zeros((x.shape[1], x.shape[1]))
        for j in range(x.shape[1]):
            for k in range(x.shape[1]):
                for i in range(x.shape[0]):
                    einsum[j, k] += x[i, j] * x[i, k]
        Vn = V0 + einsum

        # section 3.2 from Xuan paper:
        return (-(dim * n / 2) * _log_pi +
                N0 / 2 * np.linalg.slogdet(V0)[1] -
                numba.multigammaln(N0 / 2, dim) +
                numba.multigammaln((N0 + n) / 2, dim) -
                (N0 + n) / 2 * np.linalg.slogdet(Vn)[1])


class _ScipyLogsumexp:
    """Wrapper class for `scipy.special.logsumexp`

    Necessary because functions cannot be passed as arguments to numba
    jitted functions, but jitclasses can.
    """
    def call(self, *args, **kwargs):
        return scipy.special.logsumexp(*args, **kwargs)


@numba.jitclass([])
class _NumbaLogsumexp:
    """Wrapper class for `sdt.helper.numba.logsumexp`

    Necessary because functions cannot be passed as arguments to numba
    jitted functions, but jitclasses can.
    """
    def __init__(self):
        pass

    def call(self, a):
        return numba.logsumexp(a)


def segmentation(prior, obs_likelihood, truncate, logsumexp_wrapper):
    """Bayesian offline changepoint detection (actual implementation)

    This is an implementation of *Fearnhead, Paul: "Exact and efficient
    Bayesian inference for multiple changepoint problems", Statistics and
    computing 16.2 (2006), pp. 203--213*.

    Parameters
    ----------
    prior : class instance
        Instance of a class implementing a prior probability. See
        :py:class:`ConstPrior`, :py:class:`GeometricPrior`, or
        :py:class:`NegBinomialPrior` for examples.
    obs_likelihood : class instance
        Instance of a class implementing the observation likelihood. See
        :py:class:`GaussianObsLikelihood`, :py:class:`IfmObsLikelihood`, or
        :py:class:`FullCovObsLikelihood` for examples.
    truncate : float
        Speed up calculations by truncating a sum if the summands provide
        negligible contributions. This parameter is the exponent of the
        threshold. A sensible value would be e.g. -20. Set to ``-inf`` for
        no truncation.
    logsumexp_wrapper : _ScipyLogsumexp or _NumbaLogsumexp
        Which ``logsumexp`` function to use

    Returns
    -------
    q : numpy.ndarray
        ``Q[t]`` is the log-likelihood of data ``[t, n]``.
    p : numpy.ndarray
        ``P[t, s]`` is the log-likelihood of a data sequence ``[t, s]``,
        given there is no changepoint between ``t`` and ``s``.
    pcp : numpy.ndarray
        ``Pcp[i, t]`` is the log-likelihood that the ``i``-th changepoint
        is at time step ``t``. To actually get the probability of a
        changepoint at time step ``t``, sum the probabilities over all ``i``.
    """
    n = len(prior._data)
    Q = np.zeros(n)
    g = np.zeros(n)
    G = np.zeros(n)
    P = np.full((n, n), -np.inf)

    # Save everything in log representation
    for t in range(n):
        g[t] = np.log(prior.prior(t))
        if t == 0:
            G[t] = g[t]
        else:
            G[t] = np.logaddexp(G[t-1], g[t])

    P[n-1, n-1] = obs_likelihood.likelihood(n-1, n)
    Q[n-1] = P[n-1, n-1]

    for t in range(n-2, -1, -1):
        P_next_cp = -np.inf  # == log(0)
        for s in range(t, n-1):
            P[t, s] = obs_likelihood.likelihood(t, s+1)

            # Compute recursion
            summand = P[t, s] + Q[s+1] + g[s+1-t]
            P_next_cp = np.logaddexp(P_next_cp, summand)

            # Truncate sum to become approx. linear in time (see
            # Fearnhead, 2006, eq. (3))
            if ((np.isfinite(summand) or np.isfinite(P_next_cp)) and
                    summand - P_next_cp < truncate):
                break

        P[t, n-1] = obs_likelihood.likelihood(t, n)

        # (1 - G) is numerically stable until G becomes numerically 1
        if G[n-1-t] < -1e-15:  # exp(-1e-15) = .99999...
            antiG = np.log(1 - np.exp(G[n-1-t]))
        else:
            # (1 - G) is approx. -log(G) for G close to 1
            antiG = np.log(-G[n-1-t])

        Q[t] = np.logaddexp(P_next_cp, P[t, n-1] + antiG)

    Pcp = np.full((n-1, n), -np.inf)
    for t in range(n-1):
        Pcp[0, t+1] = P[0, t] + Q[t + 1] + g[t] - Q[0]
        if np.isnan(Pcp[0, t+1]):
            Pcp[0, t+1] = -np.inf
    for j in range(1, n-1):
        for t in range(j, n-1):
            tmp_cond = (Pcp[j-1, j:t+1] + P[j:t+1, t] + Q[t + 1] +
                        g[0:t-j+1] - Q[j:t+1])
            Pcp[j, t+1] = logsumexp_wrapper.call(tmp_cond)
            if np.isnan(Pcp[j, t+1]):
                Pcp[j, t+1] = -np.inf

    return Q, P, Pcp


segmentation_numba = numba.jit(nopython=True, nogil=True)(segmentation)


class BayesOffline:
    """Bayesian offline changepoint detector

    This is an implementation of [Fear2006]_ based on the one from the
    `bayesian_changepoint_detection
    <https://github.com/hildensia/bayesian_changepoint_detection>`_ python
    package.
    """
    prior_map = dict(const=(ConstPrior, ConstPriorNumba),
                     geometric=(GeometricPrior, GeometricPriorNumba),
                     neg_binomial=(NegBinomialPrior, None))

    likelihood_map = dict(gauss=(GaussianObsLikelihood,
                                 GaussianObsLikelihoodNumba),
                          ifm=(IfmObsLikelihood, IfmObsLikelihoodNumba),
                          full_cov=(FullCovObsLikelihood,
                                    FullCovObsLikelihoodNumba))

    def __init__(self, prior="const", obs_likelihood="gauss",
                 prior_params={}, obs_likelihood_params={},
                 numba_logsumexp=True, engine="numba"):
        """Parameters
        ----------
        prior : {"const", "geometric", "neg_binomial"} or prior class, optional
            Prior probability. This can either be a string describing the
            prior or a type or instance of a class implementing the prior, as
            for example :py:class:`ConstPrior`, :py:class:`GeometricPrior`, or
            :py:class:`NegBinomialPrior`. If a string or a type are passed, a
            class instance will be created passing `prior_params` to
            ``__init__``.

            If "const", use :py:class:`ConstPrior`. If "geometric", use
            :py:class:`GeometricPrior`. If "neg_binomial", use
            :py:class:`NegBinomialPrior`. Defaults to "const".
        obs_likelihood : {"gauss", "ifm", "full_cov"} or likelihood class, opt.
            Observation likelihood. This can either be a string describing the
            likelihood or a type or instance of a class implementing the
            likelihood, as for example :py:class:`GaussianObsLikelihood`,
            :py:class:`IfmObsLikelihood`, or :py:class:`FullCovObsLikelihood`.
            If a string or a type are passed, a class instance will be created
            passing `obs_likelihood_params` to ``__init__``.

            If "gauss", use :py:class:`GaussianObsLikelihood`. If "ifm", use
            :py:class:`IfmObsLikelihood`. If "full_cov", use
            :py:class:`FullCovObsLikelihood`. Defaults to "gauss".

            For multivariate data, "ifm" or "full_cov" is recommended.
        prior_params : dict, optional
            Parameters to `prior`'s ``__init__`` if it needs to be
            constructed. Defaults to {}.
        obs_likelihood_params : dict, optional
            Parameters to `obs_likelihood`'s ``__init__`` if it needs to be
            constructed. Defaults to {}.
        engine : {"python", "numba"}, optional
            If "numba", use the numba-accelerated implementation. Defaults to
            "numba".

        Other parameters
        ----------------
        numba_logsumexp : bool, optional
            If True, use numba-accelerated :py:func:`logsumexp`, otherwise use
            :py:func:`scipy.special.logsumexp`. Defaults to True.
        """
        use_numba = (engine == "numba") and numba.numba_available

        if isinstance(prior, str):
            prior = self.prior_map[prior][int(use_numba)]
        if isinstance(prior, type):
            prior = prior(**prior_params)
        self.prior = prior

        if isinstance(obs_likelihood, str):
            obs_likelihood = self.likelihood_map[obs_likelihood]
            obs_likelihood = obs_likelihood[int(use_numba)]
        if isinstance(obs_likelihood, type):
            obs_likelihood = obs_likelihood(**obs_likelihood_params)
        self.obs_likelihood = obs_likelihood

        self.segmentation = segmentation_numba if use_numba else segmentation

        if numba_logsumexp and numba.numba_available:
            self.logsumexp = _NumbaLogsumexp()
        else:
            self.logsumexp = _ScipyLogsumexp()

    def find_changepoints(self, data, prob_threshold=None, full_output=False,
                          truncate=-20):
        """Find changepoints in datasets

        Parameters
        ----------
        data : array-like
            Data array
        prob_threshold : float or None, optional
            If this is a float, local maxima in the changepoint probabilities
            are considered changepoints, if they are above the threshold. In
            that case, an array of changepoints is returned. If `None`, an
            array of probabilities is returned. Defaults to `None`.
        full_output : bool, optional
            Whether to return only the probabilities for a changepoint as a
            function of time or the full information. Defaults to False, i.e.
            only probabilities.

        Returns
        -------
        cp_or_prob : numpy.ndarray
            Probabilities for a changepoint as a function of time (if
            ``prob_threshold=None``) or the indices of the changepoints (if
            `prob_threshold` is not `None`).
        q : numpy.ndarray
            ``Q[t]`` is the log-likelihood of data ``[t, n]``. Only returned
            if ``full_output=True`` and ``prob_threshold=None``.
        p : numpy.ndarray
            ``P[t, s]`` is the log-likelihood of a data sequence ``[t, s]``,
            given there is no changepoint between ``t`` and ``s``. Only
            returned if ``full_output=True`` and ``prob_threshold=None``.
        pcp : numpy.ndarray
            ``Pcp[i, t]`` is the log-likelihood that the ``i``-th changepoint
            is at time step ``t``. To actually get the probability of a
            changepoint at time step ``t``, sum the probabilities over all
            ``i`` (this is what is returned as `cp_or_prob`). Only returned
            if ``full_output=True`` and ``prob_threshold=None``.

        Other parameters
        ----------------
        truncate : float, optional
            Speed up calculations by truncating a sum if the summands provide
            negligible contributions. This parameter is the exponent of the
            threshold. Set to ``-inf`` to turn off. Defaults to -20.
        """
        if data.ndim == 1:
            data = data.reshape((-1, 1))

        self.prior.set_data(data)
        self.obs_likelihood.set_data(data)

        Q, P, Pcp = self.segmentation(self.prior, self.obs_likelihood,
                                      truncate, self.logsumexp)
        prob = np.exp(Pcp).sum(axis=0)
        if prob_threshold is not None:
            lmax = scipy.signal.argrelmax(prob)[0]
            return lmax[prob[lmax] >= prob_threshold]
        elif full_output:
            return prob, Q, P, Pcp
        else:
            return prob
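
# End-to-end usage sketch (illustration only; assumes a noisy 1D step signal):
#
#     det = BayesOffline("const", "gauss", engine="python")
#     d = np.concatenate([np.full(50, 0.0), np.full(50, 5.0)])
#     d += 0.1 * np.random.standard_normal(100)
#     prob = det.find_changepoints(d)                    # probability per step
#     cp = det.find_changepoints(d, prob_threshold=0.5)  # indices near 50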