Source code for tsl.ops.az_test

from collections import namedtuple

import numpy as np
import scipy.stats

from tsl.typing import Optional, OptTensArray, TensArray, Tuple, Union


def _twosided_std_gaussian_pval(stat: float):
    """Return the two-sided p-value associated with statistic `stat`
    distributed as a standard Gaussian."""
    return 2 * (1 - scipy.stats.norm.cdf(np.abs(stat)))


def _to_numpy(o: Union[TensArray, list, int, float, None]):
    """Cast the object `o` to `numpy.ndarray` or a `float`. If it is `None`,
    it will be left as `None`."""
    if isinstance(o, np.ndarray):
        return o
    if isinstance(o, list):
        return np.array(o)
    if isinstance(o, int) or isinstance(o, float):
        return float(o)
    if o is None:
        return o
    import torch
    if isinstance(o, torch.Tensor):
        return o.numpy()
    raise NotImplementedError(
        f"I don't know how to convert {type(o)} to numpy")


def _to_undirected_no_selfloops(
    edge_index: np.ndarray, edge_weight: Optional[Union[np.ndarray, int,
                                                        float]]
) -> Tuple[np.ndarray, Optional[np.ndarray]]:
    """Remove self-loops, make the graph undirected, remove duplicated edges,
    and sum the weights corresponding to duplicated edges; it works with
    `numpy.ndarray`."""

    # Check inputs
    assert edge_index.shape[1] > 0
    if isinstance(edge_weight, int) or isinstance(edge_weight, float):
        edge_weight = edge_weight * np.ones(edge_index.shape[1])
    if edge_weight is not None:
        assert edge_index.shape[1] == edge_weight.shape[0]

    # Remove self-loops
    selfloop_mask = edge_index[0] != edge_index[1]
    edge_index_ = edge_index[:, selfloop_mask]
    edge_weight_ = edge_weight[selfloop_mask]

    # Sort edges to have an undirected graph
    edge_index_.sort(axis=0)
    order = np.lexsort(edge_index_)
    # edges
    edge_index_ = edge_index_[:, order]
    # weights
    if edge_weight is not None:
        edge_weight_ = edge_weight_[order]

    # Mask of unique edges (or the first of the duplicates) and non self-loops
    unique_mask = np.any(edge_index_[:, 1:] != edge_index_[:, :-1], axis=0)
    unique_mask = np.append(True, unique_mask)
    unique_mask_inds, = np.nonzero(unique_mask)
    # edges
    edge_index_ = edge_index_[:, unique_mask]
    # weights
    if edge_weight_ is not None:
        edge_weight_ = np.add.reduceat(edge_weight_,
                                       unique_mask_inds,
                                       dtype=edge_weight_.dtype)

    return edge_index_, edge_weight_


AZWhitenessTestResult = namedtuple('AZWhitenessTestResult',
                                   ('statistic', 'pvalue'))
AZWhitenessMultiTestResult = namedtuple(
    'AZWhitenessMultiTestResult',
    ('statistic', 'pvalue', 'componentwise_tests'))


