Skip to content

feat(kliep): rewrite kliep algorithm implementation #43

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 27 commits into
base: alistkova/kliep-rulsif
Choose a base branch
from
Open
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4862341
feat(kliep): rewrite kliep algorithm implementation
alistkova Mar 26, 2025
f6348e9
refactor(algorithms): refactoring changes rebase.
alexdtat Mar 18, 2025
b331aa6
fix(algorithms): typing fix from List to list.
alexdtat Mar 18, 2025
02ebaba
refactor(algorithms): change of vision scope modifiers.
alexdtat Mar 18, 2025
7b49a81
fix(algorithms): change learning in Bayesian online CPD.
alexdtat Mar 18, 2025
3f0e3ff
refactor(bayesian algorithm): typing an naming changes
alexdtat Mar 26, 2025
067211f
refactor(bayesian algorithm): typing an naming changes
alexdtat Mar 26, 2025
53535e8
refactor(bayesian algorithm): detectors and localizers renaming.
alexdtat Mar 26, 2025
ef11990
docs(bayesian algorithm): license change.
alexdtat Mar 26, 2025
fdace5e
feat(algorithms): add online-algorithms related functionality
alexdtat Mar 30, 2025
b0f752f
fix(tests): fix import in cpd_solver tests
alexdtat Mar 30, 2025
f7e47fb
test(online-cpd): add tests to online-algorithms related functionality
alexdtat Mar 31, 2025
f70d1cb
refactor(bayesian online algorithm): add asserts
alexdtat Apr 1, 2025
033c0a5
fix(bayesian online algorithm): fix learning behavior after change po…
alexdtat Apr 1, 2025
cc05b16
docs(readme): update info about bayesian algorithm
alexdtat Apr 1, 2025
df400b4
refactor(online cpd solver): add suggested cp processing optimization
alexdtat Apr 5, 2025
d982093
fix(gaussian likelihood): change deprecated decorator
alexdtat Apr 5, 2025
1145426
docs(online algorithms): clarify localization method docstring
alexdtat Apr 5, 2025
2f19c50
chore(gitignore): add ./idea to gitignore
alexdtat Apr 5, 2025
714fe90
fix(kliep): typing fix
alistkova Apr 8, 2025
048b46d
Merge branch 'PySATL:main' into main
alistkova Apr 8, 2025
0b80f9e
revert merge
alistkova Apr 8, 2025
36433ff
fix(kliep): organize imports
alistkova Apr 8, 2025
e130d77
fix: correct type annotations
alistkova May 3, 2025
c8e9405
style: modernize type annotations and clean imports
alistkova May 3, 2025
fecf34f
style: organize imports
alistkova May 3, 2025
b8fd7f5
style: organize imports
alistkova May 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 124 additions & 57 deletions pysatl_cpd/core/algorithms/kliep_algorithm.py
Original file line number Diff line number Diff line change
@@ -1,82 +1,149 @@
from typing import cast
"""
Module for implementation of CPD algorithm using KLIEP-based divergence estimation.
"""

__author__ = "Aleksandra Listkova"
__copyright__ = "Copyright (c) 2025 Aleksandra Listkova"
__license__ = "SPDX-License-Identifier: MIT"

from typing import Any, Optional, cast

import numpy as np
import numpy.typing as npt
from numpy import dtype, float64, ndarray
from scipy.optimize import minimize # type: ignore[import-untyped]

from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm
from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm


class KliepAlgorithm(DensityBasedAlgorithm):
"""Kullback-Leibler Importance Estimation Procedure (KLIEP) algorithm
for change point detection.

KLIEP estimates the density ratio between two distributions and uses
the importance weights for detecting changes in the data distribution.
"""

def __init__(self, bandwidth: float, regularization_coef: float, threshold: float = 1.1):
"""Initialize the KLIEP algorithm.
class KliepAlgorithm(Algorithm):
def __init__(
self,
bandwidth: float = 1.0,
regularization: float = 0.1,
threshold: float = 1.1,
max_iter: int = 100,
min_window_size: int = 10
) -> None:
"""
Initializes a new instance of KLIEP based change point detection algorithm.

Args:
bandwidth (float): bandwidth parameter for density estimation.
regularization_coef (float): regularization parameter.
threshold (float, optional): threshold for detecting change points.
Defaults to 1.1.
:param bandwidth: the bandwidth parameter for the kernel density estimation.
:param regularization: L2 regularization coefficient for the KLIEP optimization.
:param threshold: detection threshold for significant change points.
:param max_iter: maximum number of iterations for the L-BFGS-B optimizer.
:param min_window_size: minimum size of data segments to consider.
"""
self.bandwidth = bandwidth
self.regularization_coef = regularization_coef
self.threshold = np.float64(threshold)
self.regularisation = regularization
self.threshold = threshold
self.max_iter = max_iter
self.min_window_size = min_window_size

def _loss_function(self, density_ratio: npt.NDArray[np.float64], alpha: npt.NDArray[np.float64]) -> float:
"""Loss function for KLIEP.
def detect(self, window: npt.NDArray[np.float64]) -> int:
"""
Finds change points in the given window.

Args:
density_ratio (np.ndarray): estimated density ratio.
alpha (np.ndarray): coefficients for the density ratio.
:param window: input data window for change point detection.
:return: number of detected change points in the window.
"""
return len(self.localize(window))

