"""Source code for tsl.ops.connectivity."""

from types import ModuleType
from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd
import torch
import torch_sparse
from scipy.sparse import coo_matrix
from torch import Tensor
from torch_geometric.data.storage import recursive_apply
from torch_geometric.nn.conv.gcn_conv import gcn_norm
from torch_geometric.typing import Adj, OptTensor
from torch_geometric.utils import add_remaining_self_loops, subgraph
from torch_sparse import SparseTensor, fill_diag

from tsl.typing import (DataArray, OptTensArray, ScipySparseMatrix,
                        SparseTensArray, TensArray, TorchConnectivity)
from tsl.utils import casting


def maybe_num_nodes(edge_index, num_nodes=None):
    """Return ``num_nodes`` if given, otherwise infer it from ``edge_index``.

    For an index array/tensor the node count is ``max + 1`` (0 for an empty
    index); for any other connectivity object (e.g., a SparseTensor) it is
    the largest of its two dimensions.
    """
    if num_nodes is not None:
        return num_nodes
    if isinstance(edge_index, np.ndarray):
        return 0 if edge_index.size == 0 else int(edge_index.max()) + 1
    if isinstance(edge_index, Tensor):
        return 0 if edge_index.numel() == 0 else int(edge_index.max()) + 1
    # Sparse matrix-like object: use the largest dimension.
    return max(edge_index.size(0), edge_index.size(1))


def infer_backend(obj, backend: ModuleType = None):
    """Infer the module backing :obj:`obj`.

    Args:
        obj: a :class:`~torch.Tensor`, :class:`~numpy.ndarray`, or
            :class:`~torch_sparse.SparseTensor`.
        backend (ModuleType, optional): if not :obj:`None`, it is returned
            as-is, bypassing inference. (default: :obj:`None`)

    Returns:
        ModuleType: :mod:`torch`, :mod:`numpy`, or :mod:`torch_sparse`.

    Raises:
        RuntimeError: if :obj:`obj` is of an unsupported type.
    """
    if backend is not None:
        return backend
    elif isinstance(obj, Tensor):
        return torch
    elif isinstance(obj, np.ndarray):
        return np
    elif isinstance(obj, SparseTensor):
        return torch_sparse
    # Fixed typo in the error message: "backed" -> "backend".
    raise RuntimeError(f"Cannot infer valid backend from {type(obj)}.")


