Source code for causal_networkx.discovery.pcalg

import logging
from collections import defaultdict
from copy import copy
from itertools import combinations, permutations
from typing import Any, Callable, Dict, Set, Tuple, Union

import networkx as nx
import numpy as np
import pandas as pd

from causal_networkx import CPDAG, DAG
from causal_networkx.ci.base import BaseConditionalIndependenceTest

from .classes import ConstraintDiscovery

logger = logging.getLogger()


[docs]class PC(ConstraintDiscovery): def __init__( self, ci_estimator: BaseConditionalIndependenceTest, alpha: float = 0.05, init_graph: Union[nx.Graph, CPDAG] = None, fixed_edges: nx.Graph = None, min_cond_set_size: int = None, max_cond_set_size: int = None, max_iter: int = 1000, max_combinations: int = None, apply_orientations: bool = True, **ci_estimator_kwargs, ): """Peter and Clarke (PC) algorithm for causal discovery. Assumes causal sufficiency, that is, all confounders in the causal graph are observed variables. See :footcite:`Spirtes1993` for full details on the algorithm. Parameters ---------- ci_estimator : Callable The conditional independence test function. The arguments of the estimator should be data, node, node to compare, conditioning set of nodes, and any additional keyword arguments. alpha : float, optional The significance level for the conditional independence test, by default 0.05. init_graph : nx.Graph | ADMG, optional An initialized graph. If ``None``, then will initialize PC using a complete graph. By default None. fixed_edges : nx.Graph, optional An undirected graph with fixed edges. If ``None``, then will initialize PC using a complete graph. By default None. min_cond_set_size : int, optional Minimum size of the conditioning set, by default None, which will be set to '0'. Used to constrain the computation spent on the algorithm. max_cond_set_size : int, optional Maximum size of the conditioning set, by default None. Used to limit the computation spent on the algorithm. max_iter : int The maximum number of iterations through the graph to apply orientation rules. max_combinations : int, optional Maximum number of tries with a conditioning set chosen from the set of possible parents still, by default None. If None, then will not be used. If set, then the conditioning set will be chosen lexographically based on the sorted test statistic values of 'ith Pa(X) -> X', for each possible parent node of 'X'. apply_orientations : bool Whether or not to apply orientation rules given the learned skeleton graph and separating set per pair of variables. If ``True`` (default), will apply Meek's orientation rules R0-3, orienting colliders and certain arrowheads :footcite:`Meek1995`. ci_estimator_kwargs : dict Keyword arguments for the ``ci_estimator`` function. Attributes ---------- graph_ : CPDAG The graph discovered. separating_sets_ : dict The dictionary of separating sets, where it is a nested dictionary from the variable name to the variable it is being compared to the set of variables in the graph that separate the two. References ---------- .. footbibliography:: """ super().__init__( ci_estimator, alpha, init_graph, fixed_edges, min_cond_set_size=min_cond_set_size, max_cond_set_size=max_cond_set_size, max_combinations=max_combinations, **ci_estimator_kwargs, ) self.max_iter = max_iter self.apply_orientations = apply_orientations
[docs] def convert_skeleton_graph(self, graph: nx.Graph) -> CPDAG: """Convert skeleton graph as undirected networkx Graph to CPDAG. Parameters ---------- graph : nx.Graph Converts a skeleton graph to the representation needed for PC algorithm, a CPDAG. Returns ------- graph : CPDAG The CPDAG class. """ # convert Graph object to a CPDAG object with # all undirected edges graph = CPDAG(incoming_uncertain_data=graph) return graph
[docs] def orient_edges(self, skel_graph: CPDAG, sep_set: Dict[str, Dict[str, Set]]) -> CPDAG: """Orient edges in a skeleton graph to estimate the causal DAG, or CPDAG. Uses the separation sets to orient edges via conditional independence testing. These are known as the Meek rules :footcite:`Meek1995`. Parameters ---------- skel_graph : causal_networkx.CPDAG A skeleton graph. If ``None``, then will initialize PC using a complete graph. By default None. sep_set : Dict[Dict[Set]] The separating set between any two nodes. """ node_ids = skel_graph.nodes() # for all pairs of non-adjacent variables with a common neighbor # check if we can orient the edge as a collider self._orient_colliders(skel_graph, sep_set) # For all the combination of nodes i and j, apply the following # rules. idx = 0 finished = False while idx < self.max_iter and not finished: # type: ignore change_flag = False for (i, j) in permutations(node_ids, 2): if i == j: continue # Rule 1: Orient i-j into i->j whenever there is an arrow k->i # such that k and j are nonadjacent. r1_add = self._apply_meek_rule1(skel_graph, i, j) # Rule 2: Orient i-j into i->j whenever there is a chain # i->k->j. r2_add = self._apply_meek_rule2(skel_graph, i, j) # Rule 3: Orient i-j into i->j whenever there are two chains # i-k->j and i-l->j such that k and l are nonadjacent. r3_add = self._apply_meek_rule3(skel_graph, i, j) # Rule 4: Orient i-j into i->j whenever there are two chains # i-k->l and k->l->j such that k and j are nonadjacent. # # However, this rule is not necessary when the PC-algorithm # is used to estimate a DAG. if any([r1_add, r2_add, r3_add]) and not change_flag: change_flag = True if not change_flag: finished = True logger.info(f"Finished applying R1-3, with {idx} iterations") break idx += 1 return skel_graph
def _orient_colliders(self, graph: CPDAG, sep_set: Dict[str, Dict[str, Set]]): """Orient colliders given a graph and separation set. Parameters ---------- graph : CPDAG The CPDAG. sep_set : Dict[Dict[Set]] The separating set between any two nodes. """ # for every node in the PAG, evaluate neighbors that have any edge for u in graph.nodes: for v_i, v_j in combinations(graph.adjacencies(u), 2): # Check that there is no edge of any type between # v_i and v_j, else this is a "shielded" collider. # Then check to see if 'u' is in the separating # set. If it is not, then there is a collider. if not graph.has_adjacency(v_i, v_j) and u not in sep_set[v_i][v_j]: logger.info( f"orienting collider: {v_i} -> {u} and {v_j} -> {u} to make {v_i} -> {u} <- {v_j}." ) if graph.has_undirected_edge(v_i, u): graph.orient_undirected_edge(v_i, u) if graph.has_undirected_edge(v_j, u): graph.orient_undirected_edge(v_j, u) def _apply_meek_rule1(self, graph: CPDAG, i, j): """Apply rule 1 of Meek's rules. Looks for i - j such that k -> i, such that (k,i,j) is an unshielded triple. Then can orient i - j as i -> j. """ added_arrows = False # Check if i-j. if graph.has_undirected_edge(i, j): for k in graph.predecessors(i): # Skip if k and j are adjacent because then it is a # shielded triple if graph.has_adjacency(k, j): continue # Make i-j into i->j logger.info(f"R1: Removing edge ({i}, {j}) and orienting as {k} -> {i} -> {j}.") graph.orient_undirected_edge(i, j) added_arrows = True break return added_arrows def _apply_meek_rule2(self, graph: CPDAG, i, j): """Apply rule 2 of Meek's rules. Check for i - j, and then looks for i -> k -> j triple, to orient i - j as i -> j. """ added_arrows = False # Check if i-j. if graph.has_undirected_edge(i, j): # Find nodes k where k is i->k succs_i = set() for k in graph.successors(i): if not graph.has_edge(k, i): succs_i.add(k) # Find nodes j where j is k->j. preds_j = set() for k in graph.predecessors(j): if not graph.has_edge(j, k): preds_j.add(k) # Check if there is any node k where i->k->j. if len(succs_i.intersection(preds_j)) > 0: # Make i-j into i->j logger.info(f"R2: Removing edge {i}-{j} to form {i}->{j}.") graph.orient_undirected_edge(i, j) added_arrows = True return added_arrows def _apply_meek_rule3(self, graph: CPDAG, i, j): """Apply rule 3 of Meek's rules. Check for i - j, and then looks for k -> j <- l collider, and i - k and i - l, then orient i -> j. """ added_arrows = False # Check if i-j first if graph.has_undirected_edge(i, j): # For all the pairs of nodes adjacent to i, # look for (k, l), such that j -> l and k -> l for (k, l) in combinations(graph.adjacencies(i), 2): # Skip if k and l are adjacent. if graph.has_adjacency(k, l): continue # Skip if not k->j. if graph.has_edge(j, k) or (not graph.has_edge(k, j)): continue # Skip if not l->j. if graph.has_edge(j, l) or (not graph.has_edge(l, j)): continue # if i - k and i - l, then at this point, we have a valid path # to orient if graph.has_undirected_edge(k, i) and graph.has_undirected_edge(l, i): logger.info(f"R3: Removing edge {i}-{j} to form {i}->{j}") graph.orient_undirected_edge(i, j) added_arrows = True break return added_arrows
[docs]class RobustPC(PC): def __init__( self, ci_estimator: BaseConditionalIndependenceTest, alpha: float = 0.05, init_graph: Union[nx.Graph, DAG, CPDAG] = None, fixed_edges: nx.Graph = None, min_cond_set_size: int = None, max_cond_set_size: int = None, max_iter: int = 1000, max_combinations: int = None, apply_orientations: bool = True, mci_alpha: float = 0.05, max_conds_x: int = None, max_conds_y: int = None, size_inclusive: bool = False, mci_ci_estimator: Callable = None, partial_knowledge: object = None, only_mci: bool = False, use_children: bool = False, use_parents: bool = True, # TODO: remove cuz temporary skip_first_stage: bool = False, **ci_estimator_kwargs, ): super().__init__( ci_estimator, alpha, init_graph, fixed_edges, min_cond_set_size, max_cond_set_size, max_iter, max_combinations, apply_orientations, **ci_estimator_kwargs, ) self.max_conds_x = max_conds_x self.max_conds_y = max_conds_y self.size_inclusive = size_inclusive self.mci_alpha = mci_alpha self.partial_knowledge = partial_knowledge self.only_mci = only_mci if mci_ci_estimator is None: self.mci_ci_estimator = ci_estimator self.use_children = use_children self.skip_first_stage = skip_first_stage self.use_parents = use_parents def _learn_first_phase(self, X, graph, sep_set, fixed_edges): # learn skeleton using original PC algorithm graph, sep_set, test_stat_dict, pvalue_dict = super().learn_skeleton( X, graph, sep_set, fixed_edges ) # convert graph to a CPDAG graph = self.convert_skeleton_graph(graph) # orient the edges of the skeleton graph to build up a set of # "definite" parents graph = self.orient_edges(graph, sep_set) return graph, sep_set, test_stat_dict, pvalue_dict
[docs] def learn_skeleton( self, X: pd.DataFrame, graph: nx.Graph = None, sep_set: Dict[str, Dict[str, Set[Any]]] = None, fixed_edges: Set = None, ) -> Tuple[ nx.Graph, Dict[str, Dict[str, Set[Any]]], Dict[Any, Dict[Any, float]], Dict[Any, Dict[Any, float]], ]: from causal_networkx.discovery import learn_skeleton_graph_with_order if graph is None: nodes = X.columns graph = nx.complete_graph(nodes, create_using=nx.Graph) if sep_set is None: # keep track of separating sets sep_set = defaultdict(lambda: defaultdict(set)) orig_graph = graph.copy() orig_sep_set = sep_set.copy() test_stat_dict = dict() pvalue_dict = dict() if not self.skip_first_stage: graph, sep_set, test_stat_dict, pvalue_dict = self._learn_first_phase( X, graph, sep_set, fixed_edges ) # store the estimated "definite" parents/children for each node def_parent_dict = dict() def_children_dict = dict() # now obtain the definite parents for every node in the set either using # partial knowledge oracle, or using the existing graph oriented after initial PC if hasattr(self.partial_knowledge, "get_parents"): test_stat_dict = dict() for node in graph.nodes: parents = self.partial_knowledge.get_parents(node) # type: ignore children = self.partial_knowledge.get_children(node) # type: ignore def_parent_dict[node] = parents def_children_dict[node] = children test_stat_dict[node] = {_node: np.inf for _node in parents} if self.use_children: test_stat_dict[node] = {_node: np.inf for _node in children} else: # use the estimated parents/children def_parent_dict = self._estimate_definite_parents(graph) def_children_dict = self._estimate_definite_children(graph) # use definite parents to filter the dependencies in the test statistics / pvalue # removing them from the possible adjacency list for node, parents in def_parent_dict.items(): # create a copy of the possible parents, since we will be removing # certain keys in the nested dictionary possible_parents = copy(list(test_stat_dict[node].keys())) children = def_children_dict[node] for adj_node in possible_parents: # now remove any adjacent nodes from consideration if they # are not part of parent set check_condition = True if self.use_parents: check_condition = adj_node not in parents # optionally, also include children if self.use_children: check_condition = check_condition and (adj_node not in children) if check_condition: test_stat_dict[node].pop(adj_node) # pvalue_dict[node].pop(parent) self._inter_test_stat_dict = test_stat_dict self.def_parents_ = def_parent_dict self.def_children_ = def_children_dict # now we will re-learn the skeleton using the MCI condition skel_graph, sep_set, test_stat_dict, pvalue_dict = learn_skeleton_graph_with_order( # type: ignore X, self.mci_ci_estimator, adj_graph=orig_graph, sep_set=orig_sep_set, fixed_edges=fixed_edges, alpha=self.mci_alpha, min_cond_set_size=self.min_cond_set_size, max_cond_set_size=self.max_cond_set_size, max_combinations=1, keep_sorted=False, with_mci=True, max_conds_x=self.max_conds_x, max_conds_y=self.max_conds_y, parent_dep_dict=test_stat_dict, size_inclusive=self.size_inclusive, only_mci=self.only_mci, **self.ci_estimator_kwargs, ) return skel_graph, sep_set, test_stat_dict, pvalue_dict
def _estimate_definite_parents(self, graph: CPDAG): def_parent_dict = dict() for node in graph.nodes: # get all predecessors of the node that have an arrowhead into node def_parent_dict[node] = list(graph.predecessors(node)) return def_parent_dict def _estimate_definite_children(self, graph: CPDAG): def_children_dict = dict() for node in graph.nodes: # get all predecessors of the node that have an arrowhead into node def_children_dict[node] = list(graph.successors(node)) return def_children_dict