diff --git a/pysatl_cpd/core/algorithms/kliep_algorithm.py b/pysatl_cpd/core/algorithms/kliep_algorithm.py index 39eb255..e53aee3 100644 --- a/pysatl_cpd/core/algorithms/kliep_algorithm.py +++ b/pysatl_cpd/core/algorithms/kliep_algorithm.py @@ -1,82 +1,149 @@ -from typing import cast +""" +Module for implementation of CPD algorithm using KLIEP-based divergence estimation. +""" + +__author__ = "Aleksandra Listkova" +__copyright__ = "Copyright (c) 2025 Aleksandra Listkova" +__license__ = "SPDX-License-Identifier: MIT" + +from typing import Any, Optional, cast import numpy as np import numpy.typing as npt -from numpy import dtype, float64, ndarray +from scipy.optimize import minimize # type: ignore[import-untyped] +from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm -class KliepAlgorithm(DensityBasedAlgorithm): - """Kullback-Leibler Importance Estimation Procedure (KLIEP) algorithm - for change point detection. - - KLIEP estimates the density ratio between two distributions and uses - the importance weights for detecting changes in the data distribution. - """ - - def __init__(self, bandwidth: float, regularization_coef: float, threshold: float = 1.1): - """Initialize the KLIEP algorithm. +class KliepAlgorithm(Algorithm): + def __init__( + self, + bandwidth: float = 1.0, + regularization: float = 0.1, + threshold: float = 1.1, + max_iter: int = 100, + min_window_size: int = 10 + ) -> None: + """ + Initializes a new instance of KLIEP based change point detection algorithm. - Args: - bandwidth (float): bandwidth parameter for density estimation. - regularization_coef (float): regularization parameter. - threshold (float, optional): threshold for detecting change points. - Defaults to 1.1. + :param bandwidth: the bandwidth parameter for the kernel density estimation. + :param regularization: L2 regularization coefficient for the KLIEP optimization. + :param threshold: detection threshold for significant change points. + :param max_iter: maximum number of iterations for the L-BFGS-B optimizer. + :param min_window_size: minimum size of data segments to consider. """ self.bandwidth = bandwidth - self.regularization_coef = regularization_coef - self.threshold = np.float64(threshold) + self.regularisation = regularization + self.threshold = threshold + self.max_iter = max_iter + self.min_window_size = min_window_size - def _loss_function(self, density_ratio: npt.NDArray[np.float64], alpha: npt.NDArray[np.float64]) -> float: - """Loss function for KLIEP. + def detect(self, window: npt.NDArray[np.float64]) -> int: + """ + Finds change points in the given window. - Args: - density_ratio (np.ndarray): estimated density ratio. - alpha (np.ndarray): coefficients for the density ratio. + :param window: input data window for change point detection. + :return: number of detected change points in the window. + """ + return len(self.localize(window)) - Returns: - float: the computed loss value. + def localize(self, window: npt.NDArray[np.float64]) -> list[int]: """ - return -np.mean(density_ratio) + self.regularization_coef * np.sum(alpha**2) + Identifies and returns the locations of change points in the window. - def detect(self, window: npt.NDArray[np.float64]) -> int: - """Detect the number of change points in the given data window - using KLIEP. + :param window: input data window for change point localization. + :return: list of indices where change points were detected. + """ + window = self._validate_window(window) + if len(window) < self.min_window_size: + return [] - Args: - window (Iterable[float]): the data window to detect change points. + scores = self._compute_kliep_scores(window) + return self._find_change_points(scores) - Returns: - int: the number of detected change points. + def _validate_window(self, window: npt.NDArray[Any]) -> npt.NDArray[np.float64]: """ + Validates and prepares the input window for processing. - window_sample = np.array(window) - weights = self._calculate_weights( - test_value=window_sample, - reference_value=window_sample, - bandwidth=self.bandwidth, - objective_function=self._loss_function, - ) + :param window: input data window. + :return: validated window in 2D format. + """ + window = np.asarray(window, dtype=np.float64) + if window.ndim == 1: + window = window.reshape(-1, 1) + return window - return np.count_nonzero(weights > self.threshold) + def _compute_kliep_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: + """ + Computes KLIEP anomaly scores for each point in the window. - def localize(self, window: npt.NDArray[np.float64]) -> list[int]: - """Localize the change points in the given data window using KLIEP. + :param window: validated input data window. + :return: array of KLIEP scores for each point. + """ + n_points = window.shape[0] + scores = np.zeros(n_points, dtype=np.float64) + + for i in range(self.min_window_size, n_points - self.min_window_size): + before = window[:i] + after = window[i:] + + before_density = DensityBasedAlgorithm._kernel_density_estimation( + before, self.bandwidth + ) + after_density = DensityBasedAlgorithm._kernel_density_estimation( + after, self.bandwidth + ) + + alpha = self._optimize_alpha(after_density, before_density) + scores[i] = np.mean(np.exp(after_density - before_density - alpha)) + + return scores + + def _optimize_alpha( + self, + test_density: npt.NDArray[np.float64], + ref_density: npt.NDArray[np.float64] + ) -> npt.NDArray[np.float64]: + """ + Optimizes the alpha parameters for KLIEP density ratio estimation. + """ + def loss(alpha: npt.NDArray[np.float64]) -> np.float64: + ratio = np.exp(test_density - ref_density - alpha) + return np.float64(-np.mean(np.log(ratio)) + self.regularisation * np.sum(alpha**2)) + + initial_alpha: npt.NDArray[np.float64] = np.zeros_like(test_density).flatten() + bounds: list[tuple[float, Optional[float]]] = [(0.0, None) for _ in test_density.flatten()] + + def wrapped_loss(alpha_flat: npt.NDArray[np.float64]) -> float: + alpha = alpha_flat.reshape(test_density.shape) + return float(loss(alpha)) + + res = minimize( + wrapped_loss, + initial_alpha, + method='L-BFGS-B', + options={'maxiter': self.max_iter}, + bounds=bounds + ) - Args: - window (Iterable[float]): the data window to localize - change points. + return cast(npt.NDArray[np.float64], res.x.reshape(test_density.shape)) - Returns: - List[int]: the indices of the detected change points. + def _find_change_points(self, scores: npt.NDArray[np.float64]) -> list[int]: """ - window_sample = np.array(window) - weights: ndarray[tuple[int, ...], dtype[float64]] = self._calculate_weights( - test_value=window_sample, - reference_value=window_sample, - bandwidth=self.bandwidth, - objective_function=self._loss_function, - ) + Identifies change points from computed KLIEP scores. + + :param scores: array of KLIEP scores for each point. + :return: list of detected change point indices. + """ + candidates = np.where(scores > self.threshold)[0] + if len(candidates) == 0: + return [] + + change_points = [int(candidates[0])] + for point in candidates[1:]: + if point - change_points[-1] > self.min_window_size: + change_points.append(int(point)) - return cast(list[int], np.where(weights > self.threshold)[0].tolist()) + return change_points