def convert_torch_connectivity(connectivity: TorchConnectivity,
                               target_layout: str,
                               input_layout: Optional[str] = None,
                               num_nodes: Optional[int] = None):
    """Convert a torch-based graph connectivity between layouts.

    Supported layouts are ``'dense'`` (a square adjacency
    :class:`~torch.Tensor`), ``'sparse'``
    (a :class:`~torch_sparse.SparseTensor`), and ``'edge_index'`` (a
    ``[2, E]`` COO index tensor, possibly paired with edge attributes in a
    ``(edge_index, edge_attr)`` tuple).

    Args:
        connectivity: the connectivity to convert.
        target_layout (str): one of ``'dense'``, ``'sparse'``,
            ``'edge_index'``.
        input_layout (str, optional): layout of :obj:`connectivity`; if
            :obj:`None`, it is inferred from the data.
            (default: :obj:`None`)
        num_nodes (int, optional): number of nodes, used when building a
            dense/sparse matrix from an edge index. (default: :obj:`None`)

    Raises:
        RuntimeError: if :obj:`input_layout` is :obj:`None` and cannot be
            inferred from :obj:`connectivity`.
    """
    formats = [None, 'dense', 'sparse', 'edge_index']
    assert input_layout in formats and target_layout in formats[1:]

    edge_attr = None

    # infer input_layout from data
    if input_layout is None:
        if isinstance(connectivity, SparseTensor):
            input_layout = 'sparse'
        elif isinstance(connectivity, (list, tuple)):
            # unpack a (edge_index, edge_attr)-like pair
            connectivity, edge_attr = connectivity
        if isinstance(connectivity, Tensor):
            if connectivity.size(0) == connectivity.size(1):
                # a [2, 2] tensor is ambiguous: it may be either a dense
                # adjacency matrix or an edge index holding two edges
                if connectivity.size(0) == 2 and connectivity.ndim == 2:
                    raise RuntimeError("Cannot infer input_format from [2, 2] "
                                       "connectivity matrix.")
                input_layout = 'dense'
            elif connectivity.size(0) == 2 and connectivity.ndim == 2:
                input_layout = 'edge_index'
                connectivity = (connectivity, edge_attr)
    # if input_layout is still None, it cannot be inferred
    if input_layout is None:
        raise RuntimeError("Cannot infer input_format from connectivity.")

    # check input_layout matches data
    if input_layout == 'dense':
        assert isinstance(connectivity, Tensor) \
               and connectivity.size(-2) == connectivity.size(-1)
    elif input_layout == 'sparse':
        assert isinstance(connectivity, SparseTensor) \
               and connectivity.dim() == 2
    elif input_layout == 'edge_index':
        if isinstance(connectivity, (list, tuple)):
            connectivity, edge_attr = connectivity
        assert isinstance(connectivity, Tensor) \
               and connectivity.size(0) == 2 and connectivity.ndim == 2
        # normalize to a (edge_index, edge_attr) pair
        connectivity = (connectivity, edge_attr)

    # no conversion needed
    if input_layout == target_layout:
        return connectivity

    # NOTE: the 'sparse' layout stores the transposed adjacency — see the
    # .t() calls in the edge_index <-> sparse conversions below.
    # edge index -> sparse tensor
    if input_layout == 'edge_index' and target_layout == 'sparse':
        edge_index, edge_attr = connectivity
        return SparseTensor.from_edge_index(edge_index, edge_attr,
                                            (num_nodes, num_nodes)).t()
    # edge index -> dense tensor
    if input_layout == 'edge_index' and target_layout == 'dense':
        edge_index, edge_weights = connectivity
        return edge_index_to_adj(edge_index, edge_weights, num_nodes=num_nodes)
    # dense tensor -> sparse tensor
    if input_layout == 'dense' and target_layout == 'sparse':
        return SparseTensor.from_dense(connectivity)
    # dense tensor -> edge index
    if input_layout == 'dense' and target_layout == 'edge_index':
        return adj_to_edge_index(connectivity)
    # sparse tensor -> dense tensor
    if input_layout == 'sparse' and target_layout == 'dense':
        return connectivity.to_dense()
    # sparse tensor -> edge index
    if input_layout == 'sparse' and target_layout == 'edge_index':
        row, col, edge_attr = connectivity.t().coo()
        edge_index = torch.stack([row, col], dim=0)
        return edge_index, edge_attr


def adj_to_edge_index(adj: TensArray, backend: ModuleType = None) \
        -> Tuple[TensArray, TensArray]:
    """Convert a dense adjacency matrix to a (:obj:`edge_index`,
    :obj:`edge_weight`) pair.

    The adjacency matrix is transposed before conversion, so that entry
    ``adj[i, j]`` becomes an edge ``j -> i``. A leading batch dimension is
    supported; batch indices are folded into the node indices.

    Args:
        adj (TensArray): dense adjacency matrix as :class:`~torch.Tensor`
            or :class:`~numpy.ndarray`, of shape ``[N, N]`` or ``[B, N, N]``.
        backend (ModuleType, optional): backend matching :obj:`adj` type
            (either :mod:`numpy` or :mod:`torch`); inferred from
            :obj:`adj` if :obj:`None`. (default: :obj:`None`)

    Returns:
        tuple: (:obj:`edge_index`, :obj:`edge_weight`) of the same type as
            :obj:`adj`.
    """
    backend = infer_backend(adj, backend)
    assert backend in [torch, np]
    assert 2 <= adj.ndim <= 3
    assert adj.shape[-1] == adj.shape[-2]
    # Transpose the last two dims, then read the nonzero coordinates.
    if backend is torch:
        adj_t = torch.transpose(adj, -2, -1)
        index = adj_t.nonzero(as_tuple=True)
    else:
        adj_t = np.swapaxes(adj, -2, -1)
        index = adj_t.nonzero()
    edge_attr = adj_t[index]
    # Batched input: offset node indices by batch * num_nodes.
    if len(index) == 3:
        offset = index[0] * adj_t.shape[-1]
        index = (offset + index[1], offset + index[2])
    return backend.stack(index, 0), edge_attr
