import logging
from collections import deque
from itertools import chain
from typing import Any, Dict, List, Optional, Set, Tuple
from warnings import warn
import networkx as nx
import numpy as np
from causal_networkx import PAG
logger = logging.getLogger()
[docs]def possibly_d_sep_sets(graph: PAG, node_x, node_y=None, max_path_length: int = np.inf):
"""Find all PDS sets between node_x and node_y.
Possibly d-separting (PDS) sets are adjacency paths from 'node_x' to
some node 'V', which has the following characteristics for every
subpath triple <X, Y, Z>:
- Y is a collider, or
- Y is a triangle (i.e. X, Y and Z form a complete subgraph)
If Y is a triangle, then it will be uncertain with circular edges
due to the fact that it is a shielded triple, not allowing us to infer
that it is a collider.
Parameters
----------
graph : PAG
_description_
node_x : node
The node 'x'.
node_y : node
The node 'y'.
max_path_length : int
The maximum length of a path to search on.
Returns
-------
pds_set : set
The possibly d-separating set between node_x and node_y.
"""
if max_path_length == np.inf:
max_path_length = 1000
distance = 0
edge = None
# possibly d-sep set
dsep: Set[Any] = set()
# a queue to
q: deque = deque()
seen_edges = set()
node_list: Optional[List[Any]] = []
# keep track of previous nodes along the path for every node
# along a path
previous = {node_x: None}
# get the adjacency graph to perform path searches over
adj_graph = graph.to_adjacency_graph()
if node_y is not None:
# edge case: check that there exists paths between node_x
# and node_y
if not nx.has_path(adj_graph, node_x, node_y):
return dsep
# get a list of all neighbors of node_x that is not y
# and add these as candidates to explore a path
# and also add them to the possibly d-separating set
for node_v in graph.adjacencies(node_x):
# ngbhr cannot be endpoint
if node_v == node_y:
continue
if node_y is not None:
# used for RFCI
# check that node_b is connected to the endpoint if
# the endpoint is passed
if not nx.has_path(adj_graph, node_v, node_y):
continue
# form edge as a tuple
edge = (node_x, node_v)
# this path from node_x - node_v is a candidate path
# that will have a possibly d-separating set
q.append(edge)
# keep track of the edes
seen_edges.add(edge)
# all immediately adjacent nodes are part of the pdsep set
dsep.add(node_v)
while len(q) != 0:
this_edge = q.popleft()
prev_node, this_node = this_edge
# if we get the previous edge, then increment the distance
# and
if this_edge == edge:
edge = None
distance += 1
if distance > 0 and distance > max_path_length:
break
if node_y is not None:
# check that node_b is connected to the endpoint if
# the endpoint is passed
if not nx.has_path(adj_graph, this_node, node_y):
continue
# now add this_node to the pdsep set, since we have
# reached this node
dsep.add(this_node)
# now we want to check the subpath that is created
# using the previous node, the current node and the next node
for next_node in graph.adjacencies(this_node):
# check if 'node_c' in (X, Y, prev_node)
if next_node in (prev_node, node_x, node_y):
continue
# get the previous nodes, and add the previous node
# for this next node
node_list = previous.get(next_node)
if node_list is None:
node_list = []
node_list.append(this_node)
# check that we have a definite collider
# check the edge: prev_node - this_node
# check the edge: this_node - next_node
is_def_collider = graph.is_def_collider(prev_node, this_node, next_node)
# check that there is a triangle, meaning
# prev_node is adjacent to next_node
is_triangle = graph.has_adjacency(prev_node, next_node)
# if we have a collider, or triangle, then this edge
# is a candidate on a pdsep path
if is_def_collider or is_triangle:
next_edge = (prev_node, next_node)
if next_edge in seen_edges:
continue
seen_edges.add(next_edge)
q.append(next_edge)
if edge is None:
edge = next_edge
return dsep
[docs]def discriminating_path(graph: PAG, u, a, c, max_path_length: int):
"""Find the discriminating path for <..., a, u, c>.
A discriminating path, p = <v, ..., a, u, c>, is one
where:
- p has at least 3 edges
- u is non-endpoint and u is adjacent to c
- v is not adjacent to c
- every vertex between v and u is a collider on p and parent of c
Parameters
----------
graph : PAG
PAG to orient.
u : node
A node in the graph.
a : node
A node in the graph.
c : node
A node in the graph.
max_path_length : int
The maximum distance to check in the graph.
Returns
-------
explored_nodes : dict
A hash map of explored nodes.
found_discriminating_path : bool
Whether or not a discriminating path was found.
disc_path : list
The discriminating path starting from node c.
"""
if max_path_length == np.inf:
max_path_length = 1000
explored_nodes: Dict[Any, None] = dict()
found_discriminating_path = False
disc_path: List[Any] = []
# parents of c form the discriminating path
cparents = graph.parents(c)
# keep track of the distance searched
distance = 0
# keep track of the previous nodes, i.e. to build a path
# from node (key) to its child along the path (value)
descendant_nodes = dict()
descendant_nodes[u] = c
descendant_nodes[a] = u
# keep track of paths of certain nodes that were already explored
# start off with the valid triple <a, u, c>
# - u is adjacent to c
# - u has an arrow pointing to a
# - TBD a is a definite collider
# - TBD endpoint is not adjacent to c
explored_nodes[c] = None
explored_nodes[u] = None
explored_nodes[a] = None
print(c, u, a)
# a must be a parent of c
if not graph.has_edge(a, c):
return explored_nodes, found_discriminating_path, disc_path
# a and u must be connected by a bidirected edge, or with an edge towards a
# for a to be a definite collider
if not graph.has_bidirected_edge(a, u) and not graph.has_edge(u, a):
return explored_nodes, found_discriminating_path, disc_path
# now add 'a' to the queue and begin exploring
# adjacent nodes that are connected with bidirected edges
path = deque([a])
while not len(path) == 0:
this_node = path.popleft()
# check distance criterion to prevent checking very long paths
distance += 1
if distance > 0 and distance > max_path_length:
warn(
f"Did not finish checking discriminating path in {graph} because the path "
f"length exceeded {max_path_length}."
)
return explored_nodes, found_discriminating_path, disc_path
# now we check all neighbors to this_node that are pointing to it
# either with a direct edge, or a bidirected edge
node_iterator = chain(graph.possible_parents(this_node), graph.parents(this_node))
if this_node in graph.c_component_graph:
node_iterator = chain(node_iterator, graph.c_component_graph.neighbors(this_node))
# 'next_node' is either a parent, possible parent, or in a bidrected
# edge with 'this_node'.
# 'this_node' is a definite collider since there was
# confirmed an arrow pointing towards 'this_node'
# and 'next_node' is connected to it via a bidirected arrow.
for next_node in node_iterator:
# if we have already explored this neighbor, then it is
# already along the potentially discriminating path
if next_node in explored_nodes:
continue
# keep track of explored_nodes
explored_nodes[next_node] = None
# Check if 'next_node' is now the end of the discriminating path.
# Note we now have 3 edges in the path by construction.
if not graph.has_adjacency(next_node, c) and next_node != c:
logger.info(f"Reached the end of the discriminating path with {next_node}.")
explored_nodes[next_node] = None
descendant_nodes[next_node] = this_node
found_discriminating_path = True
break
# If we didn't reach the end of the discriminating path,
# then we can add 'next_node' to the path. This only occurs
# if 'next_node' is a valid new entry, which requires it
# to be a part of the parents of 'c'.
if next_node in cparents and graph.has_bidirected_edge(this_node, next_node):
# check that the next_node is a possible collider with at least
# this_node -> next_node
# since it is a parent, we can now add it to the path queue
path.append(next_node)
descendant_nodes[next_node] = this_node
explored_nodes[next_node] = None
# return the actual uncovered pd path
if found_discriminating_path:
disc_path = deque([]) # type: ignore
disc_path.append(next_node)
while disc_path[-1] != c:
disc_path.append(descendant_nodes[disc_path[-1]])
# disc_path.appendleft(next_node)
# while disc_path[0] != c:
# disc_path.appendleft(descendant_nodes[disc_path[0]])
return explored_nodes, found_discriminating_path, disc_path
[docs]def uncovered_pd_path(
graph: PAG, u, c, max_path_length: int, first_node=None, second_node=None
) -> Tuple[List, bool]:
"""Compute uncovered potentially directed path from u to c.
An uncovered pd path is one where: u o-> ... -> c. There are no
bidirected arrows, bidirected circle arrows, or opposite arrows.
In addition, every node beside the endpoints are unshielded,
meaning V(i-1) and V(i+1) are not adjacent.
Parameters
----------
graph : ADMG
PAG to orient.
u : node
A node in the graph to start the uncovered path.
c : node
A node in the graph.
max_path_length : int
The maximum distance to check in the graph.
first_node : node, optional
The node previous to 'u'. If it is before 'u', then we will check
that 'u' is unshielded. If it is not passed, then 'u' is considered
the first node in the path and hence does not need to be unshielded.
Both 'first_node' and 'second_node' cannot be passed.
second_node : node, optional
The node after 'u' that the path must traverse. Both 'first_node'
and 'second_node' cannot be passed.
Notes
-----
Typically uncovered potentially directed paths are defined by two nodes. However,
in its common use case within the FCI algorithm, it is usually defined relative
to an adjacent third node that comes before 'u'.
"""
if first_node is not None and second_node is not None:
raise RuntimeError(
"Both first and second node cannot be set. Only set one of them. "
"Read the docstring for more information."
)
if (
any(node not in graph for node in (u, c))
or (first_node is not None and first_node not in graph)
or (second_node is not None and second_node not in graph)
):
raise RuntimeError("Some nodes are not in graph... Double check function arguments.")
if max_path_length == np.inf:
max_path_length = 1000
explored_nodes: Dict[Any, None] = dict()
found_uncovered_pd_path = False
uncov_pd_path: List[Any] = []
# keep track of the distance searched
distance = 0
start_node = u
# keep track of the previous nodes, i.e. to build a path
# from node (key) to its child along the path (value)
descendant_nodes = dict()
if first_node is not None:
descendant_nodes[u] = first_node
if second_node is not None:
descendant_nodes[second_node] = u
# keep track of paths of certain nodes that were already explored
# start off with the valid triple <a, u, c>
# - u is adjacent to c
# - u has an arrow pointing to a
# - TBD a is a definite collider
# - TBD endpoint is not adjacent to c
explored_nodes[u] = None
if first_node is not None:
explored_nodes[first_node] = None
if second_node is not None:
explored_nodes[second_node] = None
# we now want to start on the second_node
start_node = second_node
# now add 'a' to the queue and begin exploring
# adjacent nodes that are connected with bidirected edges
path = deque([start_node])
while not len(path) == 0:
this_node = path.popleft()
prev_node = descendant_nodes.get(this_node)
# check distance criterion to prevent checking very long paths
distance += 1
if distance > 0 and distance > max_path_length:
warn(
f"Did not finish checking discriminating path in {graph} because the path "
f"length exceeded {max_path_length}."
)
return uncov_pd_path, found_uncovered_pd_path
# get all adjacent nodes to 'this_node'
for next_node in graph.adjacencies(this_node):
# if we have already explored this neighbor, then ignore
if next_node in explored_nodes:
continue
# now check that the next_node is uncovered by comparing
# with the previous node, because the triple is shielded
if prev_node is not None:
if graph.has_adjacency(prev_node, next_node):
continue
# now check that the triple is potentially directed, else
# we skip this node
if not graph.has_edge(this_node, next_node):
continue
# now this next node is potentially directed, does not
# form a shielded triple, so we add it to the path
explored_nodes[next_node] = None
descendant_nodes[next_node] = this_node
# if we have reached our end node, then we have found an
# uncovered possibly-directed path
if next_node == c:
logger.info(f"Reached the end of the uncovered pd path with {next_node}.")
found_uncovered_pd_path = True
break
path.append(next_node)
# return the actual uncovered pd path
if first_node is None:
first_node = u
if found_uncovered_pd_path:
uncov_pd_path = deque([]) # type: ignore
uncov_pd_path.appendleft(c) # type: ignore
while uncov_pd_path[0] != first_node:
uncov_pd_path.appendleft(descendant_nodes[uncov_pd_path[0]]) # type: ignore
uncov_pd_path = list(uncov_pd_path)
return uncov_pd_path, found_uncovered_pd_path
def compute_all_discriminating_paths():
pass