Source code for causal_networkx.ci.parcorr

import math
import sys

import numpy as np
import scipy.stats
from sklearn.preprocessing import StandardScaler

from .base import BaseConditionalIndependenceTest


class PartialCorrelation(BaseConditionalIndependenceTest):
    """Conditional independence test based on partial (Pearson) correlation.

    Parameters
    ----------
    method : str, optional (default: "analytic")
        Method for computing the p-value. One of "analytic", "shuffle_test",
        "fixed_threshold" or "pingouin".
    fixed_threshold : float, optional (default: 0.1)
        Threshold used when ``method`` is "fixed_threshold".
    bootstrap_n_samples : int, optional (default: 1000)
        Number of shuffles used when ``method`` is "shuffle_test".
    random_state : int | None, optional (default: None)
        Seed for the random number generator.
    block_length : int, optional (default: 1)
        Block length used by the block-shuffle significance test.
    verbose : bool, optional (default: False)
        Whether to print progress information.
    """

    def __init__(
        self,
        method="analytic",
        fixed_threshold=0.1,
        bootstrap_n_samples=1000,
        random_state=None,
        block_length=1,
        verbose=False,
    ):
        self.method = method
        self.fixed_threshold = fixed_threshold
        self.bootstrap_n_samples = bootstrap_n_samples
        self.block_length = block_length
        self.verbose = verbose

        if random_state is None:
            random_state = np.random.RandomState()
        else:
            random_state = np.random.RandomState(random_state)
        self.random_state = random_state
    def test(self, df, x_var, y_var, z_covariates=None):
        """Perform a CI test of X and Y, optionally given Z.

        Parameters
        ----------
        df : pandas.DataFrame
            The dataset, with variables as columns.
        x_var : str | int
            The first variable, a column of ``df``.
        y_var : str | int
            The second variable, a column of ``df``.
        z_covariates : list of (str | int), optional
            The conditioning set of columns of ``df``, by default None.

        Returns
        -------
        val : float
            The partial correlation test statistic.
        pvalue : float
            The p-value of the test.
        """
        self._check_test_input(df, x_var, y_var, z_covariates)

        if z_covariates is None or len(z_covariates) == 0:
            Z = None
        else:
            z_covariates = list(z_covariates)
            Z = df[z_covariates].to_numpy().reshape((-1, len(z_covariates)))

        X = df[x_var].to_numpy()[:, np.newaxis]
        Y = df[y_var].to_numpy()[:, np.newaxis]

        # stack the X and Y arrays as the first two columns
        XY = np.hstack((X, Y))

        # handle whether or not a conditioning set is passed in
        if Z is None:
            data_arr = XY
        else:
            data_arr = np.hstack((XY, Z))

        # ensure it is a valid array
        if np.isnan(data_arr).sum() != 0:
            raise ValueError("nans in the data array.")

        if self.method == "pingouin":
            import pandas as pd
            import pingouin as pg

            if Z is not None:
                z_dims = Z.shape[1]
                covar = [f"z{idx}" for idx in range(z_dims)]
                z_list = covar
            else:
                covar = None
                z_list = []

            df_pg = pd.DataFrame(data_arr)
            columns = ["x", "y"]
            columns.extend(z_list)
            df_pg.columns = columns
            stats = pg.partial_corr(df_pg, x="x", y="y", covar=covar)
            return stats["r"].values[0], stats["p-val"].values[0]

        # compute the partial correlation of X and Y given Z
        val = self._compute_parcorr(data_arr, xvalue=0, yvalue=1)

        # get the dimensions of the dataset
        n_samples, n_dims = data_arr.shape

        # compute the pvalue
        pvalue = self.compute_significance(val, data_arr, n_samples, n_dims)
        return val, pvalue
    def compute_significance(self, val, array, n_samples, n_dims, sig_override=None):
        """Compute the p-value of the partial correlation test statistic.

        Returns the p-value from whichever significance function is specified
        for this test. If an override is passed, a different function than the
        one specified by ``self.method`` is called.

        Parameters
        ----------
        val : float
            Test statistic value.
        array : array-like of shape (n_samples, n_dims)
            Data array with X, Y, Z in columns and observations in rows.
        n_samples : int
            Sample size.
        n_dims : int
            Dimensionality, i.e., number of features.
        sig_override : str, optional
            Must be one of 'analytic', 'shuffle_test', 'fixed_threshold'.

        Returns
        -------
        pval : float | numpy.nan
            P-value.
        """
        # defaults to the significance method set on the class
        use_sig = self.method
        if sig_override is not None:
            use_sig = sig_override

        # check if we are using the analytic significance
        if use_sig == "analytic":
            pval = self._compute_analytic_significance(
                value=val, n_samples=n_samples, n_dims=n_dims
            )
        # check if we are using the shuffle significance
        elif use_sig == "shuffle_test":
            pval = self._compute_shuffle_significance(array=array, value=val)
        # check if we are using the fixed_threshold significance
        elif use_sig == "fixed_threshold":
            pval = self._compute_fixed_threshold_significance(
                value=val, fixed_threshold=self.fixed_threshold
            )
        else:
            raise ValueError("%s not known." % use_sig)

        # return the calculated p-value
        return pval
    def _compute_parcorr(self, array, xvalue, yvalue):
        """Compute the partial correlation."""
        # compute residuals when regressing X on Z and Y on Z
        x_resid = self._compute_ols_residuals(array, target_var=xvalue)
        y_resid = self._compute_ols_residuals(array, target_var=yvalue)

        # then compute the correlation of the residuals using the Pearson method
        val, _ = scipy.stats.pearsonr(x_resid, y_resid)
        return val

    def _compute_ols_residuals(self, array, target_var, standardize=True, return_means=False):
        """Compute residuals of a linear multiple regression.

        Performs an OLS regression of the variable indexed by ``target_var``
        on the conditions Z. Here ``array`` is assumed to contain X and Y as
        the first two columns, with the remaining columns (if present)
        containing the conditions Z. Optionally returns the estimated
        regression line.

        Parameters
        ----------
        array : np.ndarray of shape (n_samples, n_vars)
            Data array with X, Y, Z in columns and observations in rows.
        target_var : int
            The variable to regress the conditions out of. This should be the
            column index of X or Y (0 or 1) in ``array``.
        standardize : bool, optional
            Whether to standardize the array beforehand. Must be used for
            partial correlation. Default is True.
        return_means : bool, optional
            Whether to also return the estimated regression line. Default is
            False.

        Returns
        -------
        resid : np.ndarray of shape (n_samples,)
            The residuals of the regression.
        mean : np.ndarray of shape (n_samples,)
            The estimated regression line (fitted values). Only returned if
            ``return_means`` is True.
        """
        _, dim = array.shape
        dim_z = dim - 2

        # standardize with a z-score transformation
        if standardize:
            scaler = StandardScaler()
            array = scaler.fit_transform(array)

        y = array[:, target_var]

        if dim_z > 0:
            # get the (n_samples, zdim) array of conditions
            z = array[:, 2:].copy()

            # compute the least-squares solution of z @ beta = y
            # - y is a (n_samples,) array
            # - beta_hat is a (zdim,) array of coefficients
            beta_hat = np.linalg.lstsq(z, y, rcond=None)[0]

            # compute the residuals of the model predictions vs the actual values
            y_pred = np.dot(z, beta_hat)
            resid = y - y_pred
        else:
            resid = y

        if return_means:
            # the fitted values; all zeros when there is no conditioning set
            mean = y - resid
            return resid, mean
        return resid

    def _compute_shuffle_significance(self, array, value, return_null_dist=False):
        """Return the p-value for the shuffle significance test.

        For residual-based test statistics only the residuals are shuffled.

        Parameters
        ----------
        array : np.ndarray of shape (n_samples, n_dims)
            Data array with X, Y, Z in columns and observations in rows.
        value : float
            Value of the test statistic for the unshuffled estimate.
        return_null_dist : bool, optional
            Whether to also return the null distribution. Default is False.

        Returns
        -------
        pval : float
            The p-value.
        """
        x_vals = self._compute_ols_residuals(array, target_var=0)
        y_vals = self._compute_ols_residuals(array, target_var=1)

        # stack the residuals as columns so that observations stay in rows
        array_resid = np.column_stack((x_vals, y_vals))
        xyz_resid = np.array([0, 1])

        null_dist = self._get_shuffle_dist(
            array_resid,
            xyz_resid,
            sig_samples=self.bootstrap_n_samples,
            sig_blocklength=self.block_length,
        )

        pval = (null_dist >= np.abs(value)).mean()

        # Adjust p-value for two-sided measures
        # if pval < 1.0:
        #     pval *= 2.0

        if return_null_dist:
            return pval, null_dist
        return pval

    def _get_shuffle_dist(
        self,
        array,
        xyz,
        sig_samples,
        sig_blocklength=None,
    ):
        """Return a shuffle distribution of the test statistic.

        The rows in ``array`` corresponding to the X-variable are shuffled
        using a block-shuffle approach.

        Parameters
        ----------
        array : np.ndarray of shape (n_samples, n_dims)
            Data array with X, Y, Z in columns and observations in rows.
        xyz : np.ndarray of shape (n_xyz_dims,)
            XYZ identifier array of shape (dim,).
        sig_samples : int
            Number of samples for the shuffle significance test.
        sig_blocklength : int, optional (default: None)
            Block length for the block-shuffle significance test. If None, a
            block length of 1 (an ordinary permutation test) is used.

        Returns
        -------
        null_dist : np.ndarray of shape (sig_samples,)
            Contains the test statistic values estimated from the shuffled
            arrays.
        """
        n_samples, n_dims = array.shape

        # x_indices = np.where(xyz == 0)[0]
        # dim_x = len(x_indices)

        # if sig_blocklength is None:
        #     sig_blocklength = self._get_block_length(array, xyz, mode="significance")
        if sig_blocklength is None:
            sig_blocklength = 1

        n_blks = int(math.floor(float(n_samples) / sig_blocklength))

        if self.verbose:
            print("Significance test with block-length = %d ..." % (sig_blocklength))

        # truncate to a whole number of blocks
        n_used = n_blks * sig_blocklength
        array_shuffled = np.copy(array[:n_used])

        # initialize the null distribution
        null_dist = np.zeros(sig_samples)
        for idx in range(sig_samples):
            # permute contiguous blocks of length ``sig_blocklength``
            array_shuffled = array_shuffled.reshape(n_blks, sig_blocklength, n_dims)[
                self.random_state.permutation(n_blks)
            ].reshape(-1, n_dims)

            # only the X column is shuffled; restore the non-X columns
            array_shuffled[:, 1:] = array[:n_used, 1:]

            # compute the test statistic on the shuffled array
            null_dist[idx] = self._compute_parcorr(array=array_shuffled, xvalue=0, yvalue=1)

        return null_dist

    def _compute_analytic_significance(self, value, n_samples, n_dims):
        """Analytic p-value from Student's t-test of the Pearson correlation coefficient.

        Assumes a two-sided alternative. If the degrees of freedom are less
        than 1, numpy.nan is returned.

        Parameters
        ----------
        value : float
            Test statistic value.
        n_samples : int
            Sample size.
        n_dims : int
            Dimensionality, i.e., number of features.

        Returns
        -------
        pval : float | numpy.nan
            P-value.
        """
        # get the number of degrees of freedom
        deg_f = n_samples - n_dims

        if deg_f < 1:
            pval = np.nan
        elif abs(abs(value) - 1.0) <= sys.float_info.min:
            pval = 0.0
        else:
            trafo_val = value * np.sqrt(deg_f / (1.0 - value * value))
            # two-sided significance level
            pval = scipy.stats.t.sf(np.abs(trafo_val), deg_f) * 2
        return pval

    def _compute_fixed_threshold_significance(self, value, fixed_threshold):
        """Return significance for a thresholding test.

        Returns 1.0 if numpy.abs(value) is smaller than ``fixed_threshold``
        and 0.0 otherwise.

        Parameters
        ----------
        value : float
            Value of the test statistic for the unshuffled estimate.
        fixed_threshold : float
            Fixed threshold; its absolute value is used.

        Returns
        -------
        pval : float
            Returns 1.0 if numpy.abs(value) is smaller than ``fixed_threshold``
            and 0.0 otherwise.
        """
        if np.abs(value) < np.abs(fixed_threshold):
            pval = 1.0
        else:
            pval = 0.0
        return pval
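

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module). It builds a
# small synthetic dataset in which X and Y are dependent only through a common
# cause Z, so the unconditional test should reject independence while the test
# conditioned on Z should not. The column names "x", "y", "z0" and all numeric
# settings below are arbitrary choices for the example.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import pandas as pd

    rng = np.random.RandomState(0)
    n = 500
    z = rng.normal(size=n)
    x = 0.8 * z + 0.5 * rng.normal(size=n)
    y = -0.7 * z + 0.5 * rng.normal(size=n)
    df = pd.DataFrame({"x": x, "y": y, "z0": z})

    ci = PartialCorrelation(method="analytic")

    # marginal test: expected to have a small p-value (X and Y are correlated)
    stat, pval = ci.test(df, "x", "y")
    print(f"X _||_ Y:     stat={stat:.3f}, pval={pval:.3g}")

    # conditional test: expected to have a large p-value (X _||_ Y | Z)
    stat, pval = ci.test(df, "x", "y", z_covariates=["z0"])
    print(f"X _||_ Y | Z: stat={stat:.3f}, pval={pval:.3g}")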