Returns:
float: the computed loss value.
def localize(self, window: npt.NDArray[np.float64]) -> list[int]:
"""
return -np.mean(density_ratio) + self.regularization_coef * np.sum(alpha**2)
Identifies and returns the locations of change points in the window.

def detect(self, window: npt.NDArray[np.float64]) -> int:
"""Detect the number of change points in the given data window
using KLIEP.
:param window: input data window for change point localization.
:return: list of indices where change points were detected.
"""
window = self._validate_window(window)
if len(window) < self.min_window_size:
return []

Args:
window (Iterable[float]): the data window to detect change points.
scores = self._compute_kliep_scores(window)
return self._find_change_points(scores)

Returns:
int: the number of detected change points.
def _validate_window(self, window: npt.NDArray[Any]) -> npt.NDArray[np.float64]:
"""
Validates and prepares the input window for processing.

window_sample = np.array(window)
weights = self._calculate_weights(
test_value=window_sample,
reference_value=window_sample,
bandwidth=self.bandwidth,
objective_function=self._loss_function,
)
:param window: input data window.
:return: validated window in 2D format.
"""
window = np.asarray(window, dtype=np.float64)
if window.ndim == 1:
window = window.reshape(-1, 1)
return window

return np.count_nonzero(weights > self.threshold)
def _compute_kliep_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
"""
Computes KLIEP anomaly scores for each point in the window.

def localize(self, window: npt.NDArray[np.float64]) -> list[int]:
"""Localize the change points in the given data window using KLIEP.
:param window: validated input data window.
:return: array of KLIEP scores for each point.
"""
n_points = window.shape[0]
scores = np.zeros(n_points, dtype=np.float64)

for i in range(self.min_window_size, n_points - self.min_window_size):
before = window[:i]
after = window[i:]

before_density = DensityBasedAlgorithm._kernel_density_estimation(
before, self.bandwidth
)
after_density = DensityBasedAlgorithm._kernel_density_estimation(
after, self.bandwidth
)

alpha = self._optimize_alpha(after_density, before_density)
scores[i] = np.mean(np.exp(after_density - before_density - alpha))

return scores

def _optimize_alpha(
self,
test_density: npt.NDArray[np.float64],
ref_density: npt.NDArray[np.float64]
) -> npt.NDArray[np.float64]:
"""
Optimizes the alpha parameters for KLIEP density ratio estimation.
"""
def loss(alpha: npt.NDArray[np.float64]) -> np.float64:
ratio = np.exp(test_density - ref_density - alpha)
return np.float64(-np.mean(np.log(ratio)) + self.regularisation * np.sum(alpha**2))

initial_alpha: npt.NDArray[np.float64] = np.zeros_like(test_density).flatten()
bounds: list[tuple[float, Optional[float]]] = [(0.0, None) for _ in test_density.flatten()]

def wrapped_loss(alpha_flat: npt.NDArray[np.float64]) -> float:
alpha = alpha_flat.reshape(test_density.shape)
return float(loss(alpha))

res = minimize(
wrapped_loss,
initial_alpha,
method='L-BFGS-B',
options={'maxiter': self.max_iter},
bounds=bounds
)

Args:
window (Iterable[float]): the data window to localize
change points.
return cast(npt.NDArray[np.float64], res.x.reshape(test_density.shape))

Returns:
List[int]: the indices of the detected change points.
def _find_change_points(self, scores: npt.NDArray[np.float64]) -> list[int]:
"""
window_sample = np.array(window)
weights: ndarray[tuple[int, ...], dtype[float64]] = self._calculate_weights(
test_value=window_sample,
reference_value=window_sample,
bandwidth=self.bandwidth,
objective_function=self._loss_function,
)
Identifies change points from computed KLIEP scores.

:param scores: array of KLIEP scores for each point.
:return: list of detected change point indices.
"""
candidates = np.where(scores > self.threshold)[0]
if len(candidates) == 0:
return []

change_points = [int(candidates[0])]
for point in candidates[1:]:
if point - change_points[-1] > self.min_window_size:
change_points.append(int(point))

return cast(list[int], np.where(weights > self.threshold)[0].tolist())
return change_points
Loading