Skip to content

Improving code and adding new methods and tests #45

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,4 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
/poetry.lock
.DS_Store
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ scipy = "^1.14.0"
matplotlib = "^3.9.1"
scikit-learn = "^1.5.2"
PyQt5 = "^5.15.11"
black = "^25.1.0"

[tool.poetry.group.dev.dependencies]
pytest = "^8.2.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ def update(self, observation: np.float64) -> None:
beta_divider = 2.0 * self.__k_params + 1.0
assert np.count_nonzero(beta_divider) == beta_divider.shape[0], "Beta dividers cannot be 0.0"

new_mu_params = np.append([self.__mu_0], (self.__mu_params * self.__k_params + observation) / mu_divider)
new_mu_params = np.append(
[self.__mu_0],
(self.__mu_params * self.__k_params + observation) / mu_divider,
)
new_k_params = np.append([self.__k_0], self.__k_params + 1.0)
new_alpha_params = np.append([self.__alpha_0], self.__alpha_params + 0.5)
new_beta_params = np.append(
Expand Down
4 changes: 3 additions & 1 deletion pysatl_cpd/core/algorithms/classification/classifiers/knn.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ class KNNClassifier(Classifier):
"""

def __init__(
self, k: int, distance: tp.Literal["manhattan", "euclidean", "minkowski", "hamming"] = "minkowski"
self,
k: int,
distance: tp.Literal["manhattan", "euclidean", "minkowski", "hamming"] = "minkowski",
) -> None:
"""
Initializes a new instance of knn classifier for cpd.
Expand Down
5 changes: 4 additions & 1 deletion pysatl_cpd/core/algorithms/classification/classifiers/svm.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ class SVMClassifier(Classifier):
The class implementing svm classifier for cpd.
"""

def __init__(self, kernel: tp.Literal["linear", "poly", "rbf", "sigmoid", "precomputed"] = "rbf") -> None:
def __init__(
self,
kernel: tp.Literal["linear", "poly", "rbf", "sigmoid", "precomputed"] = "rbf",
) -> None:
"""
Initializes a new instance of svm classifier for cpd.
:param kernel: specifies the kernel type to be used in the algorithm. If none is given, 'rbf' will be used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import numpy as np
import numpy.typing as npt

from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import QualityMetric
from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import (
QualityMetric,
)


class Accuracy(QualityMetric):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import numpy as np
import numpy.typing as npt

from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import QualityMetric
from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import (
QualityMetric,
)


class F1(QualityMetric):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import numpy as np
import numpy.typing as npt

from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import QualityMetric
from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import (
QualityMetric,
)


class MCC(QualityMetric):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
__copyright__ = "Copyright (c) 2024 Artemii Patov"
__license__ = "SPDX-License-Identifier: MIT"

from pysatl_cpd.core.algorithms.classification.abstracts.istatistic_test import TestStatistic
from pysatl_cpd.core.algorithms.classification.abstracts.istatistic_test import (
TestStatistic,
)


class ThresholdOvercome(TestStatistic):
Expand Down
14 changes: 11 additions & 3 deletions pysatl_cpd/core/algorithms/classification_algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@

from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm
from pysatl_cpd.core.algorithms.classification.abstracts.iclassifier import Classifier
from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import QualityMetric
from pysatl_cpd.core.algorithms.classification.abstracts.istatistic_test import TestStatistic
from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import (
QualityMetric,
)
from pysatl_cpd.core.algorithms.classification.abstracts.istatistic_test import (
TestStatistic,
)


class ClassificationAlgorithm(Algorithm):
Expand All @@ -22,7 +26,11 @@ class ClassificationAlgorithm(Algorithm):
"""

def __init__(
self, classifier: Classifier, quality_metric: QualityMetric, test_statistic: TestStatistic, indent_coeff: float
self,
classifier: Classifier,
quality_metric: QualityMetric,
test_statistic: TestStatistic,
indent_coeff: float,
) -> None:
"""
Initializes a new instance of classification based change point detection algorithm.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ def _kernel_density_estimation(observation: npt.NDArray[np.float64], bandwidth:
:return: estimated density values for the observations.
"""
n = len(observation)
x_grid = np.linspace(np.min(observation) - 3 * bandwidth, np.max(observation) + 3 * bandwidth, 1000)
x_grid = np.linspace(
np.min(observation) - 3 * bandwidth,
np.max(observation) + 3 * bandwidth,
1000,
)
kde_values = np.zeros_like(x_grid)
for x in observation:
kde_values += np.exp(-0.5 * ((x_grid - x) / bandwidth) ** 2)
Expand Down
162 changes: 162 additions & 0 deletions pysatl_cpd/core/algorithms/entropies/bubble_entropy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
from collections import Counter, deque
from typing import Optional

import numpy as np
import numpy.typing as npt

from pysatl_cpd.core.algorithms.online_algorithm import OnlineAlgorithm


class BubbleEntropyAlgorithm(OnlineAlgorithm):
    """
    **BubbleEntropyAlgorithm** detects change points in a time series using bubble entropy.

    The algorithm maintains a sliding buffer of observations and computes bubble entropy
    as the normalized difference between permutation entropies at embedding dimensions
    ``m + 1`` and ``m``. A change point is reported when either (a) a new observation
    deviates strongly from the recent buffer mean, or (b) consecutive entropy values
    differ by more than ``threshold``.

    :param window_size: Size of each sliding window.
    :param embedding_dimension: The embedding dimension used for calculating permutation entropy.
    :param time_delay: Time delay between elements in each state vector for calculating permutation entropy.
    :param threshold: Threshold for detecting changes based on entropy differences.

    **Attributes:**
    - `window_size` (int): Size of each sliding window.
    - `embedding_dimension` (int): The embedding dimension used for calculating permutation entropy.
    - `time_delay` (int): Time delay between elements in each state vector.
    - `threshold` (float): Threshold for change detection based on entropy shift.
    - `_buffer` (deque): A buffer for storing the most recent observations (capacity ``2 * window_size``).
    - `_entropy_values` (list): A list to store the calculated entropy values.
    - `_position` (int): The current position in the observation sequence.
    - `_last_change_point` (Optional[int]): The position of the last detected change point.
    """

    def __init__(
        self,
        window_size: int = 100,
        embedding_dimension: int = 3,
        time_delay: int = 1,
        threshold: float = 0.2,
    ):
        """
        Initializes the BubbleEntropyAlgorithm with the specified parameters.

        :param window_size: Size of each sliding window.
        :param embedding_dimension: The embedding dimension used for calculating permutation entropy.
        :param time_delay: Time delay between elements in each state vector for calculating permutation entropy.
        :param threshold: Threshold for detecting changes based on entropy differences.
        """
        self._window_size = window_size
        self._embedding_dimension = embedding_dimension
        self._time_delay = time_delay
        self._threshold = threshold

        # Keep twice the window so entropy windows overlap between updates.
        self._buffer: deque[float] = deque(maxlen=window_size * 2)
        self._entropy_values: list[float] = []
        self._position: int = 0
        self._last_change_point: Optional[int] = None

    def detect(self, observation: np.float64 | npt.NDArray[np.float64]) -> bool:
        """
        Processes the input observation to detect if a change point occurs in the time series.

        :param observation: A single observation or an array of observations.
        :return: `True` if a change point is detected, otherwise `False`.
        """
        if isinstance(observation, np.ndarray):
            for obs in observation:
                self._process_single_observation(float(obs))
        else:
            self._process_single_observation(float(observation))

        return self._last_change_point is not None

    def localize(self, observation: np.float64 | npt.NDArray[np.float64]) -> Optional[int]:
        """
        Localizes the detected change point based on the observation.

        The stored change point is consumed: after a successful localization the
        internal marker is reset so the same change point is not reported twice.

        :param observation: A single observation or an array of observations.
        :return: The position of the detected change point, or `None` if no change point is detected.
        """
        change_detected = self.detect(observation)

        if change_detected:
            change_point = self._last_change_point
            self._last_change_point = None
            return change_point

        return None

    def _process_single_observation(self, observation: float) -> None:
        """
        Processes a single observation and updates the internal state. This method checks for significant deviations,
        computes bubble entropy, and detects change points when applicable.

        :param observation: The observation value to be processed.
        """
        # Absolute deviation from the recent mean (in raw signal units) that
        # immediately flags a change point. NOTE(review): hard-coded heuristic —
        # presumably tuned for roughly unit-variance data; confirm before reuse.
        deviation_threshold = 3.0
        # Number of entropy values required before the entropy-difference test
        # applies (an int count; the original used a float here).
        min_entropy_history = 2

        half_window = self._window_size // 2
        # Guard half_window > 0 to avoid a zero division for window_size <= 1.
        if half_window > 0 and len(self._buffer) >= half_window:
            # Bug fix: slice by exactly half_window elements so the slice length
            # matches the divisor. The original sliced with (-window_size) // 2,
            # which takes one extra element when window_size is odd.
            recent = list(self._buffer)[-half_window:]
            buffer_mean = sum(recent) / half_window
            if abs(observation - buffer_mean) > deviation_threshold:
                self._last_change_point = self._position

        self._buffer.append(observation)
        self._position += 1

        # Permutation entropy at dimension m+1 needs at least
        # (m+1)*delay + 1 samples to form one state vector.
        min_required = (self._embedding_dimension + 1) * self._time_delay + 1
        if len(self._buffer) < self._window_size or len(self._buffer) < min_required:
            return

        current_entropy = self._calculate_bubble_entropy(np.array(list(self._buffer)[-self._window_size :]))
        self._entropy_values.append(current_entropy)

        if len(self._entropy_values) >= min_entropy_history:
            entropy_diff = abs(self._entropy_values[-1] - self._entropy_values[-2])

            if entropy_diff > self._threshold:
                # Localize the change at the middle of the current window.
                self._last_change_point = self._position - half_window

    def _calculate_bubble_entropy(self, time_series: npt.NDArray[np.float64]) -> float:
        """
        Calculates the bubble entropy of a time series by computing the difference in permutation entropy
        between two different embedding dimensions, normalized by log((m + 1) / m).

        :param time_series: The time series to analyze.
        :return: The computed bubble entropy value.
        """
        entropy_m = self._calculate_permutation_entropy(time_series, self._embedding_dimension)
        entropy_m_plus_1 = self._calculate_permutation_entropy(time_series, self._embedding_dimension + 1)

        denom = np.log((self._embedding_dimension + 1) / self._embedding_dimension)
        bubble_entropy = (entropy_m_plus_1 - entropy_m) / denom

        return float(bubble_entropy)

    def _calculate_permutation_entropy(self, time_series: npt.NDArray[np.float64], embedding_dimension: int) -> float:
        """
        Calculates the permutation entropy (in bits, base-2 log) of a time series
        for the given embedding dimension.

        :param time_series: The time series data to analyze.
        :param embedding_dimension: The embedding dimension for the state vectors.
        :return: The computed permutation entropy value, or 0.0 if the series is too short.
        """
        permutation_vectors = []
        for index in range(len(time_series) - embedding_dimension * self._time_delay):
            current_window = time_series[index : index + embedding_dimension * self._time_delay : self._time_delay]
            # argsort gives the ordinal pattern of the window; tuples are hashable keys.
            permutation_vectors.append(tuple(np.argsort(current_window)))

        total_permutations = len(permutation_vectors)
        if total_permutations == 0:
            # Series too short to form even one state vector.
            return 0.0

        permutation_counts = Counter(permutation_vectors)
        permutation_probabilities = [count / total_permutations for count in permutation_counts.values()]
        permutation_entropy = -np.sum(
            [probability * np.log2(probability) for probability in permutation_probabilities if probability > 0]
        )

        return float(permutation_entropy)
Loading