def edge_index_to_adj(edge_index: TensArray,
                      edge_weights: OptTensArray = None,
                      num_nodes: Optional[int] = None) -> TensArray:
    """Build a dense adjacency matrix from a COO :obj:`edge_index`.

    The result is transposed with respect to the scatter, i.e.,
    ``adj[j, i] = w`` for every edge ``(i, j)`` with weight ``w``. Missing
    weights default to 1.
    """
    n = maybe_num_nodes(edge_index, num_nodes)
    if isinstance(edge_index, Tensor):
        if edge_weights is None:
            edge_weights = torch.ones(edge_index.size(1),
                                      dtype=torch.float32,
                                      device=edge_index.device)
        adj = torch.zeros((n, n), dtype=edge_weights.dtype,
                          device=edge_weights.device)
    else:  # numpy backend
        if edge_weights is None:
            edge_weights = np.ones(edge_index.shape[1], dtype=np.float32)
        adj = np.zeros((n, n), dtype=edge_weights.dtype)
    # Scatter the weights, then transpose.
    adj[edge_index[0], edge_index[1]] = edge_weights
    return adj.T


def transpose(edge_index: SparseTensArray, edge_weights: OptTensArray = None) \
        -> Union[TensArray, Tuple[TensArray, TensArray]]:
    """Transpose the connectivity, swapping source and target nodes."""
    if isinstance(edge_index, SparseTensor):
        return edge_index.t()
    swapped = edge_index[[1, 0]]
    return swapped if edge_weights is None else (swapped, edge_weights)
def reduce_graph(subset: Union[Tensor, List[int]],
                 edge_index: SparseTensArray,
                 num_nodes: Optional[int] = None,
                 backend: ModuleType = None) \
        -> Tuple[TensArray, OptTensArray]:
    """Returns the subgraph with all nodes in :attr:`subset` and only the
    edges between them.

    Args:
        subset: The index of the nodes in the output subgraph.
        edge_index: Adjacency matrix as COO :obj:`edge_index` or
            :class:`torch_sparse.SparseTensor`.
        num_nodes: The number of nodes.
            (default: :obj:`None`)
        backend (ModuleType, optional): Backend matching :obj:`edge_index`
            type (either :mod:`numpy` or :mod:`torch`), if :obj:`None` it is
            inferred from :obj:`edge_index` type.
            (default :obj:`None`)

    Returns:
        tuple: edge_index, edge_mask
    """
    backend = infer_backend(edge_index, backend)
    if backend is torch:
        # COO edge_index: delegate to PyG, which relabels the kept nodes to
        # 0..len(subset)-1 and also returns the boolean mask of kept edges.
        edge_index, _, edge_mask = subgraph(subset, edge_index,
                                            return_edge_mask=True,
                                            relabel_nodes=True,
                                            num_nodes=num_nodes)
        return edge_index, edge_mask
    else:
        # NOTE(review): two-dimensional indexing extracts the
        # [subset, subset] submatrix for a SparseTensor, but on a dense
        # numpy adjacency it would select element pairs instead — confirm
        # this branch is only reached with SparseTensor inputs.
        edge_index = edge_index[subset, subset]
        # No edge mask is available on this path.
        return edge_index, None
def weighted_degree(index: TensArray, weights: OptTensArray = None,
                    num_nodes: Optional[int] = None) -> TensArray:
    r"""Computes the weighted degree of a given one-dimensional index tensor.

    Args:
        index (LongTensor): Index tensor.
        weights (Tensor): Edge weights tensor. If :obj:`None`, each edge
            counts as 1 (integer dtype).
        num_nodes (int, optional): The number of nodes, *i.e.*
            :obj:`max_val + 1` of :attr:`index`. (default: :obj:`None`)

    Returns:
        TensArray: degree vector of length :obj:`num_nodes`, with the same
            backend (torch/numpy) and dtype as :obj:`weights`.
    """
    N = maybe_num_nodes(index, num_nodes)
    if isinstance(index, Tensor):
        if weights is None:
            weights = torch.ones((index.size(0), ),
                                 device=index.device,
                                 dtype=torch.int)
        out = torch.zeros((N, ), dtype=weights.dtype, device=weights.device)
        out.scatter_add_(0, index, weights)
    else:
        if weights is None:
            # FIX: `np.int` was removed in NumPy 1.24 (deprecated since
            # 1.20); the builtin `int` is the drop-in equivalent dtype.
            weights = np.ones(index.shape[0], dtype=int)
        out = np.zeros(N, dtype=weights.dtype)
        # Unbuffered scatter-add: accumulates repeated indices correctly.
        np.add.at(out, index, weights)
    return out
