From eaa84b35b4fd44215302385f6c106e55529f83b8 Mon Sep 17 00:00:00 2001
From: Adam Li
Date: Fri, 21 Apr 2023 19:37:50 -0400
Subject: [PATCH] Adding CMI test

Signed-off-by: Adam Li
---
 doc/api.rst                |   9 +
 pywhy_stats/__init__.py    |   2 +-
 pywhy_stats/cmi.py         | 436 +++++++++++++++++++++++++++++++++++++
 pywhy_stats/monte_carlo.py | 104 +++++++++
 4 files changed, 550 insertions(+), 1 deletion(-)
 create mode 100644 pywhy_stats/cmi.py
 create mode 100644 pywhy_stats/monte_carlo.py

diff --git a/doc/api.rst b/doc/api.rst
index 66a2fba..7ee4157 100644
--- a/doc/api.rst
+++ b/doc/api.rst
@@ -63,3 +63,12 @@ of many data analysis procedures.
 
    fisherz
 
+Monte Carlo Methods For Independence Testing
+============================================
+
+.. currentmodule:: pywhy_stats.monte_carlo
+.. autosummary::
+   :toctree: generated/
+
+   generate_knn_in_subspace
+   restricted_nbr_permutation
diff --git a/pywhy_stats/__init__.py b/pywhy_stats/__init__.py
index 97e6c4c..e4cfc25 100644
--- a/pywhy_stats/__init__.py
+++ b/pywhy_stats/__init__.py
@@ -1,3 +1,3 @@
-from . import fisherz
+from . import fisherz, monte_carlo
 from ._version import __version__  # noqa: F401
 from .independence import Methods, independence_test
diff --git a/pywhy_stats/cmi.py b/pywhy_stats/cmi.py
new file mode 100644
index 0000000..fc19a49
--- /dev/null
+++ b/pywhy_stats/cmi.py
@@ -0,0 +1,436 @@
+from typing import Optional, Set, Tuple, Union
+
+import numpy as np
+import pandas as pd
+import scipy.spatial
+import scipy.special
+import scipy.stats
+import sklearn.utils
+from numpy.typing import ArrayLike
+from sklearn.preprocessing import StandardScaler
+
+from .monte_carlo import generate_knn_in_subspace, restricted_nbr_permutation
+from .pvalue_result import PValueResult
+
+# Column identifiers for DataFrame columns (e.g. string or integer labels).
+# NOTE: this is a minimal, assumed alias so the ``Set[Column]`` annotations
+# below resolve; swap it for a shared ``Column`` type if the package defines one.
+Column = Union[int, str]
+
+
+def ind(
+    X: ArrayLike,
+    Y: ArrayLike,
+    k: float = 0.2,
+    transform: str = "rank",
+    n_jobs: int = -1,
+    n_shuffle_nbrs: int = 5,
+    n_shuffle: int = 100,
+    random_seed: Optional[int] = None,
+) -> PValueResult:
+    rng = np.random.default_rng(random_seed)
+
+
+def condind(
+    X: ArrayLike,
+    Y: ArrayLike,
+    condition_on: ArrayLike,
+    k: float = 0.2,
+    transform: str = "rank",
+    n_jobs: int = -1,
+    n_shuffle_nbrs: int = 5,
+    n_shuffle: int = 100,
+    random_seed: Optional[int] = None,
+) -> PValueResult:
+    rng = np.random.default_rng(random_seed)
+
+
+def _preprocess_data(data: ArrayLike, transform: str, rng: np.random.Generator) -> ArrayLike:
+    n_samples, n_dims = data.shape
+
+    # make a copy of the data to prevent changing it
+    data = data.copy()
+
+    # add minor noise to make sure there are no ties; the noise is scaled by
+    # each column's standard deviation
+    random_noise = rng.standard_normal(size=(n_samples, n_dims))
+    data += 1e-5 * random_noise * data.std(axis=0)
+
+    if transform == "standardize":
+        # standardize with standard scaling
+        data = data.astype(np.float64)
+        scaler = StandardScaler()
+        data = scaler.fit_transform(data)
+    elif transform == "uniform":
+        data = _trafo2uniform(data)
+    elif transform == "rank":
+        # rank transform each column
+        data = scipy.stats.rankdata(data, axis=0)
+    return data
+
+
+class CMIMixin:
+    random_state: np.random.Generator
+    random_seed: Optional[int]
+    n_jobs: Optional[int]
+
+    def _estimate_null_dist(
+        self,
+        data: pd.DataFrame,
+        x_vars: Set[Column],
+        y_vars: Set[Column],
+        z_covariates: Set,
+        n_shuffle_nbrs: int,
+        n_shuffle: int,
+    ) -> ArrayLike:
+        """Estimate the null distribution by performing a nearest-neighbor shuffle test.
+
+        XXX: improve with parallelization via joblib
+
+        Parameters
+        ----------
+        data : pd.DataFrame
+            The dataset.
+        x_vars : Set[Column]
+            The X variable column(s).
+        y_vars : Set[Column]
+            The Y variable column(s).
+        z_covariates : Set[Column]
+            The Z variable column(s).
+        n_shuffle_nbrs : int
+            The number of nearest-neighbors in feature space to shuffle.
+        n_shuffle : int
+            The number of times to generate the shuffled distribution.
+
+        Returns
+        -------
+        null_dist : ArrayLike of shape (n_shuffle,)
+            The null distribution of the test statistic.
+        """
+        n_samples, _ = data.shape
+
+        x_cols = list(x_vars)
+        if len(z_covariates) > 0 and n_shuffle_nbrs < n_samples:
+            null_dist = np.zeros(n_shuffle)
+
+            # create a copy of the data to store the shuffled array
+            data_copy = data.copy()
+
+            # Get nearest neighbors around each sample point in Z subspace
+            z_array = data[list(z_covariates)].to_numpy()
+            nbrs = generate_knn_in_subspace(
+                z_array, method="kdtree", k=n_shuffle_nbrs, n_jobs=self.n_jobs
+            )
+
+            # we will now compute a shuffled distribution, where X_i is replaced
+            # by X_j only if the corresponding Z_i and Z_j are "nearby", computed
+            # using the spatial tree query
+            for idx in range(n_shuffle):
+                # Shuffle neighbor indices for each sample index
+                for i in range(len(nbrs)):
+                    self.random_state.shuffle(nbrs[i])
+
+                # Select a sequence of neighbor indices that contains as few
+                # duplicates as possible
+                restricted_permutation = restricted_nbr_permutation(
+                    nbrs, random_seed=self.random_seed
+                )
+
+                # update the X variable columns
+                data_copy.loc[:, x_cols] = data.loc[restricted_permutation, x_cols].to_numpy()
+
+                # compute the CMI on the shuffled array
+                null_dist[idx] = self._compute_cmi(data_copy, x_vars, y_vars, z_covariates)
+        else:
+            null_dist = self._compute_shuffle_dist(
+                data, x_vars, y_vars, z_covariates, n_shuffle=n_shuffle
+            )
+
+        return null_dist
+
+    def _compute_shuffle_dist(
+        self,
+        data: pd.DataFrame,
+        x_vars: Set[Column],
+        y_vars: Set[Column],
+        z_covariates: Set,
+        n_shuffle: int,
+    ) -> ArrayLike:
+        """Compute a shuffled distribution of the test statistic."""
+        data_copy = data.copy()
+        x_cols = list(x_vars)
+        # initialize the shuffle distribution
+        shuffle_dist = np.zeros((n_shuffle,))
+        for idx in range(n_shuffle):
+            # compute a shuffled version of the X columns; vary the seed per
+            # iteration, otherwise every shuffle would be identical when a
+            # fixed ``random_seed`` is given
+            x_data = data[x_cols]
+            seed = None if self.random_seed is None else self.random_seed + idx
+            shuffled_x = sklearn.utils.shuffle(x_data, random_state=seed)
+
+            # now compute the test statistic on the shuffled data
+            data_copy[x_cols] = shuffled_x.values
+            shuffle_dist[idx] = self._compute_cmi(data_copy, x_vars, y_vars, z_covariates)
+
+        return shuffle_dist
+
+    def _compute_cmi(
+        self, df: pd.DataFrame, x_vars: Set[Column], y_vars: Set[Column], z_covariates: Set[Column]
+    ):
+        raise NotImplementedError("All CMI methods must implement a _compute_cmi function.")
+
+    def _check_test_input(
+        self,
+        data: pd.DataFrame,
+        x_vars: Set[Column],
+        y_vars: Set[Column],
+        z_covariates: Set,
+    ) -> None:
+        """Minimal validation of the test inputs (referenced by ``CMITest.test``)."""
+        requested = set(x_vars) | set(y_vars) | set(z_covariates)
+        missing = requested - set(data.columns)
+        if missing:
+            raise ValueError(f"The columns {missing} are not in the data.")
+        if (set(x_vars) & set(y_vars)) or (set(x_vars) & set(z_covariates)) or (
+            set(y_vars) & set(z_covariates)
+        ):
+            raise ValueError("The X, Y and Z column sets must be disjoint.")
+
+
+class CMITest(CMIMixin):
+    r"""Conditional mutual information independence test.
+
+    Implements the conditional independence test using conditional
+    mutual information proposed in :footcite:`Runge2018cmi`.
+
+    Parameters
+    ----------
+    k : float, optional
+        Number of nearest-neighbors for each sample point. If the number is
+        smaller than 1, it is interpreted as a fraction of the number of
+        samples, by default 0.2.
+    transform : str, optional
+        How to transform the data before estimation, by default 'rank', which
+        converts each column to its ranks. Can be 'rank', 'uniform', 'standardize'.
+    n_jobs : int, optional
+        The number of CPUs to use, by default -1, which corresponds to
+        using all CPUs available.
+    n_shuffle_nbrs : int, optional
+        Number of nearest-neighbors within the Z covariates for shuffling, by default 5.
+    n_shuffle : int, optional
+        The number of times to shuffle the dataset to generate the null distribution.
+        By default, 100.
+    random_seed : int, optional
+        The random seed that is used to seed via ``np.random.default_rng``.
+
+    Notes
+    -----
+    Conditional mutual information (CMI) is defined as:
+
+    .. math::
+
+        I(X;Y|Z) = \iiint p(z) p(x,y|z)
+        \log \frac{p(x,y|z)}{p(x|z) \cdot p(y|z)} \,dx \,dy \,dz
+
+    It can be seen that when :math:`X \perp Y | Z`, then CMI is equal to 0.
+    Hence, CMI is a general measure for conditional dependence. The
+    estimator for CMI proposed in :footcite:`Runge2018cmi` is a
+    k-nearest-neighbor based estimator:
+
+    .. math::
+
+        \widehat{I}(X;Y|Z) = \psi(k) + \frac{1}{T} \sum_{t=1}^T
+        (\psi(k_{Z,t}) - \psi(k_{XZ,t}) - \psi(k_{YZ,t}))
+
+    where :math:`\psi` is the digamma function (see :func:`scipy.special.digamma`).
+    :math:`k` determines the size of hyper-cubes around each (high-dimensional)
+    sample point. Then :math:`k_{Z},k_{XZ},k_{YZ}` are the numbers of neighbors
+    in the respective subspaces. :math:`k` can be viewed as a density smoothing
+    parameter (although it is data-adaptive unlike fixed-bandwidth estimators).
+    For large :math:`k`, the underlying dependencies are more smoothed and CMI
+    has a larger bias, but lower variance, which is more important for
+    significance testing. Note that the estimated CMI values can be slightly
+    negative while CMI is a non-negative quantity.
+
+    The estimator implemented here assumes the data is continuous.
+
+    References
+    ----------
+    .. footbibliography::
+    """
+
+    random_state: np.random.Generator
+
+    def __init__(
+        self,
+        k: float = 0.2,
+        transform: str = "rank",
+        n_jobs: int = -1,
+        n_shuffle_nbrs: int = 5,
+        n_shuffle: int = 100,
+        random_seed: Optional[int] = None,
+    ) -> None:
+        self.k = k
+        self.n_shuffle_nbrs = n_shuffle_nbrs
+        self.transform = transform
+        self.n_jobs = n_jobs
+        self.n_shuffle = n_shuffle
+        self.random_seed = random_seed
+        self.random_state = np.random.default_rng(self.random_seed)
+
+    def test(
+        self,
+        df: pd.DataFrame,
+        x_vars: Set[Column],
+        y_vars: Set[Column],
+        z_covariates: Optional[Set] = None,
+    ) -> Tuple[float, float]:
+        """Test whether ``x_vars`` and ``y_vars`` are independent given ``z_covariates``.
+
+        Returns the estimated CMI value and the permutation-based pvalue.
+        """
+        if z_covariates is None:
+            z_covariates = set()
+
+        self._check_test_input(df, x_vars, y_vars, z_covariates)
+
+        # preprocess and transform the data; called here only once
+        df = self._preprocess_data(df)
+
+        # compute the estimate of the CMI
+        val = self._compute_cmi(df, x_vars, y_vars, z_covariates)
+
+        # compute the significance of the CMI value
+        null_dist = self._estimate_null_dist(
+            df,
+            x_vars,
+            y_vars,
+            z_covariates,
+            n_shuffle_nbrs=self.n_shuffle_nbrs,
+            n_shuffle=self.n_shuffle,
+        )
+
+        # the pvalue is the fraction of the null distribution that is at least
+        # as large as the observed statistic
+        pvalue = (null_dist >= val).mean()
+
+        self.stat_ = val
+        self.pvalue_ = pvalue
+        self.null_dist_ = null_dist
+        return val, pvalue
+
+    def _compute_cmi(self, df, x_vars, y_vars, z_covariates: Set):
+        n_samples, _ = df.shape
+
+        if self.k < 1:
+            knn_here = max(1, int(self.k * n_samples))
+        else:
+            knn_here = max(1, int(self.k))
+
+        # compute the K nearest neighbors in sub-spaces
+        k_xz, k_yz, k_z = self._get_knn(df, x_vars, y_vars, z_covariates)
+
+        # compute the final CMI value
+        hxyz = scipy.special.digamma(knn_here)
+        hxz = scipy.special.digamma(k_xz)
+        hyz = scipy.special.digamma(k_yz)
+        hz = scipy.special.digamma(k_z)
+        val = hxyz - (hxz + hyz - hz).mean()
+        return val
+
+    def _preprocess_data(self, data: pd.DataFrame) -> pd.DataFrame:
+        n_samples, n_dims = data.shape
+
+        # make a copy of the data to prevent changing it
+        data = data.copy()
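+
+        # NOTE: exact ties are problematic for the kNN-based CMI estimator,
+        # because the digamma terms count neighbors inside a hard radius and
+        # tied values make those neighbor counts ill-defined; hence the small
+        # jitter added below.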
+
+        # add minor noise to make sure there are no ties; the noise is scaled
+        # by each column's standard deviation
+        random_noise = self.random_state.random((n_samples, n_dims))
+        data += 1e-5 * random_noise * data.std(axis=0).to_numpy()
+
+        if self.transform == "standardize":
+            # standardize with standard scaling
+            data = data.astype(np.float64)
+            scaler = StandardScaler()
+            data[data.columns] = scaler.fit_transform(data[data.columns])
+        elif self.transform == "uniform":
+            data = _trafo2uniform(data)
+        elif self.transform == "rank":
+            # rank transform each column
+            data = data.rank(axis=0)
+        return data
+
+    def _get_knn(
+        self, data: pd.DataFrame, x_vars: Set[Column], y_vars: Set[Column], z_covariates: Set
+    ) -> Tuple[ArrayLike, ArrayLike, ArrayLike]:
+        """Compute the nearest neighbors in the variable subspaces.
+
+        Parameters
+        ----------
+        data : pd.DataFrame
+            The dataset.
+        x_vars : Set[Column]
+            The X variable column(s).
+        y_vars : Set[Column]
+            The Y variable column(s).
+        z_covariates : Set[Column]
+            The Z variable column(s).
+
+        Returns
+        -------
+        k_xz : ArrayLike of shape (n_samples,)
+            Number of neighbors in the subspace of ``x_vars`` and the
+            ``z_covariates``.
+        k_yz : ArrayLike of shape (n_samples,)
+            Number of neighbors in the subspace of ``y_vars`` and the
+            ``z_covariates``.
+        k_z : ArrayLike of shape (n_samples,)
+            Number of neighbors in the subspace of ``z_covariates``.
+        """
+        n_samples, _ = data.shape
+        if self.k < 1:
+            knn = max(1, int(self.k * n_samples))
+        else:
+            knn = max(1, int(self.k))
+        xz_cols = list(x_vars)
+        for z_var in z_covariates:
+            xz_cols.append(z_var)
+        yz_cols = list(y_vars)
+        for z_var in z_covariates:
+            yz_cols.append(z_var)
+        z_columns = list(z_covariates)
+        columns = list(set(xz_cols).union(set(yz_cols)))
+        data = data[columns]
+
+        tree_xyz = scipy.spatial.cKDTree(data.to_numpy())
+        epsarray = tree_xyz.query(
+            data.to_numpy(), k=[knn + 1], p=np.inf, eps=0.0, workers=self.n_jobs
+        )[0][:, 0].astype(np.float64)
+
+        # shrink the radii slightly so the ball queries search neighbors < eps
+        epsarray = np.multiply(epsarray, 0.99999)
+
+        # Find nearest neighbors in subspaces of X and Z
+        xz = data[xz_cols]
+        tree_xz = scipy.spatial.cKDTree(xz)
+        k_xz = tree_xz.query_ball_point(
+            xz, r=epsarray, eps=0.0, p=np.inf, workers=self.n_jobs, return_length=True
+        )
+
+        # Find nearest neighbors in subspaces of Y and Z
+        yz = data[yz_cols]
+        tree_yz = scipy.spatial.cKDTree(yz)
+        k_yz = tree_yz.query_ball_point(
+            yz, r=epsarray, eps=0.0, p=np.inf, workers=self.n_jobs, return_length=True
+        )
+
+        # Find nearest neighbors in subspaces of just the Z covariates
+        if len(z_columns) > 0:
+            z = data[z_columns]
+            tree_z = scipy.spatial.cKDTree(z)
+            k_z = tree_z.query_ball_point(
+                z, r=epsarray, eps=0.0, p=np.inf, workers=self.n_jobs, return_length=True
+            )
+        else:
+            # Number of neighbors is n_samples when estimating just standard MI
+            k_z = np.full(n_samples, n_samples, dtype=np.float64)
+
+        return k_xz, k_yz, k_z
+
+
+def _trafo2uniform(X: ArrayLike):
+    """Transform the input array to uniform marginals.
+
+    Each column (feature) is mapped through its empirical CDF; the transform is
+    applied in place.
+
+    Parameters
+    ----------
+    X : ArrayLike of shape (n_samples, n_features)
+        The input data with ``n_samples`` rows and ``n_features`` columns.
+
+    Returns
+    -------
+    u : ArrayLike of shape (n_samples, n_features)
+        The data with each column transformed to have uniform marginals.
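+
+    Examples
+    --------
+    A minimal sketch of the marginal transform; the input values are purely
+    illustrative and the array is modified in place:
+
+    >>> import numpy as np
+    >>> X = np.array([[3.0], [1.0], [2.0]])
+    >>> _trafo2uniform(X).ravel()  # doctest: +SKIP
+    array([1.        , 0.33333333, 0.66666667])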
+ """ + + def trafo(xi): + xisorted = np.sort(xi) + yi = np.linspace(1.0 / len(xi), 1, len(xi)) + return np.interp(xi, xisorted, yi) + + n_columns = X.shape[1] + # apply a uniform transformation for each feature + for idx in range(len(n_columns)): + marginalized_feature = trafo(X[:, idx].squeeze()) + X[:, idx] = marginalized_feature + return X diff --git a/pywhy_stats/monte_carlo.py b/pywhy_stats/monte_carlo.py new file mode 100644 index 0000000..71349df --- /dev/null +++ b/pywhy_stats/monte_carlo.py @@ -0,0 +1,104 @@ +from typing import Optional + +import numpy as np +import scipy.spatial +from numpy.typing import ArrayLike +from sklearn.neighbors import NearestNeighbors + + +def generate_knn_in_subspace( + z_arr: ArrayLike, method: str = "knn", k: int = 1, n_jobs: Optional[int] = None +) -> ArrayLike: + """Generate kNN in subspace. + + Parameters + ---------- + z_arr : ArrayLike of shape (n_samples, n_features_z) + The covariate space. + method : str, optional + Method to use, by default 'knn'. Can be ('knn', 'kdtree'). + k : int, optional + The number of k-nearest neighbors to query, by default 1. + n_jobs : int, + The number of CPUs to use for joblib. By default, None. + + Returns + ------- + indices : ArrayLike of shape (n_samples, k) + The indices of the k-nearest-neighbors for each sample. + """ + # use a method to generate k-nearest-neighbors in subspace of Z + if method == "knn": + # compute the nearest neighbors in the space of "Z training" using ball-tree alg. + nbrs = NearestNeighbors( + n_neighbors=k + 1, algorithm="ball_tree", metric="l2", n_jobs=n_jobs + ).fit(z_arr) + # then get the K nearest nbrs in the Z space + _, indices = nbrs.kneighbors(z_arr) + elif method == "kdtree": + tree_xyz = scipy.spatial.cKDTree(z_arr) + indices = tree_xyz.query(z_arr, k=k, p=np.inf, eps=0.0, workers=n_jobs)[1].astype(np.int32) + + return indices + + +def restricted_nbr_permutation(nbrs: ArrayLike, random_seed=None) -> ArrayLike: + """Compute a permutation of neighbors with restrictions. + + Parameters + ---------- + nbrs : ArrayLike of shape (n_samples, k) + The k-nearest-neighbors for each sample index. + random_seed : int, optional + Random seed, by default None. + + Returns + ------- + restricted_perm : ArrayLike of shape (n_samples) + The final permutation order of the sample indices. There may be + repeating samples. See Notes for details. + + Notes + ----- + Restricted permutation goes through random samples and looks at the k-nearest + neighbors (columns of ``nbrs``) and shuffles the closest neighbor index only + if it has not been used to permute another sample. If it has been, then the + algorithm looks at the next nearest-neighbor and so on. If all k-nearest + neighbors of a sample has been checked, then a random neighbor is chosen. In this + manner, the algorithm tries to perform permutation without replacement, but + if necessary, will choose a repeating neighbor sample. 
+ """ + n_samples, k_dims = nbrs.shape + rng = np.random.default_rng(seed=random_seed) + + # initialize the final permutation order + restricted_perm = np.zeros((n_samples,)) + + # generate a random order of samples to go through + random_order = rng.permutation(n_samples) + + # keep track of values we have already used + used = set() + + # go through the random order + for idx in random_order: + m = 0 + use_idx = nbrs[idx, m] + + # if the current nbr is already used, continue incrementing + # until we have either found a new sample to use, or if + # we have reach the maximum number of shuffles to consider + while (use_idx in used) and (m < k_dims - 1): + m += 1 + use_idx = nbrs[idx, m] + + # check whether or not we have exhaustively checked all kNN + if use_idx in used and m == k_dims: + # XXX: Note this step is not in the original paper + # choose a random neighbor to permute + restricted_perm[idx] = rng.choice(nbrs[idx, :], size=1) + else: + # permute with the existing neighbor + restricted_perm[idx] = use_idx + used.add(use_idx) + return restricted_perm