Source code for causal_networkx.ci.kernel_test

import numpy as np
from scipy import stats
from sklearn.metrics import pairwise_distances, pairwise_kernels
from sklearn.metrics.pairwise import PAIRWISE_KERNEL_FUNCTIONS

from .base import BaseConditionalIndependenceTest


class KernelCITest(BaseConditionalIndependenceTest):
    def __init__(
        self,
        kernel_x: str = "rbf",
        kernel_y: str = "rbf",
        kernel_z: str = "rbf",
        null_size: int = 1000,
        approx_with_gamma: bool = True,
        kwidth_x=None,
        kwidth_y=None,
        kwidth_z=None,
        threshold: float = 1e-5,
        n_jobs: int = None,
    ):
        """Kernel (Conditional) Independence Test.

        For testing (conditional) independence on continuous data, we
        leverage kernels :footcite:`Zhang2011` that are computationally
        efficient.

        Parameters
        ----------
        kernel_x : str, optional
            The kernel function for data 'X', by default "rbf".
        kernel_y : str, optional
            The kernel function for data 'Y', by default "rbf".
        kernel_z : str, optional
            The kernel function for data 'Z', by default "rbf".
        null_size : int, optional
            The number of samples to generate for the bootstrap
            distribution to approximate the pvalue, by default 1000.
        approx_with_gamma : bool, optional
            Whether to use the Gamma distribution approximation for the
            pvalue, by default True.
        kwidth_x : float, optional
            The kernel width for data 'X'. By default None, in which case
            the width is set with the median heuristic on the pairwise
            distances of 'X'.
        kwidth_y : float, optional
            The kernel width for data 'Y'. By default None, in which case
            the width is set with the median heuristic on the pairwise
            distances of 'Y'.
        kwidth_z : float, optional
            The kernel width for data 'Z'. By default None, in which case
            the width is set with the median heuristic on the pairwise
            distances of 'Z'.
        threshold : float, optional
            The threshold set on the value of eigenvalues, by default 1e-5.
            Used to regularize the method.
        n_jobs : int, optional
            The number of CPUs to use, by default None.

        Notes
        -----
        Valid strings for ``kernel_x``, ``kernel_y``, and ``kernel_z`` are,
        as defined in :func:`sklearn.metrics.pairwise.pairwise_kernels`,
        [``"additive_chi2"``, ``"chi2"``, ``"linear"``, ``"poly"``,
        ``"polynomial"``, ``"rbf"``, ``"laplacian"``, ``"sigmoid"``,
        ``"cosine"``].

        References
        ----------
        .. footbibliography::
        """
        if isinstance(kernel_x, str) and kernel_x not in PAIRWISE_KERNEL_FUNCTIONS:
            raise ValueError(
                f"The kernels that are currently supported are {PAIRWISE_KERNEL_FUNCTIONS}. "
                f"You passed in {kernel_x} for kernel_x."
            )
        if isinstance(kernel_y, str) and kernel_y not in PAIRWISE_KERNEL_FUNCTIONS:
            raise ValueError(
                f"The kernels that are currently supported are {PAIRWISE_KERNEL_FUNCTIONS}. "
                f"You passed in {kernel_y} for kernel_y."
            )
        if isinstance(kernel_z, str) and kernel_z not in PAIRWISE_KERNEL_FUNCTIONS:
            raise ValueError(
                f"The kernels that are currently supported are {PAIRWISE_KERNEL_FUNCTIONS}. "
                f"You passed in {kernel_z} for kernel_z."
            )

        self.kernel_x = kernel_x
        self.kernel_y = kernel_y
        self.kernel_z = kernel_z
        self.null_size = null_size
        self.approx_with_gamma = approx_with_gamma
        self.threshold = threshold
        self.n_jobs = n_jobs

        # hyperparameters of the kernels
        self.kwidth_x = kwidth_x
        self.kwidth_y = kwidth_y
        self.kwidth_z = kwidth_z
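
    # Illustrative note (added; not in the original source): constructing the
    # test with defaults, e.g. ``KernelCITest(kernel_x="rbf", null_size=500)``,
    # leaves every kernel width at None, so ``_compute_kernel`` below falls
    # back to the median heuristic over pairwise distances for each dataset.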

    def test(self, df, x_var, y_var, z_covariates=None):
        self._check_test_input(df, x_var, y_var, z_covariates)

        if z_covariates is None or len(z_covariates) == 0:
            Z = None
        else:
            z_covariates = list(z_covariates)
            Z = df[z_covariates].to_numpy().reshape((-1, len(z_covariates)))
        X = df[x_var].to_numpy()[:, np.newaxis]
        Y = df[y_var].to_numpy()[:, np.newaxis]

        # first normalize the data to have zero mean and unit variance
        # along the columns of the data
        X = stats.zscore(X, axis=0)
        Y = stats.zscore(Y, axis=0)

        if Z is not None:
            Z = stats.zscore(Z, axis=0)

            # when running CI, \ddot{X} comprises (X, Z)
            X = np.concatenate((X, Z), axis=1)

            Kz, sigma_z = self._compute_kernel(
                Z,
                distance_metric="l2",
                metric=self.kernel_z,
                kwidth=self.kwidth_z,
                centered=True,
            )

        # compute the centralized kernel matrices of each of the datasets
        Kx, sigma_x = self._compute_kernel(
            X,
            distance_metric="l2",
            metric=self.kernel_x,
            kwidth=self.kwidth_x,
            centered=True,
        )
        Ky, sigma_y = self._compute_kernel(
            Y,
            distance_metric="l2",
            metric=self.kernel_y,
            kwidth=self.kwidth_y,
            centered=True,
        )

        if Z is None:
            # the test statistic is just the standard bivariate independence
            # test statistic
            test_stat = self._compute_V_statistic(Kx, Ky)

            if self.approx_with_gamma:
                # approximate the pvalue using the Gamma distribution
                k_appr, theta_appr = self._approx_gamma_params_ind(Kx, Ky)
                pvalue = 1 - stats.gamma.cdf(test_stat, k_appr, 0, theta_appr)
            else:
                null_samples = self._compute_null_ind(Kx, Ky, n_samples=self.null_size)
                pvalue = np.sum(null_samples > test_stat) / float(self.null_size)
        else:
            # compute the centralizing matrix for the kernels according to
            # the conditioning set Z
            epsilon = 1e-7
            n = Kx.shape[0]
            Rz = epsilon * np.linalg.pinv(Kz + epsilon * np.eye(n))

            # compute the centralized kernel matrices
            KxzR = Rz.dot(Kx).dot(Rz)
            KyzR = Rz.dot(Ky).dot(Rz)

            # compute the conditional independence test statistic
            test_stat = self._compute_V_statistic(KxzR, KyzR)

            # compute the product of the eigenvectors
            uu_prod = self._compute_prod_eigvecs(KxzR, KyzR, threshold=self.threshold)

            if self.approx_with_gamma:
                # approximate the pvalue using the Gamma distribution
                k_appr, theta_appr = self._approx_gamma_params_ci(uu_prod)
                pvalue = 1 - stats.gamma.cdf(test_stat, k_appr, 0, theta_appr)
            else:
                null_samples = self._compute_null_ci(uu_prod, self.null_size)
                pvalue = np.sum(null_samples > test_stat) / float(self.null_size)

        return test_stat, pvalue
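
    # Explanatory note (added; not in the original source): the conditional
    # branch of ``test`` uses Rz = epsilon * (Kz + epsilon * I)^{-1}, which is
    # algebraically identical to I - Kz @ (Kz + epsilon * I)^{-1}, i.e. the
    # residual operator of a kernel ridge regression on Z. Sandwiching Kx and
    # Ky as Rz @ K @ Rz therefore regresses the effect of Z out of both
    # kernels before the V-statistic is computed :footcite:`Zhang2011`.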

    def _approx_gamma_params_ind(self, Kx, Ky):
        T = Kx.shape[0]
        mean_appr = np.trace(Kx) * np.trace(Ky) / T
        var_appr = 2 * np.trace(Kx.dot(Kx)) * np.trace(Ky.dot(Ky)) / T / T
        k_appr = mean_appr**2 / var_appr
        theta_appr = var_appr / mean_appr
        return k_appr, theta_appr

    def _approx_gamma_params_ci(self, uu_prod):
        """Get parameters of the approximated Gamma distribution.

        Parameters
        ----------
        uu_prod : np.ndarray of shape (n_features, n_features)
            The product of the eigenvectors of Kx and Ky, the kernels
            on the input data, X and Y.

        Returns
        -------
        k_appr : float
            The shape parameter of the Gamma distribution.
        theta_appr : float
            The scale parameter of the Gamma distribution.

        Notes
        -----
        X ~ Gamma(k, theta) with the probability density function:

        .. math::
            f(x; k, \\theta) = \\frac{x^{k-1} e^{-x / \\theta}}{\\theta^k \\Gamma(k)}

        where :math:`\\Gamma(k)` is the Gamma function evaluated at k. Here,
        k governs the shape of the pdf, while :math:`\\theta` governs how
        spread out the distribution is.
        """
        # approximate the mean and the variance
        mean_appr = np.trace(uu_prod)
        var_appr = 2 * np.trace(uu_prod.dot(uu_prod))

        k_appr = mean_appr**2 / var_appr
        theta_appr = var_appr / mean_appr
        return k_appr, theta_appr

    def _compute_prod_eigvecs(self, Kx, Ky, threshold=None):
        T = Kx.shape[0]
        wx, vx = np.linalg.eigh(0.5 * (Kx + Kx.T))
        wy, vy = np.linalg.eigh(0.5 * (Ky + Ky.T))

        if threshold is not None:
            # threshold eigenvalues that fall below a fraction of the largest
            # eigenvalue, and remove the corresponding eigenvectors
            vx = vx[:, wx > np.max(wx) * threshold]
            wx = wx[wx > np.max(wx) * threshold]
            vy = vy[:, wy > np.max(wy) * threshold]
            wy = wy[wy > np.max(wy) * threshold]

        # scale the eigenvectors by the square roots of their eigenvalues
        vx = vx.dot(np.diag(np.sqrt(wx)))
        vy = vy.dot(np.diag(np.sqrt(wy)))

        # compute the product of the scaled eigenvectors
        num_eigx = vx.shape[1]
        num_eigy = vy.shape[1]
        size_u = num_eigx * num_eigy
        uu = np.zeros((T, size_u))
        for i in range(0, num_eigx):
            for j in range(0, num_eigy):
                # compute the element-wise product of the eigenvectors
                uu[:, i * num_eigy + j] = vx[:, i] * vy[:, j]

        # now compute the product
        if size_u > T:
            uu_prod = uu.dot(uu.T)
        else:
            uu_prod = uu.T.dot(uu)

        return uu_prod

    def _compute_V_statistic(self, KxR, KyR):
        # compute the sum of the element-wise product of the two kernels
        Vstat = np.sum(KxR * KyR)
        return Vstat

    def _compute_null_ind(self, Kx, Ky, n_samples, max_num_eigs=1000):
        n = Kx.shape[0]

        # get the eigenvalues in ascending order, smallest to largest
        eigvals_x = np.linalg.eigvalsh(Kx)
        eigvals_y = np.linalg.eigvalsh(Ky)

        # optionally only keep the largest ``max_num_eigs`` eigenvalues
        eigvals_x = eigvals_x[-max_num_eigs:]
        eigvals_y = eigvals_y[-max_num_eigs:]
        num_eigs = len(eigvals_x)

        # compute the entry-wise product of the eigenvalues and store it as a vector
        eigvals_prod = np.dot(
            eigvals_x.reshape(num_eigs, 1), eigvals_y.reshape(1, num_eigs)
        ).reshape((-1, 1))

        # only keep eigenvalues above a certain threshold
        eigvals_prod = eigvals_prod[eigvals_prod > eigvals_prod.max() * self.threshold]

        # generate chi-square distributed values z_{ij} with one degree of freedom
        f_rand = np.random.chisquare(df=1, size=(len(eigvals_prod), n_samples))

        # compute the null distribution, consisting of ``n_samples`` draws of
        # chi-squared random variables weighted by the eigenvalue products
        null_dist = 1.0 / n * eigvals_prod.T.dot(f_rand)
        return null_dist
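
    # Explanatory note (added; not in the original source): the null
    # distribution of the V-statistic is a weighted sum of chi-square(1)
    # variables with weights given by products of kernel eigenvalues;
    # ``_compute_null_ind`` samples from it directly, while the Gamma
    # approximations match its first two moments. For X ~ Gamma(k, theta),
    # E[X] = k * theta and Var[X] = k * theta**2, so given an approximate
    # mean m and variance v one solves k = m**2 / v and theta = v / m,
    # exactly as in ``_approx_gamma_params_ind`` and
    # ``_approx_gamma_params_ci``.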

    def _compute_null_ci(self, uu_prod, n_samples):
        # the eigenvalues of uu_prod (i.e. of uu @ uu.T, equivalently uu.T @ uu)
        eig_uu = np.linalg.eigvalsh(uu_prod)
        eig_uu = eig_uu[eig_uu > eig_uu.max() * self.threshold]

        # generate chi-square distributed values z_{ij} with one degree of freedom
        f_rand = np.random.chisquare(df=1, size=(eig_uu.shape[0], n_samples))

        # compute the null distribution, consisting of ``n_samples`` draws of
        # chi-squared random variables weighted by the eigenvalues
        null_dist = eig_uu.T.dot(f_rand)
        return null_dist

    def _compute_kernel(
        self, X, Y=None, distance_metric="l2", metric="rbf", kwidth=None, centered=True
    ):
        # if the width of the kernel is not set, then use the median trick to set the
        # kernel width based on the data X
        if kwidth is None:
            # Note: sigma = 1 / np.sqrt(kwidth)
            # compute the N x N pairwise distance matrix
            dists = pairwise_distances(X, metric=distance_metric, n_jobs=self.n_jobs)

            # compute the median of the off-diagonal elements
            med = np.median(dists[dists > 0])

            # prevents division by zero when used on label vectors
            med = med if med else 1
        else:
            med = kwidth

        extra_kwargs = dict()
        if metric == "rbf":
            # compute the normalization factor of the width of the Gaussian kernel
            gamma = 1.0 / (2 * (med**2))
            extra_kwargs["gamma"] = gamma
        elif metric == "polynomial":
            degree = 2
            extra_kwargs["degree"] = degree

        # compute the (possibly pairwise) kernel between X and Y
        kernel = pairwise_kernels(X, Y=Y, metric=metric, n_jobs=self.n_jobs, **extra_kwargs)
        if centered:
            kernel = self._center_kernel(kernel)

        return kernel, med

    def _center_kernel(self, K):
        """Center the kernel matrix.

        Applies the transformation H @ K @ H, where H = I - (1/n) * 1 1^T
        is the centering matrix.
        """
        n = K.shape[0]
        H = np.eye(n) - 1.0 / n
        return H.dot(K).dot(H)
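
# ---------------------------------------------------------------------------
# Usage sketch (added for illustration; not part of the original module).
# It assumes ``KernelCITest`` is importable from ``causal_networkx.ci`` (here
# it is defined above) and fabricates data where Z drives both X and Y, so X
# and Y are marginally dependent but conditionally independent given Z.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import pandas as pd

    rng = np.random.default_rng(12345)
    n = 300

    # Z -> X and Z -> Y, so X _||_ Y | Z, but X and Y are dependent marginally
    Z = rng.standard_normal(n)
    X = Z + 0.3 * rng.standard_normal(n)
    Y = Z + 0.3 * rng.standard_normal(n)
    df = pd.DataFrame({"x": X, "y": Y, "z": Z})

    ci_test = KernelCITest()

    # marginal test: expect a small pvalue (dependence detected)
    stat, pvalue = ci_test.test(df, "x", "y")

    # conditional test given Z: expect a larger pvalue (independence given Z)
    stat, pvalue = ci_test.test(df, "x", "y", z_covariates=["z"])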