def asymmetric_norm(edge_index: SparseTensArray,
                    edge_weight: OptTensArray = None,
                    dim: int = 0,
                    num_nodes: Optional[int] = None,
                    add_self_loops: bool = False) \
        -> Tuple[SparseTensArray, OptTensArray]:
    r"""Normalize edge weights across dimension :obj:`dim`.

    .. math::
        e_{i,j} = \frac{e_{i,j}}{deg_{i}\ \text{if dim=0 else}\ deg_{j}}

    Args:
        edge_index (LongTensor): Edge index tensor.
        edge_weight (Tensor): Edge weights tensor.
        dim (int): Dimension over which to compute normalization.
        num_nodes (int, optional): The number of nodes, *i.e.*
            :obj:`max_val + 1` of :attr:`index`. (default: :obj:`None`)
        add_self_loops: Whether to add self loops to the adjacency matrix.
    """
    backend = infer_backend(edge_index)
    # SparseTensor path: normalize the sparse matrix directly.
    if backend is torch_sparse:
        assert edge_weight is None
        if add_self_loops:
            edge_index = fill_diag(edge_index, 1)
        deg = edge_index.sum(dim=dim).to(torch.float)
        inv_deg = deg.pow(-1.0)
        # Isolated nodes have zero degree: keep their weights at zero.
        inv_deg[inv_deg == float('inf')] = 0
        return inv_deg.view(-1, 1) * edge_index, None
    # COO edge_index path.
    if add_self_loops:
        edge_index, tmp_edge_weight = add_remaining_self_loops(
            edge_index, edge_weight, 1, num_nodes)
        assert tmp_edge_weight is not None
        edge_weight = tmp_edge_weight
    index = edge_index[dim]
    deg = weighted_degree(index, edge_weight, num_nodes=num_nodes)
    if edge_weight is None:
        return edge_index, 1 / deg[index]
    return edge_index, edge_weight / deg[index]
def normalize_connectivity(edge_index, edge_weight, norm, num_nodes) \
        -> Tuple[Adj, OptTensor]:
    """Normalize the connectivity according to the scheme in ``norm``.

    Supported schemes are ``'sym'`` (GCN normalization without self loops),
    ``'gcn'`` (GCN normalization with self loops), ``'asym'``/``'mean'``
    (row-wise mean normalization), and ``'none'``/:obj:`None` (no
    normalization; edge weights default to ones for COO input).
    """
    # 'sym' and 'gcn' differ only in whether self loops are added.
    if norm in ('sym', 'gcn'):
        out = gcn_norm(edge_index, edge_weight, num_nodes,
                       add_self_loops=(norm == 'gcn'))
        if isinstance(edge_index, SparseTensor):
            return out, None
        # gcn_norm already returns an (edge_index, edge_weight) pair here.
        return out
    if norm in ('asym', 'mean'):
        return asymmetric_norm(edge_index, edge_weight, dim=1,
                               num_nodes=num_nodes, add_self_loops=False)
    if norm == 'none' or norm is None:
        if edge_weight is None and not isinstance(edge_index, SparseTensor):
            edge_weight = torch.ones((edge_index.size(1), ),
                                     device=edge_index.device)
        return edge_index, edge_weight
    raise NotImplementedError(f'Normalization {norm} not implemented.')
