# Copyright (c) 2014 Johannes Kulick
# SPDX-FileCopyrightText: 2020 Lukas Schrangl <lukas.schrangl@tuwien.ac.at>
#
# SPDX-License-Identifier: BSD-3-Clause
# SPDX-License-Identifier: MIT
#
# Based on https://github.com/hildensia/bayesian_changepoint_detection
# (MIT licensed), adapted under BSD-3-Clause as part of the sdt-python
# package
"""Tools for performing online Bayesian changepoint detection"""
import math

import numpy as np
from scipy import stats, signal

from ..helper import numba


_jit = numba.jit(nopython=True, nogil=True)


class ConstHazard:
    """Class implementing a constant hazard function"""

    def __init__(self, time_scale):
        """Parameters
        ----------
        time_scale : float
            Time scale
        """
        self.time_scale = time_scale

    def hazard(self, run_lengths):
        """Calculate hazard

        Parameters
        ----------
        run_lengths : numpy.ndarray
            Run lengths

        Returns
        -------
        numpy.ndarray
            Hazards for run lengths
        """
        return 1 / self.time_scale * np.ones(run_lengths.shape)


ConstHazardNumba = numba.jitclass(
    [("time_scale", numba.float64)])(ConstHazard)


class StudentT:
    """Student T observation likelihood"""

    def __init__(self, alpha, beta, kappa, mu):
        """Parameters
        ----------
        alpha, beta, kappa, mu : float
            Distribution parameters
        """
        self._alpha0 = np.float64(alpha)
        self._beta0 = np.float64(beta)
        self._kappa0 = np.float64(kappa)
        self._mu0 = np.float64(mu)
        self.reset()

    def reset(self):
        """Reset state"""
        self._alpha = np.full(1, self._alpha0)
        self._beta = np.full(1, self._beta0)
        self._kappa = np.full(1, self._kappa0)
        self._mu = np.full(1, self._mu0)

    def pdf(self, data):
        """Calculate probability density function (PDF)

        Parameters
        ----------
        data : array-like
            Data points for which to calculate the PDF
        """
        df = 2 * self._alpha
        loc = self._mu
        scale = np.sqrt(self._beta * (self._kappa + 1) /
                        (self._alpha * self._kappa))
        return stats.t.pdf(data, df, loc, scale)

    def update_theta(self, data):
        """Update parameters for every possible run length

        Parameters
        ----------
        data : array-like
            Data points to use for update
        """
        muT0 = np.empty(len(self._mu) + 1)
        muT0[0] = self._mu0
        muT0[1:] = (self._kappa * self._mu + data) / (self._kappa + 1)

        kappaT0 = np.empty(len(self._kappa) + 1)
        kappaT0[0] = self._kappa0
        kappaT0[1:] = self._kappa + 1.

        alphaT0 = np.empty(len(self._alpha) + 1)
        alphaT0[0] = self._alpha0
        alphaT0[1:] = self._alpha + 0.5

        betaT0 = np.empty(len(self._beta) + 1)
        betaT0[0] = self._beta0
        betaT0[1:] = (self._beta + (self._kappa * (data - self._mu)**2) /
                      (2. * (self._kappa + 1.)))

        self._mu = muT0
        self._kappa = kappaT0
        self._alpha = alphaT0
        self._beta = betaT0


@_jit
def t_pdf(x, df, loc=0, scale=1):
"""Numba-based implementation of :py:func:`scipy.stats.t.pdf`
This is for scalars only.
"""
    y = (x - loc) / scale
    ret = math.exp(math.lgamma((df + 1) / 2) - math.lgamma(df / 2))
    ret /= (math.sqrt(math.pi * df) * (1 + y**2 / df)**((df + 1) / 2))
    return ret / scale


@numba.jitclass([("_alpha0", numba.float64), ("_beta0", numba.float64),
                 ("_kappa0", numba.float64), ("_mu0", numba.float64),
                 ("_alpha", numba.float64[:]), ("_beta", numba.float64[:]),
                 ("_kappa", numba.float64[:]), ("_mu", numba.float64[:])])
class StudentTNumba(StudentT):
    """Student T observation likelihood (numba-accelerated)"""

    def pdf(self, data):
        """Calculate probability density function (PDF)

        Parameters
        ----------
        data : array-like
            Data points for which to calculate the PDF
        """
        df = 2 * self._alpha
        loc = self._mu
        scale = np.sqrt(self._beta * (self._kappa + 1) /
                        (self._alpha * self._kappa))
        ret = np.empty(len(df))
        for i in range(len(ret)):
            ret[i] = t_pdf(data, df[i], loc[i], scale[i])
        return ret


def segmentation_step(x, old_p, hazard, obs_likelihood):
"""Calculate changepoint probabilites for new datapoint
Parameters
----------
x : float
New datapoint
old_p : list-like of numpy.ndarray
Probabilities for changepoints in data excluding the new datapoint
hazard : class instance
Instance of a class implementing the hazard function. See
:py:class:`ConstHazard` for an example.
obs_likelihood : class instance
Instance of a class implementing the observation likelihood. See
:py:class:`StudenT` for an example.
Returns
-------
numpy.ndarray
Changepoint probabilities including the new datapoint
"""
    # Evaluate the predictive distribution for the new datum under each
    # of the parameters. This is the standard thing from Bayesian
    # inference.
    predprobs = obs_likelihood.pdf(x)

    # Evaluate the hazard function for this interval
    H = hazard.hazard(np.arange(len(old_p)))

    # Evaluate the growth probabilities - shift the probabilities down
    # and to the right, scaled by the hazard function and the
    # predictive probabilities.
    new_p = np.empty(len(old_p) + 1)
    new_p[1:] = old_p * predprobs * (1 - H)

    # Evaluate the probability that there *was* a changepoint and we're
    # accumulating the mass back down at r = 0.
    new_p[0] = np.sum(old_p * predprobs * H)

    # Renormalize the run length probabilities for improved numerical
    # stability.
    new_p /= np.sum(new_p)

    # Update the parameter sets for each possible run length.
    obs_likelihood.update_theta(x)

    return new_p


segmentation_step_numba = _jit(segmentation_step)


@_jit
def segmentation_numba(data, hazard, obs_likelihood):
    """Run :py:func:`segmentation_step_numba` on a whole dataset

    Returns the matrix of run length probabilities; row ``i`` holds the
    probabilities after ``i`` datapoints.
    """
    ret = np.zeros((len(data) + 1, len(data) + 1))
    ret[0, 0] = 1
    for i in range(len(data)):
        old_p = ret[i, :i+1]
        new_p = segmentation_step_numba(data[i], old_p, hazard,
                                        obs_likelihood)
        ret[i+1, :i+2] = new_p
    return ret


class BayesOnline:
    """Bayesian online changepoint detector

    This is an implementation of [Adam2007]_ based on the one from the
    `bayesian_changepoint_detection
    <https://github.com/hildensia/bayesian_changepoint_detection>`_ python
    package.

    Since this is an online detector, it keeps state. One can call
    :py:meth:`update` for each datapoint and then extract the changepoint
    probabilities from :py:attr:`probabilities`.
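
    Examples
    --------
    A minimal sketch, where ``data`` is a hypothetical 1D array of
    observations:

    >>> det = BayesOnline()
    >>> for x in data:
    ...     det.update(x)
    >>> prob = det.get_probabilities(past=3)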
"""

    hazard_map = dict(const=(ConstHazard, ConstHazardNumba))
    likelihood_map = dict(student_t=(StudentT, StudentTNumba))

    def __init__(self, hazard="const", obs_likelihood="student_t",
                 hazard_params={"time_scale": 250.},
                 obs_params={"alpha": 0.1, "beta": 0.01, "kappa": 1.,
                             "mu": 0.},
                 engine="numba"):
        """Parameters
        ----------
        hazard : "const" or class or class instance, optional
            Hazard function. If "const", use :py:class:`ConstHazard` (or its
            numba-accelerated variant). A class will be instantiated with
            `hazard_params`; see :py:class:`ConstHazard` for an example of
            the required interface. Defaults to "const".
        obs_likelihood : "student_t" or class or class instance, optional
            Observation likelihood. If "student_t", use :py:class:`StudentT`
            (or its numba-accelerated variant). A class will be instantiated
            with `obs_params`; see :py:class:`StudentT` for an example of
            the required interface. Defaults to "student_t".
        hazard_params : dict, optional
            Parameters to pass to the `hazard` constructor. Defaults to
            ``{"time_scale": 250.}``.
        obs_params : dict, optional
            Parameters to pass to the `obs_likelihood` constructor. Defaults
            to ``{"alpha": 0.1, "beta": 0.01, "kappa": 1., "mu": 0.}``.
        engine : {"numba", "python"}, optional
            If "numba" and numba is available, use the numba-accelerated
            implementations. Defaults to "numba".
        """
        self._use_numba = (engine == "numba") and numba.numba_available

        if isinstance(hazard, str):
            hazard = self.hazard_map[hazard][int(self._use_numba)]
        if isinstance(hazard, type):
            hazard = hazard(**hazard_params)
        self.hazard = hazard

        if isinstance(obs_likelihood, str):
            obs_likelihood = self.likelihood_map[obs_likelihood]
            obs_likelihood = obs_likelihood[int(self._use_numba)]
        if isinstance(obs_likelihood, type):
            obs_likelihood = obs_likelihood(**obs_params)
        self.obs_likelihood = obs_likelihood

        self.reset()

    def reset(self):
        """Reset the detector

        All previous data will be forgotten. Useful if one wants to start
        on a new dataset.
        """
        self.probabilities = [np.array([1])]
        self.obs_likelihood.reset()

    def update(self, x):
        """Add data point and calculate changepoint probabilities

        Parameters
        ----------
        x : number
            New data point
        """
        old_p = self.probabilities[-1]
        if self._use_numba:
            new_p = segmentation_step_numba(x, old_p, self.hazard,
                                            self.obs_likelihood)
        else:
            new_p = segmentation_step(x, old_p, self.hazard,
                                      self.obs_likelihood)
        self.probabilities.append(new_p)

    def find_changepoints(self, data, past=3, prob_threshold=None):
        """Analyze dataset

        This resets the detector and calls :py:meth:`update` on all data
        points.

        Parameters
        ----------
        data : array-like
            Dataset
        past : int, optional
            How many datapoints into the past to look. Larger values will
            increase robustness, but also latency, meaning that if `past`
            equals some number `x`, a changepoint within the last `x` data
            points cannot be detected. Defaults to 3.
        prob_threshold : float or None, optional
            If this is a float, local maxima in the changepoint
            probabilities that exceed the threshold are considered
            changepoints, and an array of changepoint indices is returned.
            If `None`, an array of probabilities is returned. Defaults to
            `None`.

        Returns
        -------
        numpy.ndarray
            Probabilities for a changepoint as a function of time (if
            ``prob_threshold=None``) or the indices of changepoints (if
            `prob_threshold` is not `None`). Note that while the algorithm
            sets the probability for a changepoint at index 0 to 100%
            (meaning that there is a changepoint before the start of the
            sequence), the returned probability array has the 0-th entry set
            to 0.
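
        Examples
        --------
        A minimal sketch, where ``data`` is a hypothetical 1D array of
        observations and ``0.2`` an arbitrarily chosen threshold:

        >>> det = BayesOnline()
        >>> cp = det.find_changepoints(data, prob_threshold=0.2)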
"""
self.reset()
if self._use_numba:
prob = segmentation_numba(data, self.hazard, self.obs_likelihood)
self.probabilities = []
for i, p in enumerate(prob):
self.probabilities.append(p[:i+1])
else:
for x in data:
self.update(x)
prob = self.get_probabilities(past)
prob[0] = 0
if prob_threshold is not None:
lmax = signal.argrelmax(prob)[0]
return lmax[prob[lmax] >= prob_threshold]
else:
return prob

    def get_probabilities(self, past):
        """Get changepoint probabilities

        To calculate the probabilities, look a number of data points (as
        given by the `past` parameter) into the past to increase robustness.

        There is always 100% probability that there was a changepoint at the
        start of the signal due to how the algorithm is implemented; one
        should filter that out if necessary or use
        :py:meth:`find_changepoints`, which does that for you.

        Parameters
        ----------
        past : int
            How many datapoints into the past to look. Larger values will
            increase robustness, but also latency, meaning that if `past`
            equals some number `x`, a changepoint within the last `x` data
            points cannot be detected.

        Returns
        -------
        numpy.ndarray
            Changepoint probabilities as a function of time. The length of
            the array equals the number of datapoints - `past`.
        """
        return np.array([p[past] for p in self.probabilities[past:-1]])