Source code for pathcensus.core.graph

"""Simple JIT-compiled graph class for calculating path census."""
from typing import Optional, Tuple
import numpy as np
import numba
from numba.typed import List   # pylint: disable=no-name-in-module
from numba.experimental import jitclass
from .types import UInt, Float
from ..definitions import PathDefinitionsUnweighted, PathDefinitionsWeighted


_NPATHS_UNWEIGHTED = PathDefinitionsUnweighted().npaths
_NPATHS_WEIGHTED   = PathDefinitionsWeighted().npaths


[docs] @jitclass([ ("E", UInt[:, ::1]), ("D", UInt[::1]), ("W", numba.optional(Float[::1])), ("S", numba.optional(Float[::1])), ("_strides", UInt[::1]) ]) class Graph: """Graph represented as an edgelist. The edgelist representation is efficient for edge-oriented algorithms such as path census. Attributes ---------- n_nodes Number of nodes in the graph. E Edgelist as 2D C-contiguous array of unsigned 64-bit integers. It has to be sorted by head indices. W Optional 1D array of 64-bit floats storing edge weights. """ def __init__( self, n_nodes: int, E: np.ndarray, W: Optional[np.ndarray] = None ) -> None: # Sort edge array so i <= j # and save sorting order indices for sorting weight array o1, o2 = self._get_ij_ordering(E) E = E[o1][o2] # Add edge ids eid = np.arange(len(E), dtype=UInt) _E = np.ascontiguousarray(np.column_stack((eid, E))) self.E = _E # Make strides array for efficient access # to node neighborhoods. self._strides = self._make_strides(n_nodes, self.E) # Set node degree array self.D = self.degree() if W is not None: if self.E.shape[0] != W.shape[0]: raise AttributeError("'E' and 'W' have to be of the same length") _W = W[o1][o2] self.W = np.ascontiguousarray(_W) # Set node strength array self.S = self.strength() else: self.W = None self.S = None # Properties -------------------------------------------------------------- @property def vcount(self): """Number of nodes.""" return len(self._strides) - 1 @property def n_nodes(self): return self.vcount @property def ecount(self): """Number of edges.""" return len(self.E) // 2 @property def n_edges(self): return self.ecount @property def vids(self): return np.arange(self.vcount, dtype=UInt) @property def eids(self): return np.arange(self.ecount, dtype=UInt) @property def directed(self) -> bool: return False @property def weighted(self) -> bool: return self.W is not None @property def npaths(self) -> int: if self.weighted: return _NPATHS_WEIGHTED return _NPATHS_UNWEIGHTED # Methods ----------------------------------------------------------------- def get_edges(self) -> np.ndarray: """Get undirected edges without self-loops (`i < j`).""" E = self.E mask = E[:, 1] < E[:, 2] return E[mask] def get_min_di_edges(self) -> np.ndarray: """Get undirected edges without self loops (`di <= dj`).""" E = self.E D = self.D i = E[:, 1] j = E[:, 2] di = D[i] dj = D[j] mask = (di < dj) | ((di == dj) & (i < j)) return E[mask] def degree(self) -> np.ndarray: """Get node degrees.""" return self._strides[1:] - self._strides[:-1] def strength(self) -> np.ndarray: """Get node strengths.""" S = np.empty((self.vcount,), dtype=Float) for i, seq in enumerate(zip(self._strides[:-1], self._strides[1:])): start, end = seq S[i] = self.W[start:end].sum() return S def N(self, i: int) -> np.ndarray: """Get 1-neighborhood of ``i``.""" start, end = self._strides[i:i+2] return self.E[start:end, np.array([0, 2])] # Path counting methods --------------------------------------------------- def _count_paths_unweighted( self, E: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """Count unweighted paths. Parameters ---------- E Array with edge and source/target indices to consider in the calculations. Returns ------- E 2D array of edges as source and target indices (only i < j). counts 2D array with path counts per edge. """ # pylint: disable=too-many-locals,too-many-branches,too-many-statements counts = np.empty((len(E), self.npaths), dtype=Float) # Array for keeping track of node roles role = np.zeros((self.n_nodes,), dtype=np.uint8) D = self.D # degree sequence # Main loop for idx, edge in enumerate(E): _, i, j = edge _zero = Float(0) t = tw = th = _zero q = qw = qh = _zero star_i = set(List.empty_list(UInt)) star_j = set(List.empty_list(UInt)) tri_ij = set(List.empty_list(UInt)) for _, k in self.N(i): if k == j: continue star_i.add(k) role[k] = 1 # Count wedge triples tw += 1 for _, k in self.N(j): if k == i: continue if role[k] == 1: # Count triangles t += 1 star_i.remove(k) tri_ij.add(k) role[k] = 3 else: star_j.add(k) role[k] = 2 # Count head triples th += 1 for k in star_i: for _, l in self.N(k): if l == i: continue # Count strong quadrangles if role[l] == 2: q += 1 # Count wedge and head quadruples # and clear `role` vector. for k in star_i: qw += D[k] - 1 role[k] = 0 for k in star_j: qh += D[k] - 1 role[k] = 0 for k in tri_ij: n = D[k] - 2 qw += n qh += n role[k] = 0 counts[idx] = ( t, tw, th, q, qw, qh ) return E, counts def _count_paths_weighted( self, E: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """Count weighted paths. Parameters ---------- E Array with edge and source/target indices to consider in the calculations. Returns ------- E 2D array of edges as source and target indices (only i < j). counts 2D array with weighted path counts per edge. """ # pylint: disable=too-many-locals,too-many-branches,too-many-statements counts = np.zeros((len(E), self.npaths), dtype=Float) # Array for keeping track of node roles role = np.zeros((self.n_nodes,), dtype=np.uint8) D = self.D # degree sequence S = self.S # strength sequence W = self.W # edge weights array # Weight arrays for links from `i` and `j` to their neighbors Wi = np.empty((self.n_nodes,), dtype=Float) Wj = np.empty((self.n_nodes,), dtype=Float) # Main loop for idx, edge in enumerate(E): ij, i, j = edge wij = W[ij] _zero = Float(0) twc = thc = tw = th = _zero q0wc = q0hc = qw = qh = _zero star_i = set(List.empty_list(UInt)) star_j = set(List.empty_list(UInt)) tri_ij = set(List.empty_list(UInt)) for ik, k in self.N(i): if k == j: continue star_i.add(k) role[k] = 1 wik = W[ik] Wi[k] = wik # Count wedge triples w = (wij + wik) / 2 tw += w for jk, k in self.N(j): if k == i: continue wjk = W[jk] Wj[k] = wjk w = (wij + wjk) / 2 # Handle triangles if role[k] == 1: star_i.remove(k) tri_ij.add(k) role[k] = 3 # Count triangles # both with wedge and head weighting twc += (wij + Wi[k]) / 2 thc += w else: star_j.add(k) role[k] = 2 # Count head triples th += w for k in star_i: wik = Wi[k] for kl, l in self.N(k): if l == i: continue # Count strong quadrangles if role[l] == 2: wkl = W[kl] wjl = Wj[l] q0wc += (wij + wik + wkl) / 3 q0hc += (wij + wjl + wkl) / 3 # Count wedge and head quadruples of `i` # and clear `role` vector. for k in star_i: wik = Wi[k] qw += ((D[k]-1)*(wij+wik) + S[k] - wik) / 3 role[k] = 0 for k in star_j: wjk = Wj[k] qh += ((D[k]-1)*(wij+wjk) + S[k] - wjk) / 3 role[k] = 0 for k in tri_ij: wik = Wi[k] wjk = Wj[k] dk = D[k] sk = S[k] qw += ((dk-2)*(wij+wik) + sk - wjk - wik) / 3 qh += ((dk-2)*(wij+wjk) + sk - wjk - wik) / 3 role[k] = 0 counts[idx] = ( twc, thc, tw, th, q0wc, q0hc, qw, qh ) return E, counts def count_paths( self, E: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """Count paths. Parameters ---------- E Array with edge and source/target indices to consider in the calculations. Returns ------- E 2D array of edges as source and target indices (only i < j). paths 2D array with weighted path counts per edge. """ if self.weighted: E, counts = self._count_paths_weighted(E) else: E, counts = self._count_paths_unweighted(E) return ( np.ascontiguousarray(E[:, 1:]), np.ascontiguousarray(counts) ) # Internals --------------------------------------------------------------- def _make_strides(self, n_nodes, E): strides = np.zeros((n_nodes+1,), dtype=UInt) last = eid = UInt(0) for eid, i in E[:, :2]: diff = i - last if diff > 0: strides[i] = eid if diff > 1: for k in range(last+1, i): strides[k] = eid last = i for k in range(last+1, len(strides)): strides[k] = len(E) return strides def _get_ij_ordering(self, E): o1 = np.argsort(E[:, 1]) o2 = np.argsort(E[o1, 0]) return o1, o2