def power_series(edge_index: TensArray, edge_weights: OptTensArray = None,
                 k: int = 2, num_nodes: Optional[int] = None) \
        -> Tuple[TensArray, TensArray]:
    r"""Compute order :math:`k` power series of sparse adjacency matrix
    (:math:`A^k`).

    Args:
        edge_index (LongTensor): Edge index tensor.
        edge_weights (Tensor): Edge weights tensor.
        k (int): Order of power series.
        num_nodes (int, optional): The number of nodes, *i.e.*
            :obj:`max_val + 1` of :attr:`index`. (default: :obj:`None`)
    """
    N = maybe_num_nodes(edge_index, num_nodes)
    if isinstance(edge_index, Tensor):
        from torch_geometric.utils import (from_scipy_sparse_matrix,
                                           to_scipy_sparse_matrix)
        # Round-trip through scipy to exploit fast sparse matrix powers.
        adj_k = to_scipy_sparse_matrix(edge_index, edge_weights, N)**k
        return from_scipy_sparse_matrix(adj_k)
    weights = edge_weights if edge_weights is not None \
        else np.ones(edge_index.shape[1], dtype=np.float32)
    adj_k = (coo_matrix((weights, tuple(edge_index)), (N, N))**k).tocoo()
    edge_index_k = np.stack([adj_k.row, adj_k.col], 0).astype(np.int64)
    return edge_index_k, adj_k.data
def get_dummy_edge_index(dummy: str, num_nodes: int, edge_prob: float = 0.1,
                         directed: bool = True, device=None):
    r"""Create an edge index corresponding to a certain dummy connectivity
    (e.g., full graph).

    Args:
        dummy (str): The dummy connectivity, can be one of
            :obj:`"identity"` (`A`=`I`), :obj:`"full"`
            (`A = np.ones(N, N)`), :obj:`"random"` or :obj:`"none"`
            (returns :obj:`None`).
        num_nodes (int): Number of nodes in the graph.
        edge_prob (float): Edge probability for the random graph.
            (default :obj:`0.1`)
        directed (bool): Whether to generate a directed/undirected graph.
            (default :obj:`True`)
        device (optional): Device for the created tensor.
            (default :obj:`None`)
    """
    if dummy == 'identity':
        # Self loops only.
        nodes = torch.arange(num_nodes, device=device)
        return torch.stack([nodes, nodes])
    if dummy == 'random':
        from torch_geometric.utils import erdos_renyi_graph
        return erdos_renyi_graph(num_nodes, edge_prob,
                                 directed=directed).to(device)
    if dummy == 'full':
        # Every ordered pair of nodes, including self loops.
        nodes = torch.arange(num_nodes, device=device)
        return torch.cartesian_prod(nodes, nodes).T
    if dummy == 'none':
        return None
    raise NotImplementedError
def parse_connectivity(
        connectivity: Union[SparseTensArray, Tuple[DataArray]],
        target_layout: Optional[str] = None,
        num_nodes: Optional[int] = None
) -> Tuple[Optional[Adj], Optional[Tensor]]:
    """Cast a connectivity object to torch and optionally change its layout.

    Accepts dense matrices (:class:`~pandas.DataFrame`,
    :class:`~numpy.ndarray`, :class:`~torch.Tensor`), ``(edge_index,
    edge_weight)``-like tuples, scipy sparse matrices, and
    :class:`~torch_sparse.SparseTensor` objects.
    """
    if isinstance(connectivity, (pd.DataFrame, np.ndarray, Tensor)):
        # Dense array-like input: copy into a torch.Tensor.
        parsed = casting.copy_to_tensor(connectivity)
    elif isinstance(connectivity, (list, tuple)):
        # COO tuple: cast every element (index and weights) to torch.
        parsed = recursive_apply(connectivity, casting.copy_to_tensor)
    elif isinstance(connectivity, ScipySparseMatrix.__args__):
        # Scipy sparse matrix: wrap as SparseTensor without densifying.
        parsed = SparseTensor.from_scipy(connectivity)
    elif isinstance(connectivity, SparseTensor):
        parsed = connectivity
    else:
        raise TypeError("`connectivity` must be a dense matrix or in "
                        "COO format (i.e., an `edge_index`).")
    if target_layout is None:
        return parsed
    return convert_torch_connectivity(parsed, target_layout,
                                      num_nodes=num_nodes)