"""Approximate inference for arbitrary graph statistics,
including structural coefficients, can be conducted using
samples from appropriate Exponential Random Graph Models.
The following generic algorithm can be used to solve a wide
range of inferential problems:
#. Calculate statistics of interest on an observed graph.
#. Sample ``R`` randomized instances from an appropriate null model.
#. Calculate graph statistics on null model samples.
#. Compare observed and null model values.
:class:`Inference` class implements the above approach.
It is comaptible with any registered class of graph-like objects
and any properly implemented subclass of
:class:`pathcensus.nullmodels.base.ERGM` representing a null model
to sample from.
.. seealso::
:mod:`pathcensus.graph` for seemless ``pathcensus`` integration
with arbitrary graph-like classes.
:mod:`pathcensus.nullmodels` for available null models.
This simulation-based approach is relatively efficient for graph-
and node-level statistics but can be very computationally expensive
when used for edge-level analyses. Hence, is this case it is often
useful to use various coarse-graining strategies to reduce the number
of unique combinations of values of sufficient statistics.
.. seealso::
:class:`pathcensus.graph.GraphABC` for the abstract class
for graph-like objects.
:mod:`pathcensus.nullmodels` for compatible ERGM classes.
:meth:`pathcensus.inference.Inference.coarse_grain`
for coarse-graining methods.
Below is a simple example of an estimation of p-values of node-wise
structural similarity coefficients in an Erdős–Rényi random graph.
The result, of course, should not be statistically significant.
We use the default significance level of :math:`\\alpha = 0.05` and
Benjamini-Hochberg FDR correction for multiple testing.
.. testsetup:: inference
import numpy as np
np.random.seed(34)
.. doctest:: inference
>>> import numpy as np
>>> from scipy import sparse as sp
>>> from pathcensus import PathCensus
>>> from pathcensus.inference import Inference
>>> from pathcensus.nullmodels import UBCM
>>> np.random.seed(34)
>>> # Generate ER random graph (roughly)
>>> A = sp.random(100, 100, density=0.05, dtype=int, data_rvs=lambda n: np.ones(n))
>>> A = (A + A.T).astype(bool).astype(int)
>>> ubcm = UBCM(A)
>>> err = ubcm.fit()
>>> infer = Inference(A, ubcm, lambda g: PathCensus(g).similarity())
>>> data, null = infer.init_comparison(100)
>>> pvals = infer.estimate_pvalues(data, null, alternative="greater")
>>> # Structural similarity coefficient values
>>> # should not be significant more often than 5% of times
>>> # (BH FDR correction is used)
>>> (pvals <= 0.05).mean() <= 0.05
True
"""
from __future__ import annotations
from typing import Any, Optional, Mapping
from typing import Sequence, Tuple, Dict, Callable, Literal, Union
from dataclasses import dataclass
import numpy as np
import pandas as pd
from statsmodels.stats.multitest import fdrcorrection_twostage
from tqdm.auto import tqdm
from .nullmodels.base import ERGM
from .types import GraphABC, Data
[docs]
class Inference:
"""Generic approximate statistical inference based on
arbitrary null models with node-level sufficient statistics.
The methods implemented by this class are based on sampling
from null models so they are may not be very efficient, in particular
for edge-level statistics. On the other hand, they allow to conduct
statistical inferency for arbitrary graph statistics.
Attributes
----------
graph
Graph-like object representing an observed network.
model
Fitted instance of a subclass of
:py:class:`pathcensus.nullmodels.ERGM`.
statistics
Function for calculating graph statistics of interest with the
following signature::
(graph, **kwds) -> DataFrame / Series
The first argument must be a graph-like object (e.g. a sparse matrix),
``**kwds`` can be used to pass additional arguments
(only keyword args are allowed) if necessary.
The return value must be either a :py:class:`pandas.DataFrame`
or :py:class:`pandas.Series`.
aggregate_by
Mode of aggregation for determining null distribution.
If ``"stats"`` then null distribution is aggregated within
unique combinations of values of the sufficient statistics
(possibly coarse-grained, see :py:meth:`init_comparison`).
If ``"units"`` then null distribution is aggregated within individual
units (e.g. nodes). This is often useful in analyses at the level
of nodes but may require too many samples for edge-level analyses.
"""
_alternative = ("greater", "less")
_aggregate_by = ("stats", "units")
_filter_index = ("values", "range")
index_names = ("i", "j")
[docs]
@dataclass
class Levels:
"""Container class for storing information on unit,
sufficient statistics and other index levels in observed
and null model data.
"""
units: Tuple[str, ...]
stats: Tuple[str, ...]
other: Tuple[str, ...]
def __bool__(self) -> bool:
return bool(self.units or self.stats or self.other)
def __init__(
self,
graph: GraphABC,
model: ERGM,
statistics: Callable[[GraphABC], Data],
*,
aggregate_by: Literal[_aggregate_by] = _aggregate_by[0] # type: ignore
) -> None:
"""Initialization method."""
self.graph = graph
self.model = model
self.statistics = statistics
self.aggregate_by = self._check_vals(
aggregate_by=aggregate_by,
allowed=self._aggregate_by
)
[docs]
def __call__(
self,
graph: GraphABC,
_stats: Optional[np.ndarray] = None,
**kwds: Any
) -> Data:
"""This method should be called to actually calculate graph statistics.
Parameters
----------
graph
Graph-like object to calculate statistics for.
stats
Array of sufficient statistics for nodes.
If ``None`` then `self.model.statistics` is used.
**kwds
Passed to graph statistics function.
"""
data = self.statistics(graph, **kwds)
if isinstance(data, pd.Series) and all(isinstance(n ,str) for n in data.index):
# Dirty hack to handle global coefs passed as a series
data = data.to_frame().T
if _stats is None:
_stats = self.model.extract_statistics(graph)
# _stats = self.model.statistics
out = self._postprocess_data(data, _stats)
return out
@property
def n_nodes(self) -> int:
"""Number of nodes in the observed network."""
return self.model.n_nodes
[docs]
def get_levels(self, data: Data) -> Levels:
"""Get index levels descriptor from a data object."""
nodes = tuple(l for l in data.index.names if l in self.index_names)
allowed_stats = [ f"{l}{i}" for l in self.model.labels for i in nodes ]
stats = tuple(l for l in data.index.names if l in allowed_stats)
other = tuple(
l for l in data.index.names
if l and l not in (*nodes, *stats)
)
return self.Levels(nodes, stats, other)
[docs]
def simulate_null(
self,
n: int,
*,
progress: bool = False,
progress_kws: Optional[Dict] = None,
use_observed_stats: bool = True,
**kwds: Any
) -> Data:
"""Get data frame of null model samples of strucutral coefficients.
Parameters
----------
n
Number of samples.
progress
Should progress bar be showed.
progress_kws
Keyword arguments for customizing progress bar when
``progress=True``. Passed to :py:func:`tqdm.tqdm`.
use_observed_stats
If ``True`` then simulated data is indexed with
sufficient statistics from the observed network.
This often helps to accumulate enough observations
faster at the expense of not fully exact conditioning.
**kwds
Keyword arguments passed to :py:meth:`statistics`.
Returns
-------
null
Data frame with simulated null distribution.
"""
rand = []
keys = []
simulator = self.model.sample(n)
progress_kws = {
**(progress_kws or {}),
"disable": not progress,
"total": n
}
for i, graph in tqdm(enumerate(simulator), **progress_kws):
keys.append(i)
_stats = self.model.statistics if use_observed_stats else None
rand.append(self(graph, _stats=_stats, **kwds))
null = pd.concat(rand, keys=keys, names=["_sample"])
return null
[docs]
def filter_index(
self,
data: Data,
target: Data,
*,
how: Literal[_filter_index] = _filter_index[0], # type: ignore
levels: Optional[Union[Sequence[str], Sequence[int]]] = None
) -> Data:
"""Filter ``data`` by index with respect to ``target``.
Parameters
----------
data
Data to filter.
target
Dataset with target index.
how
How index should be filtered.
Either by unique combinations of values or just contained
to the range of values for separate levels in ``target``.
levels
Levels to use for filtering. If ``None`` then either
``self.levels.units`` or ``self.levels.stats`` is used depending
on the value of ``self.aggregate_by``.
Returns
-------
data
Filtered copy of ``data``.
"""
how = self._check_vals(how=how, allowed=self._filter_index)
l = self.get_levels(target)
if levels is None:
levels = l.stats if self.aggregate_by == "stats" else l.units
# Filter by unique combinations of values in `target`
if how == "values":
# Determinex index values in `data`
remove = [ n for n in data.index.names if n not in levels ]
didx = data.reset_index(remove).index if remove else data.index
# Determine index values in `target`
remove = [n for n in target.index.names if n not in levels ]
tidx = target.reset_index(remove).index if remove else target.index
data = data[didx.isin(tidx)]
# Filter down to ranges of levels in `target`
else:
for level in levels:
tidx = target.index.get_level_values(level).values
didx = data.index.get_level_values(level).values
minx = min(tidx)
maxx = max(tidx)
data = data[(didx >= minx) & (didx <= maxx)]
return data
[docs]
def init_comparison(
self,
n: int,
*,
filter_index: Union[bool, Literal[_filter_index]] = False, # type: ignore
sample_index: bool = False,
null_kws: Optional[Dict] = None,
**kwds: Any
) -> Tuple[Data, Data, Levels]:
"""Initialize data for a comparison with a null model
and determine index level names.
Parameters
----------
n
Number of null model samples.
filter_index
If ``True`` or ``"values"`` then ``null`` will be filtered to
contain only observations with index values matching those in
``data`` with levels used for the comparison selected based on
``self.aggregate_by``. If ``"range"`` then null model samples
will be filtered to be in the range of index values in the
observed data.
sample_index
If ``False`` then ``_sample`` index with sample ids
is dropped from ``null`` data frame with null model samples.
null_kws:
Keyword args passed to :py:meth:`simulate_null`.
**kwds
Passed to :py:meth:`statistics` method used for calculating
statistics of interest.
Notes
-----
Estimating distributions of edge-wise statistics conditional
on sufficient statistics of the participating nodes may
require really large number of samples and in general is
not really feasible for large networks (in particular weighted).
The same applies, although probably to slightly lesser degree,
to node-wise statistics when ``use_observed_stats=False``
is passed to :py:meth:`simulate_null`.
Efficient methods for solving these problems will be implemented
in the future.
Returns
-------
data
Observed graph statistics.
null
Null distribution samples.
"""
if isinstance(filter_index, bool):
if filter_index:
filter_index = "range"
else:
filter_index = self._check_vals(
filter_index=filter_index,
allowed=self._filter_index
)
data = self(self.graph, **kwds)
self._validate_data(data)
null_kws = (null_kws or {})
null = self.simulate_null(n, **(null_kws or {}), **kwds)
levels = self.get_levels(data)
remove = levels.units if self.aggregate_by == "stats" else levels.stats
remove = [ *remove, *levels.other ]
if remove:
null.reset_index(remove, drop=True, inplace=True)
data = pd.concat([data], keys=[0], names=["_"])
null = pd.concat([null], keys=[0], names=["_"])
data = self._remove_unnamed_indexes(data)
null = self._remove_unnamed_indexes(null)
if not sample_index:
null.reset_index("_sample", drop=True, inplace=True)
if filter_index:
null = self.filter_index(null, data, how=filter_index)
return data, null
[docs]
def postprocess(self, data: Data, target: Data) -> Data:
"""Postprocess data after running a comparison.
This mainly involves sanitizing index names after aggregation
as well as setting proper shape and types for outputs
so ``data`` has the same general form as ``target``.
"""
if isinstance(data, pd.Series) and isinstance(target, pd.DataFrame):
data = data.to_frame().T
data.index = target.index
if isinstance(data, (pd.Series, pd.DataFrame)):
data = self.postprocess_index(data)
return data
[docs]
def postprocess_index(self, data: Data) -> Data:
"""Postprocess index after running a comparison.
This involves getting rid of temporary index names
used when running comparisons as well as any unnamed indexes.
Moreover sufficient statistics indexes are removed
if ``self.aggregate_by == "unit"`` or ensuring that observed values of
sufficient statistics are used in the index
(instead of coarse-grained values) if ``self.aggregate == "stats"``.
"""
levels = self.get_levels(data)
# Remove sufficient statistics indexes
remove = [ l for l in levels.stats if l in data.index.names ]
# Remove auxiliary index `_`
if "_" in data.index.names:
remove.append("_")
if remove:
data.reset_index(remove, drop=True, inplace=True)
# Remove unnamed indexes
remove = [ i for i, n in enumerate(data.index.names) if n is None ]
if remove:
data.reset_index(remove, drop=True, inplace=True)
# Add indexes if necessary
if self.aggregate_by == "stats":
data = self.add_stats_index(data, self.model.statistics)
return data
[docs]
def estimate_pvalues(
self,
data: pd.DataFrame,
null: pd.DataFrame,
*,
alternative: Literal[_alternative] = _alternative[0], # type: ignore
adjust: bool = True,
resolution: int = 1000,
**kwds: Any
) -> Data:
"""Estimate p-values of node/edge/global coefficients
based on sampling from a configuraiton model
(as returned by :py:meth:`init_comparison`).
Parameters
----------
data
Data frame with observed graph statistics.
null
Data frame with simulated null distribution
of graph statistics.
alternative
Type of test two perform.
Currently only one-sided tests are supported.
adjust
Should p-values be adjusted. Benjamini-Hochberg FDR correction is
used by default when ``True``.
resolution
Resolution of p-value estimation. It specifies the number
of quantiles to comapre observed values against.
For instance, if ``resolution=100`` then p-values
will be accurate only up to ``0.01``.
This parameter controls the amount of memory consumed
by the estimation process.
**kwds
Passed as additional arguents to :meth:`adjust_pvalues`
when ``adjust=True``.
See Also
--------
adjust_pvalues : p-value adjustment method
Returns
-------
pvalues
P-values for statistics as :py:class:`pandas.Series`
(for one graph statistic) or :py:class:`pandas.DataFrame`
(for multiple statistics).
"""
alternative = self._check_vals(
alternative=alternative,
allowed=self._alternative
)
levels = self.get_levels(data)
if levels.units:
null = self.filter_index(null, data, how="values")
# Quantile data frame
if self.aggregate_by == "units":
keys = levels.units
else:
keys = levels.stats
if keys:
qdf = null.groupby(level=keys)
else:
qdf = null
qdf = qdf.quantile(np.arange(0, resolution+1) / resolution) \
.reset_index(level=-1, drop=True)
idx = qdf.index.to_frame().rename(columns={0: None})
idx.insert(0, "_", 0)
idx = pd.MultiIndex.from_frame(idx)
qdf.index = idx
# Estimate p-values
if alternative == "greater":
pvals = data.le(qdf)
else:
pvals = data.ge(qdf)
pvals = pvals.fillna(True)
if levels.units:
pvals = pvals.groupby(level=levels.units)
pvals = pvals.mean()
if adjust and levels.units:
adjust_kws = { **kwds, "copy": False }
pvals = self.adjust_pvalues(pvals, **adjust_kws)
return self.postprocess(pvals, target=data)
[docs]
@staticmethod
def adjust_pvalues(
pvals: Data,
*,
alpha: float = 0.05,
copy: True = bool,
**kwds: Any
) -> Data:
"""Adjust p-values for multiple testing.
Benjamini-Hochberg-Yekuteli two-stage procedure implemented in
:py:func:`statsmodels.multitest.fdrcorrection_twostage`
is used.
Parameters
----------
pvals
Data frame / series with p-values for different coefficients
in columns.
alpha
Desired type I error rate after the adjustement.
copy
Should copy of ``pvals`` be returned.
**kwds
Additional arguments passed to
:py:func:`statsmodels.multitest.fdrcorrection_twostage`
"""
if copy:
pvals = pvals.copy()
shape = pvals.shape
pv = pvals.values.flatten()
_, pv, *_ = fdrcorrection_twostage(pv, alpha=alpha, **kwds)
pvals.values[:] = np.clip(pv.reshape(shape), 0, 1)
return pvals
[docs]
@staticmethod
def add_index(
data: Data,
idx: Mapping[str, Sequence],
*,
prepend: bool = False,
drop_unnamed: bool = True,
copy: bool = True
) -> Data:
"""Add index to a data frame or series.
Parameters
----------
data
Data frame or series.
idx
Mapping from index names to sequences of values.
prepend
Should new indexes be prepended or appended
to the existing indexes.
drop_unnamed
Should unnamed indexes be droppped during the process.
Unnamed indexes are usually generic indexes which are
redundant after adding additional indexes.
copy
Should a copy be returned.
"""
idx = pd.DataFrame(idx)
if copy:
data = data.copy()
idf = data.index.to_frame(index=False)
if drop_unnamed:
use = [ i for i, n in enumerate(data.index.names) if n is not None ]
idf = idf.iloc[:, use]
objs = [ idx, idf ] if prepend else [ idf, idx ]
objs = [ d for d in objs if not d.empty ]
if objs:
idf = pd.concat(objs, axis=1, ignore_index=False)
else:
# Return if there no resulting indexes
return data.reset_index(drop=True)
if len(idf) == 0:
return data
if len(idf) == 1:
idx = pd.Index(idf.iloc[:, 0])
else:
idx = pd.MultiIndex.from_frame(idf)
data.index = idx
return data
[docs]
def add_stats_index(
self,
data: Data,
stats: Optional[np.ndarray] = None
) -> Data:
"""Add indexes with sufficient statistics.
Parameters
----------
data
Data frame or series with graph statistics.
stats
Array of sufficient statistics.
Use ``self.model.statistics`` if ``None``.
"""
idx = {}
levels = self.get_levels(data)
if stats is None:
stats = self.model.statistics
for u in levels.units:
for i, l in enumerate(self.model.labels):
vals = stats[data.index.get_level_values(u), i]
name = f"{l}{u}"
idx[name] = vals
return self.add_index(
data=data,
idx=idx,
prepend=False,
drop_unnamed=True,
copy=False
)
# Internals ---------------------------------------------------------------
def _postprocess_data(
self,
data: Data,
stats: np.ndarray
) -> Data:
"""Post-process data with calculated graph statistics.
Parameters
----------
data
Calculate graph statistics.
stats
Array with sufficient statistics for nodes.
"""
if np.isscalar(data):
data = pd.Series([data])
if stats.ndim == 1:
stats = stats[:, None]
if self.aggregate_by == "stats":
data = self.add_stats_index(data, stats)
return data
def _remove_unnamed_indexes(
self,
data: Data
) -> Data:
"""Remove unnamed indexes."""
remove = []
for i, name in enumerate(data.index.names):
if name is None:
remove.append(i)
return data.reset_index(level=remove, drop=True)
def _check_vals(
self,
*,
allowed: Sequence[str],
**kwds: Any
) -> str:
"""Check if value is okay and return."""
if len(kwds) != 1:
raise ValueError("exactly two keyword arguments are expected")
allowed = tuple(allowed)
key = list(kwds.keys())[0]
val = list(kwds.values())[0]
if val not in allowed:
raise ValueError(f"'{key}' has to be one of {allowed}")
return val
def _validate_data(self, data: Data) -> None:
"""Check if `data` has correct indexes and shape."""
if not isinstance(data, (pd.Series, pd.DataFrame)):
m = "'data' has to be either 'Series' or 'DataFrame' instance"
raise TypeError(m)
levels = self.get_levels(data)
if levels.stats and not levels.units:
raise AttributeError(
"'data' has sufficient statistics "
"but no node/edge indexes"
)
if self.aggregate_by == "stats" and levels.units and not levels.stats:
raise AttributeError(
"'data' has node/edge indexes but no "
"index with sufficient statistics"
)