Source code for causal_networkx.algorithms.d_separation
from typing import Union
import numpy as np
from networkx.algorithms import d_separated as nx_d_separated
from causal_networkx import ADMG, DAG
[docs]def d_separated(G: Union[DAG, ADMG], x, y, z=None):
"""Check d-separation among 'x' and 'y' given 'z' in graph G.
This algorithm wraps ``networkx.algorithms.d_separated``, but
allows one to pass in a ``ADMG`` instance instead.
It first converts all bidirected edges into explicit unobserved
confounding nodes in an explicit ``networkx.DiGraph``, which then
calls ``networkx.algorithms.d_separated`` to determine d-separation.
This inherently increases the runtime cost if there are many
bidirected edges, because many nodes must be added.
Parameters
----------
G : ADMG
Causal graph.
x : set
First set of nodes in ``G``.
y : set
Second set of nodes in ``G``.
z : set
Set of conditioning nodes in ``G``. Can be empty set.
See Also
--------
causal_networkx.ADMG
networkx.algorithms.d_separation.d_separated
Notes
-----
This wraps the networkx implementation, which only allows DAGs. Since
``ADMG`` is not represented.
"""
if z is None:
z = set()
# run d-separation
if isinstance(x, np.ndarray):
x = set(list(x))
elif isinstance(x, str):
x = set([x])
elif type(x) == int or float:
x = set([x])
if isinstance(y, np.ndarray):
y = set(list(y))
elif isinstance(y, str):
y = set([y])
elif type(y) == int or float:
y = set([y])
if isinstance(z, np.ndarray):
z = set(list(z))
elif isinstance(z, str):
z = set([z])
elif type(z) in (int, float):
z = set([z])
if type(G).__name__ == "DAG":
return nx_d_separated(G.to_networkx(), x, y, z)
# get the full graph by converting bidirected edges into latent confounders
# and keeping the directed edges
explicit_G = G.compute_full_graph(to_networkx=True)
# make sure there are always conditioned on the conditioning set
z = z.union(G._cond_set)
return nx_d_separated(explicit_G, x, y, z)