[docs]def az_whiteness_test( x: TensArray, edge_index: TensArray, mask: OptTensArray = None, pattern: str = "t n f", edge_weight: Optional[Union[TensArray, float]] = None, edge_weight_temporal: Optional[float] = None, lamb: float = 0.5, multivariate: bool = False, remove_median: bool = False ) -> Union[AZWhitenessTestResult, AZWhitenessMultiTestResult]: """Implementation of the AZ-whiteness test from the paper `"AZ-whiteness test: a test for uncorrelated noise on spatio-temporal graphs" <https://arxiv.org/abs/2204.11135>`_ (D. Zambon and C. Alippi, NeurIPS 2022). Args: x (TensArray): graph signal, typically with pattern "t n f" and representing the prediction residuals. edge_index (TensArray): indices of the spatial edges with shape (2, E). Current implementation supports only a static topology. mask (TensArray, optional): boolean mask of signal :obj:`x`, with same size of :obj:`x`. The mask is :obj:`True` where the observations in :obj:`x` are valid and :obj:`False` otherwise. (default: :obj:`None`) pattern (str): string encoding the index pattern of `x`, typically "t n f" representing time, nodes and node features dimensions, respectively. (default: :obj:`"t n f"`) edge_weight (TensArray or float, optional): positive weights of the spatial edges. It can be a :obj:`TensArray` of shape (E,), or a scalar value (same weight for all edges). (default: :obj:`None`) edge_weight_temporal (float, optional): positive scalar weight for all temporal edges. If :obj:`None` or :obj:`"auto"`, the weight is computed to balance the contribution of the spatial and temporal components (see `Zambon and Alippi, 2022 <https://arxiv.org/abs/2204.11135>`_). (default: :obj:`None`) lamb (float, optional): scalar factor in within :math:`0.0` and :math:`1.0` defining a convex combination of the spatial and temporal components; if :obj:`lamb == 1.0` the test is applied on the spatial topology only, for :obj:`lamb == 0.0` only the serial correlation is considered. (default: :obj:`0.5`) multivariate (bool): whether to run a single test on a multivariate signal or combine multiple scalar tests, one for each of the :obj:`f` features. It applies only when :obj:`f > 1`. (default: :obj:`False`) remove_median (bool): whether to manually fulfill --- where possible --- the assumption of null median or not. (default: :obj:`False`) Returns: AZWhitenessTestResult or AZWhitenessMultiTestResult: The test statistics. """ # retrieve pattern dims = pattern.strip().split(' ') T_DIM, N_DIM, F_DIM = dims.index("t"), dims.index("n"), dims.index("f") # data to numpy.ndarray x = _to_numpy(x) assert x.ndim == 3 mask = _to_numpy(mask) edge_index_spatial = _to_numpy(edge_index) edge_weight_spatial = _to_numpy(edge_weight) if remove_median: x_ = x.copy() if mask is not None: x_[np.logical_not(mask)] = np.nan x_median = np.nanmedian(x_, axis=[T_DIM, N_DIM], keepdims=True) x -= x_median F = x.shape[F_DIM] if F == 1: multivariate = True az_test_args = dict(x=x, mask=mask, pattern=pattern, edge_index_spatial=edge_index_spatial, edge_weight_spatial=edge_weight_spatial, edge_weight_temporal=edge_weight_temporal, lamb=lamb) if multivariate: # Single test with edge statistic: `sign( (xu * xv).sum() )`. return _az_whiteness_test(**az_test_args) else: # Multiple scalar tests based on `sign( xu[f] * xv[f] )`, i.e., a test # for each feature dimension. res = [] for f in range(F): x_ = x[..., f:f + 1] if mask is None: mask_ = None else: mask_ = mask[..., f:f + 1] az_test_args["x"] = x_ az_test_args["mask"] = mask_ res.append(_az_whiteness_test(**az_test_args)) C_multi = np.sum([r.statistic for r in res]) / np.sqrt(len(res)) pval = _twosided_std_gaussian_pval(C_multi) return AZWhitenessMultiTestResult(C_multi, pval, res)
def _az_whiteness_test(x, mask, pattern, edge_index_spatial, edge_weight_spatial, edge_weight_temporal, lamb): """Core computation of the AZ-whiteness test. All parameters are assumed to be `numpy.ndarray` or `float`. """ # retrieve pattern dims = pattern.strip().split(' ') T_DIM, N_DIM, F_DIM = dims.index("t"), dims.index("n"), dims.index("f") T, N = x.shape[T_DIM], x.shape[N_DIM] # --- Spatial edges and weight --- # Parse weight if edge_weight_spatial is None: edge_weight_spatial = 1.0 if (isinstance(edge_weight_spatial, int) or isinstance(edge_weight_spatial, float)): edge_weight_spatial = edge_weight_spatial * np.ones( edge_index_spatial.shape[1]) # Check dims assert edge_weight_spatial.shape[0] == edge_index_spatial.shape[ 1], "Dimension mismatch between edge_weight and edge_index." assert np.all(edge_weight_spatial > 0), \ "Edge weights are not all positive." assert N == edge_index_spatial.max() + 1, \ "Is the input signal given with pattern (T, N, F)?" # Make the graph undirected and without self-loops edge_index_spatial, edge_weight_spatial = _to_undirected_no_selfloops( edge_index=edge_index_spatial, edge_weight=edge_weight_spatial) # Node mask if mask is None: mask = np.ones_like(x) mask = mask.astype(int) assert np.all(np.logical_or(mask == 0, mask == 1)) mask_node = mask.max(axis=F_DIM) # Mask data x = x * mask # Edge mask: # - repeat node mask for every source node # - repeat node mask for every target node # - compare the two mask_edge_spatial = np.where( np.logical_and(mask_node[:, edge_index_spatial[0]], mask_node[:, edge_index_spatial[1]])) # Spatial normalization factor # sums over all unmasked edges (it considers already the dynamic graph # with all "repeated" edges) W_spatial = np.sum(edge_weight_spatial[mask_edge_spatial[1]]**2) # --- Temporal edges and weight --- # Parse temporal edge weight if T == 1: num_temporal_edge_masked = 0 edge_weight_temporal = 1 else: assert T_DIM == 0 # num of temporal edges num_temporal_edge_masked = (mask[1:] * mask[:-1]).sum() # default temporal weight if edge_weight_temporal == "auto" or edge_weight_temporal is None: edge_weight_temporal = np.sqrt(W_spatial / num_temporal_edge_masked) assert isinstance(edge_weight_temporal, int) or isinstance( edge_weight_temporal, float) assert edge_weight_temporal > 0 # Temporal normalization factor W_temporal = (edge_weight_temporal**2) * num_temporal_edge_masked # --- Test statistics --- # Inner products assert T_DIM == 0 and F_DIM == 2 # (T, E, F) * (T, E, F) -> (T, E, F) xxs = x[:, edge_index_spatial[0]] * x[:, edge_index_spatial[1]] # (T, E, F) -> (T, E) xxs = xxs.sum(axis=F_DIM) # (T-1, N, F) * (T-1, N, F) -> (T-1, N, F) xxt = x[1:] * x[:-1] # (T-1, N, F) -> (T-1, N) xxt = xxt.sum(axis=F_DIM) # Weighted signs and Ctilde # (1, E) * (T, E) -> (T, E) w_sgn_xxs = edge_weight_spatial[None, ...] * np.sign(xxs) Ctilde_spatial = w_sgn_xxs.sum() sgn_xxt = np.sign(xxt) Ctilde_temporal = edge_weight_temporal * sgn_xxt.sum() # Normalize Ctilde: C assert 0 <= lamb <= 1 Ctilde = lamb * Ctilde_spatial + (1 - lamb) * Ctilde_temporal W = (lamb**2) * W_spatial + ((1 - lamb)**2) * W_temporal C = Ctilde / np.sqrt(W) # p-value pval = _twosided_std_gaussian_pval(C) return AZWhitenessTestResult(C, pval)