From 757b7d07f967725504ef0d2309d4b472c2b4b09d Mon Sep 17 00:00:00 2001 From: Wenderlog Date: Tue, 8 Apr 2025 18:35:03 +0300 Subject: [PATCH 1/3] Files formatted with Ruff --- .gitignore | 1 + pyproject.toml | 1 + .../likelihoods/gaussian_conjugate.py | 5 +- .../classification/classifiers/knn.py | 4 +- .../classification/classifiers/svm.py | 5 +- .../classification/accuracy.py | 4 +- .../quality_metrics/classification/f1.py | 4 +- .../quality_metrics/classification/mcc.py | 4 +- .../test_statistics/threshold_overcome.py | 4 +- .../algorithms/classification_algorithm.py | 14 +- .../abstracts/density_based_algorithm.py | 6 +- .../algorithms/entropies/bubble_entropy.py | 162 ++++++++++++ .../entropies/conditional_entropy.py | 247 ++++++++++++++++++ .../entropies/permutation_entropy.py | 187 +++++++++++++ .../algorithms/entropies/shannon_entropy.py | 160 ++++++++++++ .../algorithms/graph/abstracts/ibuilder.py | 4 +- pysatl_cpd/core/algorithms/kliep_algorithm.py | 4 +- pysatl_cpd/core/algorithms/knn/classifier.py | 8 +- pysatl_cpd/core/algorithms/knn/graph.py | 8 +- pysatl_cpd/core/algorithms/knn_algorithm.py | 12 +- .../core/algorithms/rulsif_algorithm.py | 4 +- pysatl_cpd/generator/config_parser.py | 5 +- pysatl_cpd/generator/dataset_description.py | 5 +- pysatl_cpd/generator/saver.py | 8 +- pysatl_cpd/icpd_solver.py | 39 ++- pysatl_cpd/labeled_data.py | 4 +- .../test_bayesian_algorithm.py | 6 +- .../test_bayesian_online_algorithm.py | 18 +- .../test_classification_algorithms.py | 74 +++--- .../test_bubble_entropy_algorithm.py | 168 ++++++++++++ .../test_conditional_entropy_algorithm.py | 156 +++++++++++ .../test_permutation_entropy_algorithm.py | 174 ++++++++++++ .../test_shannon_entropy_algorithm.py | 164 ++++++++++++ .../test_algorithms/test_graph_algorithm.py | 16 +- tests/test_core/test_online_cpd_core.py | 7 +- .../test_scrubber/test_linear_scrubber.py | 17 +- tests/test_generator/test_distributions.py | 42 ++- tests/test_generator/test_generator.py | 5 +- tests/test_online_solver.py | 9 +- 39 files changed, 1683 insertions(+), 82 deletions(-) create mode 100644 pysatl_cpd/core/algorithms/entropies/bubble_entropy.py create mode 100644 pysatl_cpd/core/algorithms/entropies/conditional_entropy.py create mode 100644 pysatl_cpd/core/algorithms/entropies/permutation_entropy.py create mode 100644 pysatl_cpd/core/algorithms/entropies/shannon_entropy.py create mode 100644 tests/test_core/test_algorithms/test_entropies/test_bubble_entropy_algorithm.py create mode 100644 tests/test_core/test_algorithms/test_entropies/test_conditional_entropy_algorithm.py create mode 100644 tests/test_core/test_algorithms/test_entropies/test_permutation_entropy_algorithm.py create mode 100644 tests/test_core/test_algorithms/test_entropies/test_shannon_entropy_algorithm.py diff --git a/.gitignore b/.gitignore index 2927795..c3ea18a 100644 --- a/.gitignore +++ b/.gitignore @@ -161,3 +161,4 @@ cython_debug/ # option (not recommended) you can uncomment the following to ignore the entire idea folder. 
.idea/ /poetry.lock +.DS_Store diff --git a/pyproject.toml b/pyproject.toml index b871f63..7ec63bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ scipy = "^1.14.0" matplotlib = "^3.9.1" scikit-learn = "^1.5.2" PyQt5 = "^5.15.11" +black = "^25.1.0" [tool.poetry.group.dev.dependencies] pytest = "^8.2.2" diff --git a/pysatl_cpd/core/algorithms/bayesian/likelihoods/gaussian_conjugate.py b/pysatl_cpd/core/algorithms/bayesian/likelihoods/gaussian_conjugate.py index e43419b..d84dd18 100644 --- a/pysatl_cpd/core/algorithms/bayesian/likelihoods/gaussian_conjugate.py +++ b/pysatl_cpd/core/algorithms/bayesian/likelihoods/gaussian_conjugate.py @@ -76,7 +76,10 @@ def update(self, observation: np.float64) -> None: beta_divider = 2.0 * self.__k_params + 1.0 assert np.count_nonzero(beta_divider) == beta_divider.shape[0], "Beta dividers cannot be 0.0" - new_mu_params = np.append([self.__mu_0], (self.__mu_params * self.__k_params + observation) / mu_divider) + new_mu_params = np.append( + [self.__mu_0], + (self.__mu_params * self.__k_params + observation) / mu_divider, + ) new_k_params = np.append([self.__k_0], self.__k_params + 1.0) new_alpha_params = np.append([self.__alpha_0], self.__alpha_params + 0.5) new_beta_params = np.append( diff --git a/pysatl_cpd/core/algorithms/classification/classifiers/knn.py b/pysatl_cpd/core/algorithms/classification/classifiers/knn.py index cc518dc..7c50010 100644 --- a/pysatl_cpd/core/algorithms/classification/classifiers/knn.py +++ b/pysatl_cpd/core/algorithms/classification/classifiers/knn.py @@ -21,7 +21,9 @@ class KNNClassifier(Classifier): """ def __init__( - self, k: int, distance: tp.Literal["manhattan", "euclidean", "minkowski", "hamming"] = "minkowski" + self, + k: int, + distance: tp.Literal["manhattan", "euclidean", "minkowski", "hamming"] = "minkowski", ) -> None: """ Initializes a new instance of knn classifier for cpd. diff --git a/pysatl_cpd/core/algorithms/classification/classifiers/svm.py b/pysatl_cpd/core/algorithms/classification/classifiers/svm.py index d006ba1..87d0e78 100644 --- a/pysatl_cpd/core/algorithms/classification/classifiers/svm.py +++ b/pysatl_cpd/core/algorithms/classification/classifiers/svm.py @@ -20,7 +20,10 @@ class SVMClassifier(Classifier): The class implementing svm classifier for cpd. """ - def __init__(self, kernel: tp.Literal["linear", "poly", "rbf", "sigmoid", "precomputed"] = "rbf") -> None: + def __init__( + self, + kernel: tp.Literal["linear", "poly", "rbf", "sigmoid", "precomputed"] = "rbf", + ) -> None: """ Initializes a new instance of svm classifier for cpd. :param kernel: specifies the kernel type to be used in the algorithm. If none is given, 'rbf' will be used. 
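For context, a minimal usage sketch of the two classifier constructors whose signatures are wrapped above (illustrative only, not part of the patch; it assumes the pysatl_cpd package is installed and importable):

import typing as tp

from pysatl_cpd.core.algorithms.classification.classifiers.knn import KNNClassifier
from pysatl_cpd.core.algorithms.classification.classifiers.svm import SVMClassifier

# k-NN classifier for CPD; k is the number of neighbours, distance defaults to "minkowski"
knn_classifier = KNNClassifier(k=7, distance="minkowski")

# SVM classifier for CPD with the default RBF kernel
svm_classifier = SVMClassifier(kernel="rbf")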
diff --git a/pysatl_cpd/core/algorithms/classification/quality_metrics/classification/accuracy.py b/pysatl_cpd/core/algorithms/classification/quality_metrics/classification/accuracy.py index 8f4b304..2448f89 100644 --- a/pysatl_cpd/core/algorithms/classification/quality_metrics/classification/accuracy.py +++ b/pysatl_cpd/core/algorithms/classification/quality_metrics/classification/accuracy.py @@ -9,7 +9,9 @@ import numpy as np import numpy.typing as npt -from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import QualityMetric +from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import ( + QualityMetric, +) class Accuracy(QualityMetric): diff --git a/pysatl_cpd/core/algorithms/classification/quality_metrics/classification/f1.py b/pysatl_cpd/core/algorithms/classification/quality_metrics/classification/f1.py index ebb3453..084713d 100644 --- a/pysatl_cpd/core/algorithms/classification/quality_metrics/classification/f1.py +++ b/pysatl_cpd/core/algorithms/classification/quality_metrics/classification/f1.py @@ -9,7 +9,9 @@ import numpy as np import numpy.typing as npt -from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import QualityMetric +from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import ( + QualityMetric, +) class F1(QualityMetric): diff --git a/pysatl_cpd/core/algorithms/classification/quality_metrics/classification/mcc.py b/pysatl_cpd/core/algorithms/classification/quality_metrics/classification/mcc.py index 6abe76a..eaf00c7 100644 --- a/pysatl_cpd/core/algorithms/classification/quality_metrics/classification/mcc.py +++ b/pysatl_cpd/core/algorithms/classification/quality_metrics/classification/mcc.py @@ -11,7 +11,9 @@ import numpy as np import numpy.typing as npt -from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import QualityMetric +from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import ( + QualityMetric, +) class MCC(QualityMetric): diff --git a/pysatl_cpd/core/algorithms/classification/test_statistics/threshold_overcome.py b/pysatl_cpd/core/algorithms/classification/test_statistics/threshold_overcome.py index 99d4a99..1ba6c7c 100644 --- a/pysatl_cpd/core/algorithms/classification/test_statistics/threshold_overcome.py +++ b/pysatl_cpd/core/algorithms/classification/test_statistics/threshold_overcome.py @@ -6,7 +6,9 @@ __copyright__ = "Copyright (c) 2024 Artemii Patov" __license__ = "SPDX-License-Identifier: MIT" -from pysatl_cpd.core.algorithms.classification.abstracts.istatistic_test import TestStatistic +from pysatl_cpd.core.algorithms.classification.abstracts.istatistic_test import ( + TestStatistic, +) class ThresholdOvercome(TestStatistic): diff --git a/pysatl_cpd/core/algorithms/classification_algorithm.py b/pysatl_cpd/core/algorithms/classification_algorithm.py index 2d7ae21..30dc751 100644 --- a/pysatl_cpd/core/algorithms/classification_algorithm.py +++ b/pysatl_cpd/core/algorithms/classification_algorithm.py @@ -12,8 +12,12 @@ from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm from pysatl_cpd.core.algorithms.classification.abstracts.iclassifier import Classifier -from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import QualityMetric -from pysatl_cpd.core.algorithms.classification.abstracts.istatistic_test import TestStatistic +from pysatl_cpd.core.algorithms.classification.abstracts.iquality_metric import ( + QualityMetric, +) +from 
pysatl_cpd.core.algorithms.classification.abstracts.istatistic_test import ( + TestStatistic, +) class ClassificationAlgorithm(Algorithm): @@ -22,7 +26,11 @@ class ClassificationAlgorithm(Algorithm): """ def __init__( - self, classifier: Classifier, quality_metric: QualityMetric, test_statistic: TestStatistic, indent_coeff: float + self, + classifier: Classifier, + quality_metric: QualityMetric, + test_statistic: TestStatistic, + indent_coeff: float, ) -> None: """ Initializes a new instance of classification based change point detection algorithm. diff --git a/pysatl_cpd/core/algorithms/density/abstracts/density_based_algorithm.py b/pysatl_cpd/core/algorithms/density/abstracts/density_based_algorithm.py index 0cea21b..d785789 100644 --- a/pysatl_cpd/core/algorithms/density/abstracts/density_based_algorithm.py +++ b/pysatl_cpd/core/algorithms/density/abstracts/density_based_algorithm.py @@ -23,7 +23,11 @@ def _kernel_density_estimation(observation: npt.NDArray[np.float64], bandwidth: :return: estimated density values for the observations. """ n = len(observation) - x_grid = np.linspace(np.min(observation) - 3 * bandwidth, np.max(observation) + 3 * bandwidth, 1000) + x_grid = np.linspace( + np.min(observation) - 3 * bandwidth, + np.max(observation) + 3 * bandwidth, + 1000, + ) kde_values = np.zeros_like(x_grid) for x in observation: kde_values += np.exp(-0.5 * ((x_grid - x) / bandwidth) ** 2) diff --git a/pysatl_cpd/core/algorithms/entropies/bubble_entropy.py b/pysatl_cpd/core/algorithms/entropies/bubble_entropy.py new file mode 100644 index 0000000..bcf735e --- /dev/null +++ b/pysatl_cpd/core/algorithms/entropies/bubble_entropy.py @@ -0,0 +1,162 @@ +from collections import Counter, deque +from typing import Optional + +import numpy as np +import numpy.typing as npt + +from pysatl_cpd.core.algorithms.online_algorithm import OnlineAlgorithm + + +class BubbleEntropyAlgorithm(OnlineAlgorithm): + """ + **BubbleEntropyAlgorithm** detects change points in a time series using bubble entropy. + + The algorithm calculates bubble entropy values based on permutation entropy with varying embedding dimensions. + It then detects significant changes based on predefined thresholds. + + :param window_size: Size of each sliding window. + :param embedding_dimension: The embedding dimension used for calculating permutation entropy. + :param time_delay: Time delay between elements in each state vector for calculating permutation entropy. + :param threshold: Threshold for detecting changes based on entropy differences. + + **Attributes:** + - `window_size` (int): Size of each sliding window. + - `embedding_dimension` (int): The embedding dimension used for calculating permutation entropy. + - `time_delay` (int): Time delay between elements in each state vector. + - `threshold` (float): Threshold for change detection based on entropy shift. + - `min_observations_for_detection` (int): Minimum number of observations required to detect a change point. + - `_buffer` (deque): A buffer for storing the most recent observations. + - `_entropy_values` (list): A list to store the calculated entropy values. + - `_position` (int): The current position in the observation sequence. + - `_last_change_point` (Optional[int]): The position of the last detected change point. + """ + + def __init__( + self, + window_size: int = 100, + embedding_dimension: int = 3, + time_delay: int = 1, + threshold: float = 0.2, + ): + """ + Initializes the BubbleEntropyAlgorithm with the specified parameters. 
+ + :param window_size: Size of each sliding window. + :param embedding_dimension: The embedding dimension used for calculating permutation entropy. + :param time_delay: Time delay between elements in each state vector for calculating permutation entropy. + :param threshold: Threshold for detecting changes based on entropy differences. + """ + self._window_size = window_size + self._embedding_dimension = embedding_dimension + self._time_delay = time_delay + self._threshold = threshold + + self._buffer: deque[float] = deque(maxlen=window_size * 2) + self._entropy_values: list[float] = [] + self._position: int = 0 + self._last_change_point: Optional[int] = None + + def detect(self, observation: np.float64 | npt.NDArray[np.float64]) -> bool: + """ + Processes the input observation to detect if a change point occurs in the time series. + + :param observation: A single observation or an array of observations. + :return: `True` if a change point is detected, otherwise `False`. + """ + if isinstance(observation, np.ndarray): + for obs in observation: + self._process_single_observation(float(obs)) + else: + self._process_single_observation(float(observation)) + + return self._last_change_point is not None + + def localize(self, observation: np.float64 | npt.NDArray[np.float64]) -> Optional[int]: + """ + Localizes the detected change point based on the observation. + + :param observation: A single observation or an array of observations. + :return: The position of the detected change point, or `None` if no change point is detected. + """ + change_detected = self.detect(observation) + + if change_detected: + change_point = self._last_change_point + self._last_change_point = None + return change_point + + return None + + def _process_single_observation(self, observation: float) -> None: + """ + Processes a single observation and updates the internal state. This method checks for significant deviations, + computes bubble entropy, and detects change points when applicable. + + :param observation: The observation value to be processed. + """ + threshold_value1 = 3.0 + threshold_value2 = 2.0 + + if len(self._buffer) >= self._window_size // 2: + buffer_mean = sum(list(self._buffer)[-self._window_size // 2 :]) / (self._window_size // 2) + if abs(observation - buffer_mean) > threshold_value1: + self._last_change_point = self._position + + self._buffer.append(observation) + self._position += 1 + + min_required = (self._embedding_dimension + 1) * self._time_delay + 1 + if len(self._buffer) < self._window_size or len(self._buffer) < min_required: + return + + current_entropy = self._calculate_bubble_entropy(np.array(list(self._buffer)[-self._window_size :])) + self._entropy_values.append(current_entropy) + + if len(self._entropy_values) >= threshold_value2: + entropy_diff = abs(self._entropy_values[-1] - self._entropy_values[-2]) + + if entropy_diff > self._threshold: + self._last_change_point = self._position - self._window_size // 2 + + def _calculate_bubble_entropy(self, time_series: np.ndarray) -> float: + """ + Calculates the bubble entropy of a time series by computing the difference in permutation entropy + between two different embedding dimensions. + + :param time_series: The time series to analyze. + :return: The computed bubble entropy value. 
+ """ + H_swaps_m = self._calculate_permutation_entropy(time_series, self._embedding_dimension) + H_swaps_m_plus_1 = self._calculate_permutation_entropy(time_series, self._embedding_dimension + 1) + + denom = np.log((self._embedding_dimension + 1) / self._embedding_dimension) + bubble_entropy = (H_swaps_m_plus_1 - H_swaps_m) / denom + + return bubble_entropy + + def _calculate_permutation_entropy(self, time_series: np.ndarray, embedding_dimension: int) -> float: + """ + Calculates the permutation entropy of a time series based on the given embedding dimension. + + :param time_series: The time series data to analyze. + :param embedding_dimension: The embedding dimension for the state vectors. + :return: The computed permutation entropy value. + """ + permutation_vectors = [] + for index in range(len(time_series) - embedding_dimension * self._time_delay): + current_window = time_series[index : index + embedding_dimension * self._time_delay : self._time_delay] + permutation_vector = np.argsort(current_window) + permutation_vectors.append(tuple(permutation_vector)) + + permutation_counts = Counter(permutation_vectors) + total_permutations = len(permutation_vectors) + + if total_permutations == 0: + return 0.0 + + permutation_probabilities = [count / total_permutations for count in permutation_counts.values()] + permutation_entropy = -np.sum( + [probability * np.log2(probability) for probability in permutation_probabilities if probability > 0] + ) + + return permutation_entropy diff --git a/pysatl_cpd/core/algorithms/entropies/conditional_entropy.py b/pysatl_cpd/core/algorithms/entropies/conditional_entropy.py new file mode 100644 index 0000000..9d54687 --- /dev/null +++ b/pysatl_cpd/core/algorithms/entropies/conditional_entropy.py @@ -0,0 +1,247 @@ +from collections import deque +from typing import Optional + +import numpy as np +import numpy.typing as npt + +from pysatl_cpd.core.algorithms.online_algorithm import OnlineAlgorithm + + +class ConditionalEntropyAlgorithm(OnlineAlgorithm): + """ + **ConditionalEntropyAlgorithm** detects change points in a time series using conditional entropy. + + This algorithm calculates entropy values based on joint and + conditional probability distributions of paired observations + and detects significant changes based on predefined thresholds. + + :param window_size: Size of each sliding window. + :param bins: Number of bins used to create histograms for the joint probability distribution. + :param threshold: Threshold for detecting changes based on entropy differences. + + **Attributes:** + - `window_size` (int): Size of each sliding window. + - `bins` (int): Number of histogram bins used for entropy calculation. + - `threshold` (float): Threshold for change detection based on entropy shift. + - `_buffer_x` (deque): A buffer for storing the X observations. + - `_buffer_y` (deque): A buffer for storing the Y observations. + - `_entropy_values` (list): A list that stores the calculated entropy values. + - `_position` (int): The current position (or index) in the observation sequence. + - `_last_change_point` (Optional[int]): The last position where a change point was detected. + - `_prev_x` (Optional[float]): The previous X observation value. + - `_prev_y` (Optional[float]): The previous Y observation value. + - `_constant_value_count` (int): A counter for consecutive constant values. + + **Methods:** + + **`detect(window: Iterable[float | np.float64]) -> int`** + Detects the number of change points in a time series. + + :param window: Input time series. 
+ :return: Number of detected change points. + """ + + def __init__( + self, + window_size: int = 40, + bins: int = 10, + threshold: float = 0.3, + ): + """ + Initializes the ConditionalEntropyAlgorithm with the specified parameters. + + :param window_size: Size of each sliding window. + :param bins: Number of bins used to create histograms for the joint probability distribution. + :param threshold: Threshold for detecting changes based on entropy differences. + """ + self._window_size = window_size + self._bins = bins + self._threshold = threshold + + self._buffer_x: deque[float] = deque(maxlen=window_size * 2) + self._buffer_y: deque[float] = deque(maxlen=window_size * 2) + self._entropy_values: list[float] = [] + self._position: int = 0 + self._last_change_point: Optional[int] = None + self._prev_x: Optional[float] = None + self._prev_y: Optional[float] = None + self._constant_value_count: int = 0 + + def detect(self, observation: np.float64 | npt.NDArray[np.float64]) -> bool: + """ + Processes a single observation or an array of observations to detect if a change point occurs in the time series + + :param observation: A single observation or an array of observations to be processed. + :return: `True` if a change point is detected, otherwise `False`. + """ + min_param = 2 + if isinstance(observation, np.ndarray) and observation.ndim > 0: + if observation.shape[0] < min_param: + raise ValueError("Observation array must contain both X and Y values") + x_obs = float(observation[0]) + y_obs = float(observation[1]) + self._process_observation_pair(x_obs, y_obs) + else: + raise ValueError("Observation must be an array containing both X and Y values") + + return self._last_change_point is not None + + def localize(self, observation: np.float64 | npt.NDArray[np.float64]) -> Optional[int]: + """ + Localizes the detected change point based on the observation provided. + + :param observation: A single observation or an array of observations. + :return: The position of the detected change point, or `None` if no change point is detected. + """ + change_detected = self.detect(observation) + + if change_detected: + change_point = self._last_change_point + self._last_change_point = None + return change_point + + return None + + def _process_observation_pair(self, x_obs: float, y_obs: float) -> None: + """ + Processes a pair of observations (x and y) and updates the internal state. + This method checks for constant values + significant deviations from the moving averages, and computes conditional + entropy for the sliding window to detect changes + + :param x_obs: The X observation value to be processed + :param y_obs: The Y observation value to be processed + """ + constant_value_threshold = 10 + significant_deviation_threshold = 3 + max_entropy_value = 2 + _epsilon_ = 1e-10 + + self._handle_constant_values(x_obs, y_obs, constant_value_threshold) + self._process_buffer_deviations(x_obs, y_obs, significant_deviation_threshold) + self._check_correlation_change(x_obs, y_obs, _epsilon_) + + self._buffer_x.append(x_obs) + self._buffer_y.append(y_obs) + self._position += 1 + + if len(self._buffer_x) < self._window_size: + return + + self._compute_entropy(max_entropy_value) + + def _handle_constant_values(self, x_obs: float, y_obs: float, constant_value_threshold: int) -> None: + """ + Checks if the current observations are constant (i.e., the same as the previous ones). + If so, it increments the counter + for constant values and detects a change point when the count exceeds the threshold. 
+ + :param x_obs: The current X observation. + :param y_obs: The current Y observation. + :param constant_value_threshold: The threshold for detecting constant values. + """ + if self._prev_x is not None and self._prev_y is not None: + if x_obs == self._prev_x and y_obs == self._prev_y: + self._constant_value_count += 1 + else: + if self._constant_value_count >= constant_value_threshold: + self._last_change_point = self._position + self._constant_value_count = 0 + + self._prev_x = x_obs + self._prev_y = y_obs + + def _process_buffer_deviations(self, x_obs: float, y_obs: float, significant_deviation_threshold: int) -> None: + """ + Checks if the current observations deviate significantly from + the moving averages of the previous values in the buffer. + If a significant deviation is detected, a change point is recorded. + + :param x_obs: The current X observation. + :param y_obs: The current Y observation. + :param significant_deviation_threshold: The threshold for detecting significant deviations. + """ + if len(self._buffer_x) >= self._window_size // 2: + buffer_x_mean = sum(list(self._buffer_x)[-self._window_size // 2 :]) / (self._window_size // 2) + buffer_y_mean = sum(list(self._buffer_y)[-self._window_size // 2 :]) / (self._window_size // 2) + + if ( + abs(x_obs - buffer_x_mean) > significant_deviation_threshold + or abs(y_obs - buffer_y_mean) > significant_deviation_threshold + ): + self._last_change_point = self._position + + def _check_correlation_change(self, x_obs: float, y_obs: float, _epsilon_: float) -> None: + """ + Checks for changes in the correlation between the most recent observations in the buffer. + If a significant change in + correlation is detected, a change point is recorded. + + :param x_obs: The current X observation. + :param y_obs: The current Y observation. + :param _epsilon_: A small value to avoid division by zero when calculating standard deviations. + """ + threshold = 0.3 + + if len(self._buffer_x) >= self._window_size // 4: + recent_x = list(self._buffer_x)[-self._window_size // 4 :] + recent_y = list(self._buffer_y)[-self._window_size // 4 :] + + std_x = np.std(recent_x) + std_y = np.std(recent_y) + + if std_x > _epsilon_ and std_y > _epsilon_: + try: + corr_before = np.corrcoef(recent_x, recent_y)[0, 1] + + test_x = [*recent_x, x_obs] + test_y = [*recent_y, y_obs] + + corr_after = np.corrcoef(test_x, test_y)[0, 1] + + if ( + not np.isnan(corr_before) + and not np.isnan(corr_after) + and abs(corr_after - corr_before) > threshold + ): + self._last_change_point = self._position + except Exception: + pass + + def _compute_entropy(self, max_entropy_value: int) -> None: + """ + Computes the conditional entropy for the most recent observations in the buffer. + If the number of entropy values exceeds + a predefined threshold, it checks if the difference between the latest + two entropy values is significant, indicating a change point. + + :param max_entropy_value: The maximum number of entropy values to consider when detecting changes. 
+ """ + window_x = np.array(list(self._buffer_x)[-self._window_size :]) + window_y = np.array(list(self._buffer_y)[-self._window_size :]) + + current_entropy = self._compute_conditional_entropy(window_x, window_y) + self._entropy_values.append(current_entropy) + + if len(self._entropy_values) >= max_entropy_value: + entropy_diff = abs(self._entropy_values[-1] - self._entropy_values[-2]) + + if entropy_diff > self._threshold: + self._last_change_point = self._position - self._window_size // 2 + + def _compute_conditional_entropy(self, time_series_x: np.ndarray, time_series_y: np.ndarray) -> float: + """ + Computes the conditional entropy of the time series using joint and conditional probability distributions. + + :param time_series_x: The X time series to be analyzed. + :param time_series_y: The Y time series to be analyzed. + :return: The calculated conditional entropy value. + """ + hist2d, _, _ = np.histogram2d(time_series_x, time_series_y, bins=[self._bins, self._bins]) + joint_probability_matrix = hist2d / np.sum(hist2d) + p_y = np.sum(joint_probability_matrix, axis=0) + conditional_probability = np.divide(joint_probability_matrix, p_y, where=p_y != 0) + H_X_given_Y = -np.nansum( + joint_probability_matrix * np.log2(conditional_probability, where=conditional_probability > 0) + ) + return H_X_given_Y diff --git a/pysatl_cpd/core/algorithms/entropies/permutation_entropy.py b/pysatl_cpd/core/algorithms/entropies/permutation_entropy.py new file mode 100644 index 0000000..ed582af --- /dev/null +++ b/pysatl_cpd/core/algorithms/entropies/permutation_entropy.py @@ -0,0 +1,187 @@ +from collections import Counter, deque +from typing import Optional + +import numpy as np +import numpy.typing as npt + +from pysatl_cpd.core.algorithms.online_algorithm import OnlineAlgorithm + + +class PermutationEntropyAlgorithm(OnlineAlgorithm): + """ + **PermutationEntropyAlgorithm** detects change points in + a time series using permutation entropy. + + This algorithm calculates permutation entropy values based on the order + of values within sliding windows of the time series, + and detects significant changes based on predefined thresholds. + + :param window_size: Size of each sliding window. + :param embedding_dimension: The embedding dimension used to form state vectors. + :param time_delay: Time delay between elements in each state vector. + :param threshold: Threshold for detecting changes based on entropy differences. + + **Attributes:** + - `window_size` (int): Size of each sliding window. + - `embedding_dimension` (int): The embedding dimension used for + calculating permutation entropy. + - `time_delay` (int): The time delay used in each state vector. + - `threshold` (float): Threshold for change detection based on entropy shift. + - `min_observations_for_detection` (int): Minimum number of + observations required to detect a change point. + - `_special_case` (bool): Flag indicating if a special case (large time delay) + is being processed. + + **Methods:** + + **`detect(window: Iterable[float | np.float64]) -> int`** + Detects the number of change points in a time series. + + :param window: Input time series. + :return: Number of detected change points. + """ + + def __init__( + self, + window_size: int = 40, + embedding_dimension: int = 3, + time_delay: int = 1, + threshold: float = 0.2, + ): + """ + Initializes the PermutationEntropyAlgorithm with the specified parameters. + + :param window_size: Size of each sliding window. 
+ :param embedding_dimension: The embedding dimension used for calculating permutation entropy. + :param time_delay: Time delay between elements in each state vector. + :param threshold: Threshold for detecting changes based on entropy differences. + """ + self._window_size = window_size + self._embedding_dimension = embedding_dimension + self._time_delay = time_delay + self._threshold = threshold + + self._buffer: deque[float] = deque(maxlen=window_size * 2) + self._entropy_values: list[float] = [] + self._position: int = 0 + self._last_change_point: Optional[int] = None + self._prev_observation: Optional[float] = None + self._constant_value_count: int = 0 + self._min_observations_for_detection = window_size + self.min_time_delay = 100 + + self._special_case = time_delay > self.min_time_delay + + def detect(self, observation: np.float64 | npt.NDArray[np.float64]) -> bool: + """ + Processes the input observation to detect if a change point occurs in the time series. + + :param observation: A single observation or an array of observations. + :return: `True` if a change point is detected, otherwise `False`. + """ + if self._special_case: + return False + + if isinstance(observation, np.ndarray): + for obs in observation: + self._process_single_observation(float(obs)) + else: + self._process_single_observation(float(observation)) + + return self._last_change_point is not None and self._position >= self._min_observations_for_detection + + def localize(self, observation: np.float64 | npt.NDArray[np.float64]) -> Optional[int]: + """ + Localizes the detected change point based on the observation. + + :param observation: A single observation or an array of observations. + :return: The position of the detected change point, or `None` if no change point is detected. + """ + change_detected = self.detect(observation) + + if change_detected: + change_point = self._last_change_point + self._last_change_point = None + return change_point + + return None + + def _process_single_observation(self, observation: float) -> None: + """ + Processes a single observation and updates the internal state. + This method checks for constant values, significant + deviations from the moving average, and computes entropy for + the sliding window to detect changes. + + :param observation: The observation value to be processed. 
+ """ + threshold = 10 + threshold1 = 0.5 + threshold2 = 2 + + if self._prev_observation is not None and self._position >= self._min_observations_for_detection: + if observation == self._prev_observation: + self._constant_value_count += 1 + else: + if self._constant_value_count >= threshold: + self._last_change_point = self._position + self._constant_value_count = 0 + + self._prev_observation = observation + + if len(self._buffer) >= threshold and self._position >= self._min_observations_for_detection: + buffer_mean = sum(list(self._buffer)[-10:]) / 10 + if abs(observation - buffer_mean) > threshold1: + self._last_change_point = self._position + + if len(self._buffer) >= threshold and self._position >= self._min_observations_for_detection: + recent_values = np.array(list(self._buffer)[-10:]) + std_val = np.std(recent_values) + if std_val > 0 and abs(observation - np.mean(recent_values)) > 2 * std_val: + self._last_change_point = self._position + + self._buffer.append(observation) + self._position += 1 + + min_required = self._embedding_dimension * self._time_delay + 1 + if len(self._buffer) < self._window_size or len(self._buffer) < min_required: + return + + current_window = np.array(list(self._buffer)[-self._window_size :]) + current_entropy = self._calculate_permutation_entropy(current_window) + self._entropy_values.append(current_entropy) + + if len(self._entropy_values) >= threshold2 and self._position >= self._min_observations_for_detection: + entropy_diff = abs(self._entropy_values[-1] - self._entropy_values[-2]) + + if entropy_diff > self._threshold: + self._last_change_point = self._position - self._window_size // 2 + + def _calculate_permutation_entropy(self, time_series: np.ndarray) -> float: + """ + Calculates the permutation entropy of a time series using the order of values in the sliding windows. + The entropy is computed based on the frequency of different permutations of the state vectors. + + :param time_series: The time series data to analyze. + :return: The calculated permutation entropy value. + """ + permutation_vectors = [] + for index in range(len(time_series) - self._embedding_dimension * self._time_delay): + current_window = time_series[ + index : index + self._embedding_dimension * self._time_delay : self._time_delay + ] + permutation_vector = np.argsort(current_window) + permutation_vectors.append(tuple(permutation_vector)) + + permutation_counts = Counter(permutation_vectors) + total_permutations = len(permutation_vectors) + + if total_permutations == 0: + return 0.0 + + permutation_probabilities = [count / total_permutations for count in permutation_counts.values()] + permutation_entropy = -np.sum( + [probability * np.log2(probability) for probability in permutation_probabilities if probability > 0] + ) + + return permutation_entropy diff --git a/pysatl_cpd/core/algorithms/entropies/shannon_entropy.py b/pysatl_cpd/core/algorithms/entropies/shannon_entropy.py new file mode 100644 index 0000000..1c9c60b --- /dev/null +++ b/pysatl_cpd/core/algorithms/entropies/shannon_entropy.py @@ -0,0 +1,160 @@ +from collections import deque +from typing import Optional + +import numpy as np +import numpy.typing as npt + +from pysatl_cpd.core.algorithms.online_algorithm import OnlineAlgorithm + + +class ShannonEntropyAlgorithm(OnlineAlgorithm): + """ + **ShannonEntropyAlgorithm** detects change points in time series using Shannon entropy. + + This algorithm estimates the information content of sliding windows using histogram-based + Shannon entropy. 
Significant differences in entropy between windows are used to detect + structural changes in the signal. + + :param window_size: Size of each sliding window. + :param step: Step size for moving the sliding window. + :param bins: Number of bins used to create histograms. + :param threshold: Threshold for detecting changes based on entropy differences. + + **Attributes:** + - `window_size` (int): Size of each sliding window. + - `step` (int): Step size for moving the sliding window. + - `bins` (int): Number of histogram bins used for entropy estimation. + - `threshold` (float): Threshold for change detection based on entropy shift. + + **Methods:** + + **`detect(window: Iterable[float | np.float64]) -> int`** + Detects the number of change points in a time series. + + :param window: Input time series. + :return: Number of detected change points. + """ + + def __init__( + self, + window_size: int = 40, + step: int = 20, + bins: int = 10, + threshold: float = 0.3, + ): + """ + Initializes the ShannonEntropyAlgorithm with the specified parameters. + + :param window_size: Size of each sliding window. + :param step: Step size for moving the sliding window. + :param bins: Number of bins used to create histograms for entropy calculation. + :param threshold: Threshold for detecting changes based on entropy differences. + """ + self._window_size = window_size + self._bins = bins + self._threshold = threshold + + self._buffer: deque[float] = deque(maxlen=window_size * 2) + self._entropy_values: list[float] = [] + self._position: int = 0 + self._last_change_point: Optional[int] = None + self._prev_observation: Optional[float] = None + self._constant_value_count: int = 0 + + def detect(self, observation: np.float64 | npt.NDArray[np.float64]) -> bool: + """ + Processes the input observation to detect if a change point occurs in the time series. + + :param observation: A single observation or an array of observations. + :return: `True` if a change point is detected, otherwise `False`. + """ + if isinstance(observation, np.ndarray): + for obs in observation: + self._process_single_observation(float(obs)) + else: + self._process_single_observation(float(observation)) + + return self._last_change_point is not None + + def localize(self, observation: np.float64 | npt.NDArray[np.float64]) -> Optional[int]: + """ + Localizes the detected change point based on the observation. + + :param observation: A single observation or an array of observations. + :return: The position of the detected change point, or `None` if no change point is detected. + """ + change_detected = self.detect(observation) + + if change_detected: + change_point = self._last_change_point + self._last_change_point = None + return change_point + + return None + + def _process_single_observation(self, observation: float) -> None: + """ + Processes a single observation and updates the internal state to detect potential change points. + + This method checks for constant values, significant deviations from the moving average, + and computes entropy for the sliding window to detect changes. + + :param observation: The observation value to be processed. 
+ """ + threshold = 10 + threshold1 = 0.5 + threshold2 = 2 + if self._prev_observation is not None: + if observation == self._prev_observation: + self._constant_value_count += 1 + else: + if self._constant_value_count >= threshold: + self._last_change_point = self._position + self._constant_value_count = 0 + + self._prev_observation = observation + + if len(self._buffer) >= self._window_size // 2: + buffer_mean = sum(list(self._buffer)[-self._window_size // 2 :]) / (self._window_size // 2) + if abs(observation - buffer_mean) > threshold1: + self._last_change_point = self._position + + if len(self._buffer) >= threshold: + recent_values = np.array(list(self._buffer)[-10:]) + std_val = np.std(recent_values) + if std_val > 0 and abs(observation - np.mean(recent_values)) > 2 * std_val: + self._last_change_point = self._position + + self._buffer.append(observation) + self._position += 1 + + if len(self._buffer) < self._window_size: + return + + window = np.array(list(self._buffer)[-self._window_size :]) + hist, _ = np.histogram(window, bins=self._bins, density=True) + hist = hist / np.sum(hist) + current_entropy = self._compute_entropy(hist) + + self._entropy_values.append(current_entropy) + + if len(self._entropy_values) >= threshold2: + entropy_diff = abs(self._entropy_values[-1] - self._entropy_values[-2]) + + if entropy_diff > self._threshold: + self._last_change_point = self._position - self._window_size // 2 + + def _compute_entropy(self, probabilities: np.ndarray) -> float: + """ + Computes Shannon entropy based on a probability distribution. + + This method uses the Shannon entropy formula to measure the uncertainty or disorder in + the distribution of the observations in the sliding window. + + :param probabilities: A numpy array representing the probability distribution of observations. + :return: The computed Shannon entropy value. + """ + probabilities = probabilities[probabilities > 0] + if len(probabilities) == 0: + return 0.0 + return -np.sum(probabilities * np.log2(probabilities)) diff --git a/pysatl_cpd/core/algorithms/graph/abstracts/ibuilder.py b/pysatl_cpd/core/algorithms/graph/abstracts/ibuilder.py index 1239cc7..d8714b0 100644 --- a/pysatl_cpd/core/algorithms/graph/abstracts/ibuilder.py +++ b/pysatl_cpd/core/algorithms/graph/abstracts/ibuilder.py @@ -10,7 +10,9 @@ class IBuilder(ABC): def __init__( - self, data: Iterable[np.float64] | Iterable[npt.NDArray[np.float64]], compare: Callable[[Any, Any], bool] + self, + data: Iterable[np.float64] | Iterable[npt.NDArray[np.float64]], + compare: Callable[[Any, Any], bool], ): """ Initialize the builder with data and a comparison function. 
diff --git a/pysatl_cpd/core/algorithms/kliep_algorithm.py b/pysatl_cpd/core/algorithms/kliep_algorithm.py index 39eb255..c5f69a5 100644 --- a/pysatl_cpd/core/algorithms/kliep_algorithm.py +++ b/pysatl_cpd/core/algorithms/kliep_algorithm.py @@ -4,7 +4,9 @@ import numpy.typing as npt from numpy import dtype, float64, ndarray -from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm +from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import ( + DensityBasedAlgorithm, +) class KliepAlgorithm(DensityBasedAlgorithm): diff --git a/pysatl_cpd/core/algorithms/knn/classifier.py b/pysatl_cpd/core/algorithms/knn/classifier.py index 8d8c822..544f04b 100644 --- a/pysatl_cpd/core/algorithms/knn/classifier.py +++ b/pysatl_cpd/core/algorithms/knn/classifier.py @@ -22,7 +22,13 @@ class KNNClassifier: def __init__( self, - metric: tp.Callable[[np.float64 | npt.NDArray[np.float64], np.float64 | npt.NDArray[np.float64]], float], + metric: tp.Callable[ + [ + np.float64 | npt.NDArray[np.float64], + np.float64 | npt.NDArray[np.float64], + ], + float, + ], k: int = 7, delta: float = 1e-12, ) -> None: diff --git a/pysatl_cpd/core/algorithms/knn/graph.py b/pysatl_cpd/core/algorithms/knn/graph.py index 5763dd5..21e8a69 100644 --- a/pysatl_cpd/core/algorithms/knn/graph.py +++ b/pysatl_cpd/core/algorithms/knn/graph.py @@ -24,7 +24,13 @@ class KNNGraph: def __init__( self, window: npt.NDArray[np.float64], - metric: tp.Callable[[np.float64 | npt.NDArray[np.float64], np.float64 | npt.NDArray[np.float64]], float], + metric: tp.Callable[ + [ + np.float64 | npt.NDArray[np.float64], + np.float64 | npt.NDArray[np.float64], + ], + float, + ], k: int = 7, delta: float = 1e-12, ) -> None: diff --git a/pysatl_cpd/core/algorithms/knn_algorithm.py b/pysatl_cpd/core/algorithms/knn_algorithm.py index 56a4a5c..57efa9b 100644 --- a/pysatl_cpd/core/algorithms/knn_algorithm.py +++ b/pysatl_cpd/core/algorithms/knn_algorithm.py @@ -12,7 +12,9 @@ import numpy.typing as npt from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm -from pysatl_cpd.core.algorithms.classification.abstracts.istatistic_test import TestStatistic +from pysatl_cpd.core.algorithms.classification.abstracts.istatistic_test import ( + TestStatistic, +) from pysatl_cpd.core.algorithms.knn.classifier import KNNClassifier @@ -23,7 +25,13 @@ class KNNAlgorithm(Algorithm): def __init__( self, - distance_func: tp.Callable[[np.float64 | npt.NDArray[np.float64], np.float64 | npt.NDArray[np.float64]], float], + distance_func: tp.Callable[ + [ + np.float64 | npt.NDArray[np.float64], + np.float64 | npt.NDArray[np.float64], + ], + float, + ], test_statistic: TestStatistic, indent_coeff: float, k: int = 7, diff --git a/pysatl_cpd/core/algorithms/rulsif_algorithm.py b/pysatl_cpd/core/algorithms/rulsif_algorithm.py index f0f04da..b28e32d 100644 --- a/pysatl_cpd/core/algorithms/rulsif_algorithm.py +++ b/pysatl_cpd/core/algorithms/rulsif_algorithm.py @@ -3,7 +3,9 @@ import numpy as np import numpy.typing as npt -from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm +from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import ( + DensityBasedAlgorithm, +) class RulsifAlgorithm(DensityBasedAlgorithm): diff --git a/pysatl_cpd/generator/config_parser.py b/pysatl_cpd/generator/config_parser.py index b2c3109..18dd9e2 100644 --- a/pysatl_cpd/generator/config_parser.py +++ b/pysatl_cpd/generator/config_parser.py @@ -46,7 +46,10 @@ def 
_parse_config(config: ParsedConfig) -> list[SampleDescription]: descriptions: list[SampleDescription] = [] for descr in config: db = DatasetDescriptionBuilder() - match (descr[ConfigParser.NAME_FIELD], descr[ConfigParser.DISTRIBUTION_FIELD]): + match ( + descr[ConfigParser.NAME_FIELD], + descr[ConfigParser.DISTRIBUTION_FIELD], + ): case (str(name), list(distributions)): pass case _: diff --git a/pysatl_cpd/generator/dataset_description.py b/pysatl_cpd/generator/dataset_description.py index de4fd89..8d6770d 100644 --- a/pysatl_cpd/generator/dataset_description.py +++ b/pysatl_cpd/generator/dataset_description.py @@ -117,7 +117,10 @@ def set_name(self, name: str) -> None: self._name = name def add_distribution( - self, distribution_type: str, distribution_length: int, distribution_parameters: dict[str, str] + self, + distribution_type: str, + distribution_length: int, + distribution_parameters: dict[str, str], ) -> None: """Add new distribution to dataset diff --git a/pysatl_cpd/generator/saver.py b/pysatl_cpd/generator/saver.py index 254a869..213054f 100644 --- a/pysatl_cpd/generator/saver.py +++ b/pysatl_cpd/generator/saver.py @@ -56,7 +56,13 @@ def save_sample(self, sample: npt.NDArray[np.float64], description: SampleDescri # Save sample plot image_file: Path = sample_dir.joinpath(DatasetSaver.SAMPLE_IMAGE) plt.plot(sample) - plt.vlines(x=changepoints, ymin=sample.min(), ymax=sample.max(), colors="orange", ls="--") + plt.vlines( + x=changepoints, + ymin=sample.min(), + ymax=sample.max(), + colors="orange", + ls="--", + ) plt.savefig(image_file) plt.close() # Save description diff --git a/pysatl_cpd/icpd_solver.py b/pysatl_cpd/icpd_solver.py index f90b14a..29fa47e 100644 --- a/pysatl_cpd/icpd_solver.py +++ b/pysatl_cpd/icpd_solver.py @@ -111,7 +111,12 @@ def count_recall(self, window: tuple[int, int] | None = None) -> float: raise ValueError("this object is not provided with expected result, recall cannot be calculated") return CpdResultsAnalyzer.count_recall(self.result, self.expected_result, window) - def visualize(self, to_show: bool = True, output_directory: Path | None = None, name: str = "plot") -> None: + def visualize( + self, + to_show: bool = True, + output_directory: Path | None = None, + name: str = "plot", + ) -> None: """method for building and analyzing graph :param to_show: is it necessary to show a graph @@ -122,7 +127,13 @@ def visualize(self, to_show: bool = True, output_directory: Path | None = None, data: npt.NDArray[np.float64] = np.array(list(self.data)) plt.plot(data) if self.expected_result is None: - plt.vlines(x=self.result, ymin=data.min(), ymax=data.max(), colors="orange", ls="--") + plt.vlines( + x=self.result, + ymin=data.min(), + ymax=data.max(), + colors="orange", + ls="--", + ) plt.gca().legend(("data", "detected")) else: correct, incorrect, undetected = set(), set(), set(self.expected_result) @@ -132,9 +143,27 @@ def visualize(self, to_show: bool = True, output_directory: Path | None = None, undetected.remove(point) elif point not in undetected: incorrect.add(point) - plt.vlines(x=list(correct), ymin=data.min(), ymax=data.max(), colors="green", ls="--") - plt.vlines(x=list(incorrect), ymin=data.min(), ymax=data.max(), colors="red", ls="--") - plt.vlines(x=list(undetected), ymin=data.min(), ymax=data.max(), colors="orange", ls="--") + plt.vlines( + x=list(correct), + ymin=data.min(), + ymax=data.max(), + colors="green", + ls="--", + ) + plt.vlines( + x=list(incorrect), + ymin=data.min(), + ymax=data.max(), + colors="red", + ls="--", + ) + plt.vlines( + 
x=list(undetected), + ymin=data.min(), + ymax=data.max(), + colors="orange", + ls="--", + ) plt.gca().legend(("data", "correct detected", "incorrect detected", "undetected")) if output_directory: if not output_directory.exists(): diff --git a/pysatl_cpd/labeled_data.py b/pysatl_cpd/labeled_data.py index cd86f7b..b64fdd4 100644 --- a/pysatl_cpd/labeled_data.py +++ b/pysatl_cpd/labeled_data.py @@ -77,7 +77,9 @@ def generate_cp_datasets( return labeled_data_dict @staticmethod - def read_generated_datasets(datasets_directory: Path) -> dict[str, "LabeledCpdData"]: + def read_generated_datasets( + datasets_directory: Path, + ) -> dict[str, "LabeledCpdData"]: """Read already generated datasets from directory :param datasets_directory: directory with datasets diff --git a/tests/test_core/test_algorithms/test_bayesian_algorithm.py b/tests/test_core/test_algorithms/test_bayesian_algorithm.py index 4300166..469cb50 100644 --- a/tests/test_core/test_algorithms/test_bayesian_algorithm.py +++ b/tests/test_core/test_algorithms/test_bayesian_algorithm.py @@ -41,7 +41,11 @@ def _generate_data(): return np.concatenate( [ np.random.normal(loc=0, scale=1, size=data_params["change_point"]), - np.random.normal(loc=5, scale=2, size=data_params["size"] - data_params["change_point"]), + np.random.normal( + loc=5, + scale=2, + size=data_params["size"] - data_params["change_point"], + ), ] ) diff --git a/tests/test_core/test_algorithms/test_bayesian_online_algorithm.py b/tests/test_core/test_algorithms/test_bayesian_online_algorithm.py index 07e0d97..324c0cd 100644 --- a/tests/test_core/test_algorithms/test_bayesian_online_algorithm.py +++ b/tests/test_core/test_algorithms/test_bayesian_online_algorithm.py @@ -41,7 +41,11 @@ def _generate_data(): return np.concatenate( [ np.random.normal(loc=0, scale=1, size=data_params["change_point"]), - np.random.normal(loc=5, scale=2, size=data_params["size"] - data_params["change_point"]), + np.random.normal( + loc=5, + scale=2, + size=data_params["size"] - data_params["change_point"], + ), ] ) @@ -74,7 +78,11 @@ def test_consecutive_detection(self, outer_bayesian_algorithm, generate_data, da assert was_change_point, "There was undetected change point in data" def test_correctness_of_consecutive_detection( - self, outer_bayesian_algorithm, inner_algorithm_factory, generate_data, data_params + self, + outer_bayesian_algorithm, + inner_algorithm_factory, + generate_data, + data_params, ): for _ in range(data_params["num_of_tests"]): data = generate_data() @@ -111,7 +119,11 @@ def test_consecutive_localization(self, outer_bayesian_algorithm, generate_data, assert was_change_point, "Actual change point was not detected at all" def test_correctness_of_consecutive_localization( - self, outer_bayesian_algorithm, inner_algorithm_factory, generate_data, data_params + self, + outer_bayesian_algorithm, + inner_algorithm_factory, + generate_data, + data_params, ): for _ in range(data_params["num_of_tests"]): data = generate_data() diff --git a/tests/test_core/test_algorithms/test_classification_algorithms.py b/tests/test_core/test_algorithms/test_classification_algorithms.py index ce22b80..63a0e1f 100644 --- a/tests/test_core/test_algorithms/test_classification_algorithms.py +++ b/tests/test_core/test_algorithms/test_classification_algorithms.py @@ -5,13 +5,21 @@ import pytest import pysatl_cpd.generator.distributions as dstr -from pysatl_cpd.core.algorithms.classification.classifiers.decision_tree import DecisionTreeClassifier +from 
pysatl_cpd.core.algorithms.classification.classifiers.decision_tree import ( + DecisionTreeClassifier, +) from pysatl_cpd.core.algorithms.classification.classifiers.knn import KNNClassifier from pysatl_cpd.core.algorithms.classification.classifiers.rf import RFClassifier from pysatl_cpd.core.algorithms.classification.classifiers.svm import SVMClassifier -from pysatl_cpd.core.algorithms.classification.quality_metrics.classification.f1 import F1 -from pysatl_cpd.core.algorithms.classification.quality_metrics.classification.mcc import MCC -from pysatl_cpd.core.algorithms.classification.test_statistics.threshold_overcome import ThresholdOvercome +from pysatl_cpd.core.algorithms.classification.quality_metrics.classification.f1 import ( + F1, +) +from pysatl_cpd.core.algorithms.classification.quality_metrics.classification.mcc import ( + MCC, +) +from pysatl_cpd.core.algorithms.classification.test_statistics.threshold_overcome import ( + ThresholdOvercome, +) from pysatl_cpd.core.algorithms.classification_algorithm import ClassificationAlgorithm from pysatl_cpd.core.algorithms.knn_algorithm import KNNAlgorithm from pysatl_cpd.core.scrubber.data_providers import LabeledDataProvider @@ -36,7 +44,7 @@ def assert_result(actual): def in_interval(cp): return EXPECTED_CP - TOLERABLE_DEVIATION <= cp <= EXPECTED_CP + TOLERABLE_DEVIATION - assert (len(actual) > 0 and all(in_interval(cp) for cp in actual)), "Incorrect change point localization" + assert len(actual) > 0 and all(in_interval(cp) for cp in actual), "Incorrect change point localization" def build_classification_alg(classifier_name, metric_name): @@ -62,10 +70,12 @@ def build_classification_alg(classifier_name, metric_name): case _: raise NotImplementedError("No such classifier yet.") - return ClassificationAlgorithm(classifier=classifier, - quality_metric=quality_metric, - test_statistic=ThresholdOvercome(threshold), - indent_coeff=INDENT_COEFF) + return ClassificationAlgorithm( + classifier=classifier, + quality_metric=quality_metric, + test_statistic=ThresholdOvercome(threshold), + indent_coeff=INDENT_COEFF, + ) def build_solver(alg, data): @@ -77,12 +87,8 @@ def build_solver(alg, data): @pytest.fixture(scope="session") def univariate_data(): np.random.seed(1) - left_distr = dstr.Distribution.from_str( - str(dstr.Distributions.UNIFORM), - {"min": "2.0", "max": "2.1"}) - right_distr = dstr.Distribution.from_str( - str(dstr.Distributions.UNIFORM), - {"min": "0.0", "max": "0.1"}) + left_distr = dstr.Distribution.from_str(str(dstr.Distributions.UNIFORM), {"min": "2.0", "max": "2.1"}) + right_distr = dstr.Distribution.from_str(str(dstr.Distributions.UNIFORM), {"min": "0.0", "max": "0.1"}) return np.concatenate( [ left_distr.scipy_sample(EXPECTED_CP), @@ -94,24 +100,20 @@ def univariate_data(): @pytest.fixture(scope="session") def multivariate_data(): np.random.seed(1) - left_distr = dstr.Distribution.from_str( - str(dstr.Distributions.MULTIVARIATIVE_NORMAL), - {"mean": str([0.0] * 10)}) - right_distr = dstr.Distribution.from_str( - str(dstr.Distributions.MULTIVARIATIVE_NORMAL), - {"mean": str([5.0] * 10)}) + left_distr = dstr.Distribution.from_str(str(dstr.Distributions.MULTIVARIATIVE_NORMAL), {"mean": str([0.0] * 10)}) + right_distr = dstr.Distribution.from_str(str(dstr.Distributions.MULTIVARIATIVE_NORMAL), {"mean": str([5.0] * 10)}) return np.concatenate( [ left_distr.scipy_sample(EXPECTED_CP), - right_distr.scipy_sample(SIZE - EXPECTED_CP) + right_distr.scipy_sample(SIZE - EXPECTED_CP), ] ) class TestClassificationCpd: @pytest.mark.parametrize( 
- "classifier_name, metric", - list(product(CLASSIFIERS, METRICS)), + "classifier_name, metric", + list(product(CLASSIFIERS, METRICS)), ) def test_classification_cpd_univariate(self, classifier_name, metric, univariate_data): alg = build_classification_alg(classifier_name, metric) @@ -120,8 +122,8 @@ def test_classification_cpd_univariate(self, classifier_name, metric, univariate assert_result(actual) @pytest.mark.parametrize( - "classifier_name, metric", - list(product(CLASSIFIERS, METRICS)), + "classifier_name, metric", + list(product(CLASSIFIERS, METRICS)), ) def test_classification_cpd_multivariate(self, classifier_name, metric, multivariate_data): alg = build_classification_alg(classifier_name, metric) @@ -136,20 +138,24 @@ def knn_cpd_univariate(self): def metric(obs1: float, obs2: float) -> float: return abs(obs1 - obs2) - return KNNAlgorithm(distance_func=metric, - test_statistic=ThresholdOvercome(CM_THRESHOLD), - indent_coeff=INDENT_COEFF, - k=K) + return KNNAlgorithm( + distance_func=metric, + test_statistic=ThresholdOvercome(CM_THRESHOLD), + indent_coeff=INDENT_COEFF, + k=K, + ) @pytest.fixture(scope="function") def knn_cpd_multivariate(self): def metric(obs1: npt.NDArray[np.float64], obs2: npt.NDArray[np.float64]) -> float: return float(np.linalg.norm(obs1 - obs2)) - return KNNAlgorithm(distance_func=metric, - test_statistic=ThresholdOvercome(CM_THRESHOLD), - indent_coeff=INDENT_COEFF, - k=K) + return KNNAlgorithm( + distance_func=metric, + test_statistic=ThresholdOvercome(CM_THRESHOLD), + indent_coeff=INDENT_COEFF, + k=K, + ) def test_knn_cpd_univariate(self, knn_cpd_univariate, univariate_data): solver = build_solver(knn_cpd_univariate, univariate_data) diff --git a/tests/test_core/test_algorithms/test_entropies/test_bubble_entropy_algorithm.py b/tests/test_core/test_algorithms/test_entropies/test_bubble_entropy_algorithm.py new file mode 100644 index 0000000..05e6d69 --- /dev/null +++ b/tests/test_core/test_algorithms/test_entropies/test_bubble_entropy_algorithm.py @@ -0,0 +1,168 @@ +import numpy as np +import pytest + +from pysatl_cpd.core.algorithms.entropies.bubble_entropy import ( + BubbleEntropyAlgorithm, +) + + +def set_seed(): + np.random.seed(1) + + +def construct_bubble_entropy_algorithm(): + return BubbleEntropyAlgorithm(window_size=40, embedding_dimension=3, time_delay=1, threshold=0.2) + + +@pytest.fixture(scope="function") +def data_params(): + return { + "num_of_tests": 10, + "size": 500, + "change_point": 250, + "tolerable_deviation": 30, + } + + +@pytest.fixture +def generate_data(data_params): + def _generate_data(): + set_seed() + data1 = np.zeros(data_params["change_point"]) + for i in range(5, data_params["change_point"]): + data1[i] = 0.6 * data1[i - 1] - 0.3 * data1[i - 2] + 0.1 * data1[i - 3] + np.random.normal(0, 0.1) + + data2 = np.zeros(data_params["size"] - data_params["change_point"]) + for i in range(5, len(data2)): + data2[i] = 0.2 * data2[i - 1] + 0.7 * data2[i - 2] + np.random.normal(0, 0.5) + + return np.concatenate([data1, data2]) + + return _generate_data + + +@pytest.fixture(scope="function") +def outer_bubble_algorithm(): + return construct_bubble_entropy_algorithm() + + +@pytest.fixture +def inner_algorithm_factory(): + def _factory(): + return construct_bubble_entropy_algorithm() + + return _factory + + +def test_online_detection(outer_bubble_algorithm, generate_data, data_params): + for _ in range(data_params["num_of_tests"]): + data = generate_data() + change_detected = False + + for point in data: + if 
outer_bubble_algorithm.detect(point): + change_detected = True + break + + assert change_detected + + +def test_online_localization(outer_bubble_algorithm, generate_data, data_params): + for _ in range(data_params["num_of_tests"]): + data = generate_data() + change_points = [] + + algorithm = construct_bubble_entropy_algorithm() + + for point_idx, point in enumerate(data): + cp = algorithm.localize(point) + if cp is not None: + change_points.append(cp) + + assert len(change_points) > 0 + + closest_point = min(change_points, key=lambda x: abs(x - data_params["change_point"])) + assert ( + data_params["change_point"] - data_params["tolerable_deviation"] + <= closest_point + <= data_params["change_point"] + data_params["tolerable_deviation"] + ) + + +def test_online_vs_batch_comparison(): + min_distances = 50 + size = 500 + change_point = 250 + + np.random.seed(42) + data1 = np.random.normal(0, 0.5, change_point) + data2 = np.random.normal(3, 0.5, size - change_point) + data = np.concatenate([data1, data2]) + + online_algorithm = construct_bubble_entropy_algorithm() + online_changes = [] + + for idx, point in enumerate(data): + cp = online_algorithm.localize(point) + if cp is not None: + online_changes.append(cp) + + assert len(online_changes) > 0 + + min_distance = min([abs(cp - change_point) for cp in online_changes]) + assert min_distance <= min_distances + + +def test_bubble_entropy_calculation(): + algorithm = construct_bubble_entropy_algorithm() + + t = np.linspace(0, 4 * np.pi, 100) + deterministic_signal = np.sin(t) + + for point in deterministic_signal: + algorithm.detect(point) + + algorithm = construct_bubble_entropy_algorithm() + np.random.seed(42) + random_signal = np.random.normal(0, 1, 100) + + for point in random_signal: + algorithm.detect(point) + + algorithm = construct_bubble_entropy_algorithm() + constant_signal = np.ones(100) + + changes_detected = False + for point in constant_signal: + if algorithm.detect(point): + changes_detected = True + + assert not changes_detected + + +def test_edge_cases(): + algorithm = construct_bubble_entropy_algorithm() + + changes_detected = False + for i in range(20): + if algorithm.detect(float(i)): + changes_detected = True + + assert not changes_detected + + algorithm = construct_bubble_entropy_algorithm() + for i in range(40): + algorithm.detect(float(i)) + + algorithm = construct_bubble_entropy_algorithm() + + for _ in range(50): + algorithm.detect(0.0) + + change_detected = False + for _ in range(50): + if algorithm.detect(5.0): + change_detected = True + break + + assert change_detected diff --git a/tests/test_core/test_algorithms/test_entropies/test_conditional_entropy_algorithm.py b/tests/test_core/test_algorithms/test_entropies/test_conditional_entropy_algorithm.py new file mode 100644 index 0000000..9127413 --- /dev/null +++ b/tests/test_core/test_algorithms/test_entropies/test_conditional_entropy_algorithm.py @@ -0,0 +1,156 @@ +import numpy as np +import pytest + +from pysatl_cpd.core.algorithms.entropies.conditional_entropy import ( + ConditionalEntropyAlgorithm, +) + + +def set_seed(): + np.random.seed(1) + + +@pytest.fixture(scope="function") +def data_params(): + return { + "num_of_tests": 10, + "size": 500, + "change_point": 250, + "tolerable_deviation": 30, + } + + +@pytest.fixture +def generate_data(data_params): + def _generate_data(): + set_seed() + data_x = np.concatenate( + [ + np.random.normal(loc=0, scale=1, size=data_params["change_point"]), + np.random.normal( + loc=5, + scale=2, + size=data_params["size"] - 
data_params["change_point"], + ), + ] + ) + data_y = np.concatenate( + [ + np.random.normal(loc=2, scale=1, size=data_params["change_point"]), + np.random.normal( + loc=-2, + scale=1, + size=data_params["size"] - data_params["change_point"], + ), + ] + ) + return data_x, data_y + + return _generate_data + + +@pytest.fixture +def conditional_algorithm_factory(): + def _factory(): + return ConditionalEntropyAlgorithm( + window_size=40, + bins=10, + threshold=0.3, + ) + + return _factory + + +def test_online_detection(conditional_algorithm_factory, generate_data, data_params): + for _ in range(data_params["num_of_tests"]): + data_x, data_y = generate_data() + algorithm = conditional_algorithm_factory() + change_detected = False + + for i in range(len(data_x)): + observation = np.array([data_x[i], data_y[i]]) + if algorithm.detect(observation): + change_detected = True + break + + assert change_detected + + +def test_online_localization(conditional_algorithm_factory, generate_data, data_params): + for _ in range(data_params["num_of_tests"]): + data_x, data_y = generate_data() + algorithm = conditional_algorithm_factory() + change_points = [] + + for i in range(len(data_x)): + observation = np.array([data_x[i], data_y[i]]) + cp = algorithm.localize(observation) + if cp is not None: + change_points.append(cp) + + assert len(change_points) > 0 + closest_point = min(change_points, key=lambda x: abs(x - data_params["change_point"])) + assert ( + data_params["change_point"] - data_params["tolerable_deviation"] + <= closest_point + <= data_params["change_point"] + data_params["tolerable_deviation"] + ) + + +def test_conditional_entropy(): + size = 200 + algorithm = ConditionalEntropyAlgorithm(window_size=40, bins=10, threshold=0.3) + + data_x = np.random.normal(0, 1, size) + data_y1 = data_x + np.random.normal(0, 0.1, size) + + for i in range(size): + algorithm.detect(np.array([data_x[i], data_y1[i]])) + + algorithm = ConditionalEntropyAlgorithm(window_size=40, bins=10, threshold=0.3) + + data_x = np.random.normal(0, 1, size) + data_y2 = np.random.normal(0, 1, size) + + change_detected = False + for i in range(size - 40): + observation = np.array([data_x[i], data_y2[i]]) + if algorithm.detect(observation): + change_detected = True + break + + for i in range(size - 40, size): + observation = np.array([data_x[i], data_y1[i]]) + if algorithm.detect(observation): + change_detected = True + break + + assert change_detected + + +def test_edge_cases(): + algorithm = ConditionalEntropyAlgorithm(window_size=10, bins=5, threshold=0.3) + + with pytest.raises(ValueError): + algorithm.detect(np.float64(1.0)) + + algorithm = ConditionalEntropyAlgorithm(window_size=40, bins=10, threshold=0.3) + + for i in range(20): + observation = np.array([float(i), float(i + 1)]) + assert not algorithm.detect(observation) + + algorithm = ConditionalEntropyAlgorithm(window_size=40, bins=10, threshold=0.3) + + for i in range(50): + observation = np.array([0.0, 0.0]) + algorithm.detect(observation) + + change_detected = False + for i in range(50): + observation = np.array([5.0, -5.0]) + if algorithm.detect(observation): + change_detected = True + break + + assert change_detected diff --git a/tests/test_core/test_algorithms/test_entropies/test_permutation_entropy_algorithm.py b/tests/test_core/test_algorithms/test_entropies/test_permutation_entropy_algorithm.py new file mode 100644 index 0000000..ce2a8d3 --- /dev/null +++ b/tests/test_core/test_algorithms/test_entropies/test_permutation_entropy_algorithm.py @@ -0,0 +1,174 @@ +import 
numpy as np +import pytest + +from pysatl_cpd.core.algorithms.entropies.permutation_entropy import ( + PermutationEntropyAlgorithm, +) + + +def set_seed(): + np.random.seed(1) + + +def construct_permutation_entropy_algorithm(): + return PermutationEntropyAlgorithm(window_size=40, embedding_dimension=3, time_delay=1, threshold=0.2) + + +@pytest.fixture(scope="function") +def data_params(): + return { + "num_of_tests": 10, + "size": 500, + "change_point": 250, + "tolerable_deviation": 30, + } + + +@pytest.fixture +def generate_data(data_params): + def _generate_data(): + set_seed() + t1 = np.linspace(0, 10 * np.pi, data_params["change_point"]) + data1 = np.sin(t1) + np.random.normal(0, 0.1, size=len(t1)) + data2 = np.random.normal(0, 1, size=data_params["size"] - data_params["change_point"]) + return np.concatenate([data1, data2]) + + return _generate_data + + +@pytest.fixture(scope="function") +def outer_permutation_algorithm(): + return construct_permutation_entropy_algorithm() + + +@pytest.fixture +def inner_algorithm_factory(): + def _factory(): + return construct_permutation_entropy_algorithm() + + return _factory + + +def test_online_detection(outer_permutation_algorithm, generate_data, data_params): + for _ in range(data_params["num_of_tests"]): + data = generate_data() + change_detected = False + + for point in data: + if outer_permutation_algorithm.detect(point): + change_detected = True + break + + assert change_detected + + +def test_online_localization(outer_permutation_algorithm, generate_data, data_params): + for _ in range(data_params["num_of_tests"]): + data = generate_data() + change_points = [] + + algorithm = construct_permutation_entropy_algorithm() + + for point_idx, point in enumerate(data): + cp = algorithm.localize(point) + if cp is not None: + change_points.append(cp) + + assert len(change_points) > 0 + + closest_point = min(change_points, key=lambda x: abs(x - data_params["change_point"])) + assert ( + data_params["change_point"] - data_params["tolerable_deviation"] + <= closest_point + <= data_params["change_point"] + data_params["tolerable_deviation"] + ) + + +def test_embedding_dimension_effect(): + t = np.linspace(0, 10 * np.pi, 200) + signal = np.sin(t) + 0.1 * np.random.normal(0, 1, 200) + + algorithm1 = PermutationEntropyAlgorithm( + window_size=40, + embedding_dimension=2, + time_delay=1, + threshold=0.2, + ) + + algorithm2 = PermutationEntropyAlgorithm( + window_size=40, + embedding_dimension=5, + time_delay=1, + threshold=0.2, + ) + + change_points1 = [] + change_points2 = [] + + for point in signal: + if algorithm1.detect(point): + change_points1.append(algorithm1._position) + + if algorithm2.detect(point): + change_points2.append(algorithm2._position) + + assert len(change_points1) <= len(change_points2) + + +def test_time_delay_effect(): + t = np.linspace(0, 20 * np.pi, 400) + + algorithm1 = PermutationEntropyAlgorithm(window_size=40, embedding_dimension=3, time_delay=1, threshold=0.2) + + algorithm2 = PermutationEntropyAlgorithm(window_size=40, embedding_dimension=3, time_delay=1000, threshold=0.2) + + signal = np.concatenate([np.sin(t[:200]), np.random.normal(0, 1, 200)]) + + change_detected1 = False + for point in signal: + if algorithm1.detect(point): + change_detected1 = True + break + + change_detected2 = False + for point in signal: + if algorithm2.detect(point): + change_detected2 = True + break + + assert change_detected1 != change_detected2 + + +def test_edge_cases(): + algorithm = construct_permutation_entropy_algorithm() + + changes_detected 
= False + for i in range(20): + if algorithm.detect(float(i)): + changes_detected = True + + assert not changes_detected + + algorithm = construct_permutation_entropy_algorithm() + + for _ in range(50): + algorithm.detect(0.0) + + change_detected = False + for _ in range(50): + if algorithm.detect(5.0): + change_detected = True + break + + assert change_detected + + algorithm = construct_permutation_entropy_algorithm() + constant_signal = np.ones(100) + + change_detected = False + for point in constant_signal: + if algorithm.detect(point): + change_detected = True + break + + assert not change_detected diff --git a/tests/test_core/test_algorithms/test_entropies/test_shannon_entropy_algorithm.py b/tests/test_core/test_algorithms/test_entropies/test_shannon_entropy_algorithm.py new file mode 100644 index 0000000..5c7a6da --- /dev/null +++ b/tests/test_core/test_algorithms/test_entropies/test_shannon_entropy_algorithm.py @@ -0,0 +1,164 @@ +import numpy as np +import pytest + +from pysatl_cpd.core.algorithms.entropies.shannon_entropy import ( + ShannonEntropyAlgorithm, +) + + +def set_seed(): + np.random.seed(1) + + +def construct_shannon_entropy_algorithm(): + return ShannonEntropyAlgorithm(window_size=40, bins=10, threshold=0.3) + + +@pytest.fixture(scope="function") +def data_params(): + return { + "num_of_tests": 10, + "size": 500, + "change_point": 250, + "tolerable_deviation": 30, + } + + +@pytest.fixture +def generate_data(data_params): + def _generate_data(): + set_seed() + return np.concatenate( + [ + np.random.normal(loc=0, scale=1, size=data_params["change_point"]), + np.random.normal( + loc=5, + scale=2, + size=data_params["size"] - data_params["change_point"], + ), + ] + ) + + return _generate_data + + +@pytest.fixture(scope="function") +def outer_shannon_algorithm(): + return construct_shannon_entropy_algorithm() + + +@pytest.fixture +def inner_algorithm_factory(): + def _factory(): + return construct_shannon_entropy_algorithm() + + return _factory + + +def test_online_detection(outer_shannon_algorithm, generate_data, data_params): + for _ in range(data_params["num_of_tests"]): + data = generate_data() + change_detected = False + + for point in data: + if outer_shannon_algorithm.detect(point): + change_detected = True + break + + assert change_detected + + +def test_online_localization(outer_shannon_algorithm, generate_data, data_params): + for _ in range(data_params["num_of_tests"]): + data = generate_data() + change_points = [] + + algorithm = construct_shannon_entropy_algorithm() + + for point_idx, point in enumerate(data): + cp = algorithm.localize(point) + if cp is not None: + change_points.append(cp) + + assert len(change_points) > 0 + + closest_point = min(change_points, key=lambda x: abs(x - data_params["change_point"])) + assert ( + data_params["change_point"] - data_params["tolerable_deviation"] + <= closest_point + <= data_params["change_point"] + data_params["tolerable_deviation"] + ) + + +def test_entropy_calculation(): + the_absolute_deviation_between_the_calculated_entropy_and_the_expected_entropy_value = 0.01 + algorithm = construct_shannon_entropy_algorithm() + + uniform_probs = np.ones(8) / 8 + computed_entropy = algorithm._compute_entropy(uniform_probs) + expected_entropy = 3.0 + assert ( + abs(computed_entropy - expected_entropy) + < the_absolute_deviation_between_the_calculated_entropy_and_the_expected_entropy_value + ) + + certain_probs = np.zeros(8) + certain_probs[0] = 1.0 + computed_entropy = algorithm._compute_entropy(certain_probs) + expected_entropy = 
0.0 + assert ( + abs(computed_entropy - expected_entropy) + < the_absolute_deviation_between_the_calculated_entropy_and_the_expected_entropy_value + ) + + +def test_change_point_detection(): + algorithm = construct_shannon_entropy_algorithm() + + data1 = np.zeros(60) + for i in range(len(data1)): + algorithm.detect(data1[i]) + + change_detected = False + data2 = np.ones(60) + for i in range(len(data2)): + if algorithm.detect(data2[i]): + change_detected = True + break + + assert change_detected + + +def test_edge_cases(): + algorithm = construct_shannon_entropy_algorithm() + + changes_detected = False + for i in range(20): + if algorithm.detect(float(i)): + changes_detected = True + + assert not changes_detected + + algorithm = construct_shannon_entropy_algorithm() + + for _ in range(50): + algorithm.detect(1.0) + + change_detected = False + for _ in range(50): + if algorithm.detect(10.0): + change_detected = True + break + + assert change_detected + + algorithm = construct_shannon_entropy_algorithm() + constant_signal = np.ones(100) + + change_detected = False + for point in constant_signal: + if algorithm.detect(point): + change_detected = True + break + + assert not change_detected diff --git a/tests/test_core/test_algorithms/test_graph_algorithm.py b/tests/test_core/test_algorithms/test_graph_algorithm.py index d87d582..2eac539 100644 --- a/tests/test_core/test_algorithms/test_graph_algorithm.py +++ b/tests/test_core/test_algorithms/test_graph_algorithm.py @@ -11,7 +11,13 @@ def custom_comparison(node1, node2): class TestGraphAlgorithm: @pytest.mark.parametrize( "alg_param,data,expected", - (((custom_comparison, 1.5), (50, 55, 60, 48, 52, 70, 75, 80, 90, 85, 95, 100, 50), [5]),), + ( + ( + (custom_comparison, 1.5), + (50, 55, 60, 48, 52, 70, 75, 80, 90, 85, 95, 100, 50), + [5], + ), + ), ) def test_localize(self, alg_param, data, expected): algorithm = GraphAlgorithm(*alg_param) @@ -19,7 +25,13 @@ def test_localize(self, alg_param, data, expected): @pytest.mark.parametrize( "alg_param,data,expected", - (((custom_comparison, 1.5), (50, 55, 60, 48, 52, 70, 75, 80, 90, 85, 95, 100, 50), 1),), + ( + ( + (custom_comparison, 1.5), + (50, 55, 60, 48, 52, 70, 75, 80, 90, 85, 95, 100, 50), + 1, + ), + ), ) def test_detect(self, alg_param, data, expected): algorithm = GraphAlgorithm(*alg_param) diff --git a/tests/test_core/test_online_cpd_core.py b/tests/test_core/test_online_cpd_core.py index 4be7952..ea54e50 100644 --- a/tests/test_core/test_online_cpd_core.py +++ b/tests/test_core/test_online_cpd_core.py @@ -3,7 +3,9 @@ from pysatl_cpd.core.online_cpd_core import OnlineCpdCore from pysatl_cpd.core.scrubber.data_providers import ListUnivariateProvider -from tests.test_core.test_algorithms.test_bayesian_online_algorithm import construct_bayesian_online_algorithm +from tests.test_core.test_algorithms.test_bayesian_online_algorithm import ( + construct_bayesian_online_algorithm, +) DATA_PARAMS = { "num_of_tests": 10, @@ -39,7 +41,8 @@ def dataset(request, data_params): @pytest.fixture def online_core(dataset): return OnlineCpdCore( - algorithm=construct_bayesian_online_algorithm(), data_provider=ListUnivariateProvider(list(dataset)) + algorithm=construct_bayesian_online_algorithm(), + data_provider=ListUnivariateProvider(list(dataset)), ) diff --git a/tests/test_core/test_scrubber/test_linear_scrubber.py b/tests/test_core/test_scrubber/test_linear_scrubber.py index 0302c96..d30cd71 100644 --- a/tests/test_core/test_scrubber/test_linear_scrubber.py +++ 
b/tests/test_core/test_scrubber/test_linear_scrubber.py @@ -15,11 +15,19 @@ def test_get_windows(self, data_length, window_length, shift_factor): cur_index = 0 for window in iter(scrubber): assert len(window.values) == len(window.indices) - assert np.array_equal(window.values, np.fromiter(data[cur_index : cur_index + window_length], np.float64)) + assert np.array_equal( + window.values, + np.fromiter(data[cur_index : cur_index + window_length], np.float64), + ) cur_index += max(1, int(window_length * shift_factor)) @settings(max_examples=1000) - @given(st.integers(0, 100), st.integers(1, 100), st.floats(0.01, 1), st.integers(0, 100)) + @given( + st.integers(0, 100), + st.integers(1, 100), + st.floats(0.01, 1), + st.integers(0, 100), + ) def test_restart(self, data_length, window_length, shift_factor, window_start): data = [i for i in range(data_length)] scrubber = LinearScrubber(ListUnivariateProvider(data), window_length, shift_factor) @@ -27,5 +35,8 @@ def test_restart(self, data_length, window_length, shift_factor, window_start): snd = list(scrubber) assert len(fst) == len(snd) assert all( - map(lambda w: w[0].indices == w[1].indices and np.array_equal(w[0].values, w[1].values), zip(fst, snd)) + map( + lambda w: w[0].indices == w[1].indices and np.array_equal(w[0].values, w[1].values), + zip(fst, snd), + ) ) diff --git a/tests/test_generator/test_distributions.py b/tests/test_generator/test_distributions.py index 798e290..02d888c 100644 --- a/tests/test_generator/test_distributions.py +++ b/tests/test_generator/test_distributions.py @@ -9,7 +9,11 @@ class TestDistributions: [ (dstr.Distributions.NORMAL, {"mean": "0"}, ValueError), (dstr.Distributions.NORMAL, {"mean": "0", "var": "1"}, KeyError), - (dstr.Distributions.NORMAL, {"mean": "0", "variance": "1", "x": "5"}, ValueError), + ( + dstr.Distributions.NORMAL, + {"mean": "0", "variance": "1", "x": "5"}, + ValueError, + ), (dstr.Distributions.NORMAL, {"mean": "0", "variance": "-1"}, ValueError), (dstr.Distributions.EXPONENTIAL, {}, ValueError), (dstr.Distributions.EXPONENTIAL, {"rt": "1"}, KeyError), @@ -17,21 +21,37 @@ class TestDistributions: (dstr.Distributions.EXPONENTIAL, {"rate": "-1"}, ValueError), (dstr.Distributions.WEIBULL, {"shape": "0"}, ValueError), (dstr.Distributions.WEIBULL, {"shape": "0", "var": "1"}, KeyError), - (dstr.Distributions.WEIBULL, {"shape": "1", "scale": "1", "x": "5"}, ValueError), + ( + dstr.Distributions.WEIBULL, + {"shape": "1", "scale": "1", "x": "5"}, + ValueError, + ), (dstr.Distributions.WEIBULL, {"shape": "-1", "scale": "1"}, ValueError), (dstr.Distributions.WEIBULL, {"shape": "1", "scale": "-1"}, ValueError), (dstr.Distributions.UNIFORM, {"min": "0"}, ValueError), (dstr.Distributions.UNIFORM, {"min": "-1", "MAX": "1"}, KeyError), - (dstr.Distributions.UNIFORM, {"min": "-1", "max": "1", "x": "5"}, ValueError), + ( + dstr.Distributions.UNIFORM, + {"min": "-1", "max": "1", "x": "5"}, + ValueError, + ), (dstr.Distributions.UNIFORM, {"min": "1", "max": "-1"}, ValueError), (dstr.Distributions.BETA, {"alpha": "1"}, ValueError), (dstr.Distributions.BETA, {"alpha": "1", "x": "1"}, KeyError), - (dstr.Distributions.BETA, {"alpha": "1", "beta": "1", "x": "5"}, ValueError), + ( + dstr.Distributions.BETA, + {"alpha": "1", "beta": "1", "x": "5"}, + ValueError, + ), (dstr.Distributions.BETA, {"alpha": "-1", "beta": "1"}, ValueError), (dstr.Distributions.BETA, {"alpha": "1", "beta": "-1"}, ValueError), (dstr.Distributions.GAMMA, {"alpha": "1"}, ValueError), (dstr.Distributions.GAMMA, {"alpha": "1", "x": "1"}, 
KeyError), - (dstr.Distributions.GAMMA, {"alpha": "1", "beta": "1", "x": "5"}, ValueError), + ( + dstr.Distributions.GAMMA, + {"alpha": "1", "beta": "1", "x": "5"}, + ValueError, + ), (dstr.Distributions.GAMMA, {"alpha": "-1", "beta": "1"}, ValueError), (dstr.Distributions.GAMMA, {"alpha": "1", "beta": "-1"}, ValueError), (dstr.Distributions.T, {}, ValueError), @@ -43,8 +63,16 @@ class TestDistributions: (dstr.Distributions.LOGNORM, {"s": "1", "x": "5"}, ValueError), (dstr.Distributions.LOGNORM, {"s": "-1"}, ValueError), (dstr.Distributions.MULTIVARIATIVE_NORMAL, {}, ValueError), - (dstr.Distributions.MULTIVARIATIVE_NORMAL, {"Mean": "[0.0, 0.0]"}, KeyError), - (dstr.Distributions.MULTIVARIATIVE_NORMAL, {"mean": "[0.0, 0.0]", "x": "5"}, ValueError), + ( + dstr.Distributions.MULTIVARIATIVE_NORMAL, + {"Mean": "[0.0, 0.0]"}, + KeyError, + ), + ( + dstr.Distributions.MULTIVARIATIVE_NORMAL, + {"mean": "[0.0, 0.0]", "x": "5"}, + ValueError, + ), (dstr.Distributions.MULTIVARIATIVE_NORMAL, {"mean": "[]"}, ValueError), ], ) diff --git a/tests/test_generator/test_generator.py b/tests/test_generator/test_generator.py index d70a6ae..8986d32 100644 --- a/tests/test_generator/test_generator.py +++ b/tests/test_generator/test_generator.py @@ -19,7 +19,10 @@ class TestGenerator: ScipyDatasetGenerator(), { "20-normal-0-1-20-normal-10-1": [40, [20]], - "20-multivariate_normal-0-0-20-multivariate_normal-10-10": [40, [20]], + "20-multivariate_normal-0-0-20-multivariate_normal-10-10": [ + 40, + [20], + ], "20-normal-0-1-no-change-point": [20, []], "20-exponential-1-no-change-point": [20, []], "20-weibull-1-1-no-change-point": [20, []], diff --git a/tests/test_online_solver.py b/tests/test_online_solver.py index 8b87c0e..e9f6344 100644 --- a/tests/test_online_solver.py +++ b/tests/test_online_solver.py @@ -6,7 +6,9 @@ from pysatl_cpd.icpd_solver import CpdLocalizationResults from pysatl_cpd.labeled_data import LabeledCpdData from pysatl_cpd.online_cpd_solver import OnlineCpdSolver -from tests.test_core.test_algorithms.test_bayesian_online_algorithm import construct_bayesian_online_algorithm +from tests.test_core.test_algorithms.test_bayesian_online_algorithm import ( + construct_bayesian_online_algorithm, +) DATA_PARAMS = { "num_tests": 10, @@ -41,7 +43,10 @@ def _generate(has_cp, test_iteration): @pytest.fixture def labeled_data_factory(data_params): def _factory(data, has_cp): - return LabeledCpdData(raw_data=data, change_points=[data_params["change_point"]] if has_cp else None) + return LabeledCpdData( + raw_data=data, + change_points=[data_params["change_point"]] if has_cp else None, + ) return _factory From c53d10d8403584ea2bcc7b5ab7f92c7b59620dfb Mon Sep 17 00:00:00 2001 From: Wenderlog Date: Thu, 10 Apr 2025 23:17:46 +0300 Subject: [PATCH 2/3] Fix: refine types for np.ndarray and fix formatting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pysatl_cpd/core/algorithms/entropies/bubble_entropy.py | 6 +++--- pysatl_cpd/core/algorithms/entropies/conditional_entropy.py | 6 ++++-- pysatl_cpd/core/algorithms/entropies/permutation_entropy.py | 4 ++-- pysatl_cpd/core/algorithms/entropies/shannon_entropy.py | 4 ++-- 4 files changed, 11 insertions(+), 9 
deletions(-) diff --git a/pysatl_cpd/core/algorithms/entropies/bubble_entropy.py b/pysatl_cpd/core/algorithms/entropies/bubble_entropy.py index bcf735e..4fa8997 100644 --- a/pysatl_cpd/core/algorithms/entropies/bubble_entropy.py +++ b/pysatl_cpd/core/algorithms/entropies/bubble_entropy.py @@ -118,7 +118,7 @@ def _process_single_observation(self, observation: float) -> None: if entropy_diff > self._threshold: self._last_change_point = self._position - self._window_size // 2 - def _calculate_bubble_entropy(self, time_series: np.ndarray) -> float: + def _calculate_bubble_entropy(self, time_series: npt.NDArray[np.float64]) -> float: """ Calculates the bubble entropy of a time series by computing the difference in permutation entropy between two different embedding dimensions. @@ -132,9 +132,9 @@ def _calculate_bubble_entropy(self, time_series: np.ndarray) -> float: denom = np.log((self._embedding_dimension + 1) / self._embedding_dimension) bubble_entropy = (H_swaps_m_plus_1 - H_swaps_m) / denom - return bubble_entropy + return float(bubble_entropy) - def _calculate_permutation_entropy(self, time_series: np.ndarray, embedding_dimension: int) -> float: + def _calculate_permutation_entropy(self, time_series: npt.NDArray[np.float64], embedding_dimension: int) -> float: """ Calculates the permutation entropy of a time series based on the given embedding dimension. diff --git a/pysatl_cpd/core/algorithms/entropies/conditional_entropy.py b/pysatl_cpd/core/algorithms/entropies/conditional_entropy.py index 9d54687..b893b7b 100644 --- a/pysatl_cpd/core/algorithms/entropies/conditional_entropy.py +++ b/pysatl_cpd/core/algorithms/entropies/conditional_entropy.py @@ -229,7 +229,9 @@ def _compute_entropy(self, max_entropy_value: int) -> None: if entropy_diff > self._threshold: self._last_change_point = self._position - self._window_size // 2 - def _compute_conditional_entropy(self, time_series_x: np.ndarray, time_series_y: np.ndarray) -> float: + def _compute_conditional_entropy( + self, time_series_x: npt.NDArray[np.float64], time_series_y: npt.NDArray[np.float64] + ) -> float: """ Computes the conditional entropy of the time series using joint and conditional probability distributions. @@ -244,4 +246,4 @@ def _compute_conditional_entropy(self, time_series_x: np.ndarray, time_series_y: H_X_given_Y = -np.nansum( joint_probability_matrix * np.log2(conditional_probability, where=conditional_probability > 0) ) - return H_X_given_Y + return float(H_X_given_Y) diff --git a/pysatl_cpd/core/algorithms/entropies/permutation_entropy.py b/pysatl_cpd/core/algorithms/entropies/permutation_entropy.py index ed582af..5abad78 100644 --- a/pysatl_cpd/core/algorithms/entropies/permutation_entropy.py +++ b/pysatl_cpd/core/algorithms/entropies/permutation_entropy.py @@ -157,7 +157,7 @@ def _process_single_observation(self, observation: float) -> None: if entropy_diff > self._threshold: self._last_change_point = self._position - self._window_size // 2 - def _calculate_permutation_entropy(self, time_series: np.ndarray) -> float: + def _calculate_permutation_entropy(self, time_series: npt.NDArray[np.float64]) -> float: """ Calculates the permutation entropy of a time series using the order of values in the sliding windows. The entropy is computed based on the frequency of different permutations of the state vectors. 
@@ -184,4 +184,4 @@ def _calculate_permutation_entropy(self, time_series: np.ndarray) -> float: [probability * np.log2(probability) for probability in permutation_probabilities if probability > 0] ) - return permutation_entropy + return float(permutation_entropy) diff --git a/pysatl_cpd/core/algorithms/entropies/shannon_entropy.py b/pysatl_cpd/core/algorithms/entropies/shannon_entropy.py index 1c9c60b..67bcb60 100644 --- a/pysatl_cpd/core/algorithms/entropies/shannon_entropy.py +++ b/pysatl_cpd/core/algorithms/entropies/shannon_entropy.py @@ -144,7 +144,7 @@ def _process_single_observation(self, observation: float) -> None: if entropy_diff > self._threshold: self._last_change_point = self._position - self._window_size // 2 - def _compute_entropy(self, probabilities: np.ndarray) -> float: + def _compute_entropy(self, probabilities: npt.NDArray[np.float64]) -> float: """ Computes Shannon entropy based on a probability distribution. @@ -157,4 +157,4 @@ def _compute_entropy(self, probabilities: np.ndarray) -> float: probabilities = probabilities[probabilities > 0] if len(probabilities) == 0: return 0.0 - return -np.sum(probabilities * np.log2(probabilities)) + return float(-np.sum(probabilities * np.log2(probabilities))) From 7b68994d542e44462f1b5bddb32a78730b66bed6 Mon Sep 17 00:00:00 2001 From: Wenderlog Date: Thu, 10 Apr 2025 23:48:06 +0300 Subject: [PATCH 3/3] Fix: ensure correct float return type for entropy method to pass mypy; apply ruff formatting --- pysatl_cpd/core/algorithms/entropies/bubble_entropy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pysatl_cpd/core/algorithms/entropies/bubble_entropy.py b/pysatl_cpd/core/algorithms/entropies/bubble_entropy.py index 4fa8997..24b5d01 100644 --- a/pysatl_cpd/core/algorithms/entropies/bubble_entropy.py +++ b/pysatl_cpd/core/algorithms/entropies/bubble_entropy.py @@ -152,11 +152,11 @@ def _calculate_permutation_entropy(self, time_series: npt.NDArray[np.float64], e total_permutations = len(permutation_vectors) if total_permutations == 0: - return 0.0 + return float(0) permutation_probabilities = [count / total_permutations for count in permutation_counts.values()] permutation_entropy = -np.sum( [probability * np.log2(probability) for probability in permutation_probabilities if probability > 0] ) - return permutation_entropy + return float(permutation_entropy)
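
For reference, the entropy quantities manipulated by the entropy-module diffs above can be reproduced with plain NumPy. The sketch below is illustrative only: the helper names (shannon_entropy, permutation_entropy, bubble_style_entropy, conditional_entropy) are hypothetical and not part of the pysatl_cpd API, and the ordinal-pattern estimator inside bubble_style_entropy is an assumption. Only the formulas themselves are taken from the hunks: H = -sum(p * log2(p)), the bubble normalization (H_{m+1} - H_m) / log((m + 1) / m), and H(X|Y) = -sum over (x, y) of p(x, y) * log2(p(x | y)).

import numpy as np
import numpy.typing as npt


def shannon_entropy(probabilities: npt.NDArray[np.float64]) -> float:
    # H = -sum(p * log2(p)) over the non-zero probabilities.
    probabilities = probabilities[probabilities > 0]
    if len(probabilities) == 0:
        return 0.0
    return float(-np.sum(probabilities * np.log2(probabilities)))


def permutation_entropy(series: npt.NDArray[np.float64], dimension: int, delay: int = 1) -> float:
    # Entropy of the ordinal-pattern frequencies of delay-embedded state vectors.
    span = (dimension - 1) * delay
    counts: dict[tuple[int, ...], int] = {}
    for start in range(len(series) - span):
        window = series[start : start + span + 1 : delay]
        pattern = tuple(int(i) for i in np.argsort(window))
        counts[pattern] = counts.get(pattern, 0) + 1
    if not counts:
        return 0.0
    frequencies = np.array(list(counts.values()), dtype=np.float64)
    return shannon_entropy(frequencies / frequencies.sum())


def bubble_style_entropy(series: npt.NDArray[np.float64], dimension: int) -> float:
    # (H_{m+1} - H_m) / log((m + 1) / m), the normalization used in the bubble entropy hunk.
    h_m = permutation_entropy(series, dimension)
    h_m_plus_1 = permutation_entropy(series, dimension + 1)
    return float((h_m_plus_1 - h_m) / np.log((dimension + 1) / dimension))


def conditional_entropy(x: npt.NDArray[np.float64], y: npt.NDArray[np.float64], bins: int = 10) -> float:
    # H(X|Y) = -sum_{x,y} p(x, y) * log2(p(x | y)), estimated from a 2D histogram.
    joint_counts, _, _ = np.histogram2d(x, y, bins=bins)
    joint_p = joint_counts / joint_counts.sum()
    p_y = joint_p.sum(axis=0, keepdims=True)
    cond_p = np.divide(joint_p, p_y, out=np.zeros_like(joint_p), where=p_y > 0)
    log_cond = np.zeros_like(cond_p)
    np.log2(cond_p, out=log_cond, where=cond_p > 0)
    return float(-np.sum(joint_p * log_cond))


if __name__ == "__main__":
    rng = np.random.default_rng(1)
    noise = rng.normal(0.0, 1.0, 500)
    sine = np.sin(np.linspace(0.0, 10.0 * np.pi, 500))
    # A regular signal is expected to score lower than white noise under both measures.
    print(permutation_entropy(sine, 3), permutation_entropy(noise, 3))
    print(bubble_style_entropy(sine, 3), bubble_style_entropy(noise, 3))
    # Dependent pairs should yield a lower H(X|Y) than independent pairs.
    print(conditional_entropy(noise, noise + rng.normal(0.0, 0.1, 500)))
    print(conditional_entropy(noise, rng.normal(0.0, 1.0, 500)))

Under these definitions a constant or strongly periodic signal scores low while white noise scores near the maximum, which is the behaviour the new online tests rely on when they assert detection after a switch from a sine or autoregressive segment to a noisy one and assert no detection on constant input.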