Source code for tsl.datasets.prototypes.tabular_dataset

from contextlib import contextmanager
from copy import deepcopy
from typing import Dict, List, Mapping, Optional, Tuple, Union

import numpy as np
import pandas as pd
from numpy import ndarray
from pandas import Index

from tsl import logger
from tsl.ops.framearray import aggregate, fill_nan, framearray_to_numpy, reduce
from tsl.typing import (FillOptions, FrameArray, OptFrameArray, Scalar,
                        TemporalIndex)
from tsl.utils.python_utils import ensure_list

from ...ops.pattern import broadcast, outer_pattern
from . import casting
from .dataset import Dataset
from .mixin import TabularParsingMixin


class TabularDataset(Dataset, TabularParsingMixin):
    r"""Base :class:`~tsl.datasets.prototypes.Dataset` class for tabular data.

    Tabular data are assumed to be 3-dimensional arrays where the dimensions
    represent time, nodes, and features, respectively. They can be either
    :class:`~pandas.DataFrame` or :class:`~numpy.ndarray`.

    Args:
        target (FrameArray): :class:`~pandas.DataFrame` or
            :class:`numpy.ndarray` containing the data related to the target
            signals. The first dimension (or the DataFrame index) is
            considered the temporal dimension. The second dimension
            represents nodes, the last one denotes the number of channels.
            If the input array is bi-dimensional (or the DataFrame's columns
            are not a :class:`~pandas.MultiIndex`), the sequence is assumed
            to be univariate (number of channels = 1). If the DataFrame's
            columns are a :class:`~pandas.MultiIndex` with two levels, nodes
            are assumed to be at the first level and channels at the second.
        covariates (dict, optional): named mapping of
            :class:`~pandas.DataFrame` or :class:`numpy.ndarray` representing
            covariates. Examples of covariates are exogenous signals (in the
            form of dynamic, multidimensional data) or static attributes
            (e.g., graph/node metadata). You can specify what each axis
            refers to by providing a :obj:`pattern` for each item in the
            mapping. Every item can be:

            + a :class:`~pandas.DataFrame` or :class:`~numpy.ndarray`: in
              this case, the pattern is inferred from the shape (if
              possible);
            + a :class:`dict` with keys 'value' and 'pattern' indexing the
              covariate object and the relative pattern, respectively.

            (default: :obj:`None`)
        mask (FrameArray, optional): Boolean mask denoting if values in the
            target are valid (:obj:`True`) or not (:obj:`False`).
            (default: :obj:`None`)
        similarity_score (str): Default method to compute the similarity
            matrix with :obj:`compute_similarity`. It must be inside the
            dataset's :obj:`similarity_options`.
            (default: :obj:`None`)
        temporal_aggregation (str): Default temporal aggregation method after
            resampling.
            (default: :obj:`sum`)
        spatial_aggregation (str): Default spatial aggregation method for
            :obj:`aggregate`, i.e., how to aggregate multiple nodes together.
            (default: :obj:`sum`)
        default_splitting_method (str, optional): Default splitting method
            for the dataset, i.e., how to split the dataset into
            train/val/test.
            (default: :obj:`temporal`)
        force_synchronization (bool): Synchronize all time-varying covariates
            with the target.
            (default: :obj:`True`)
        name (str, optional): Optional name of the dataset.
            (default: :obj:`class_name`)
        precision (int or str, optional): Numerical precision for data:
            16 (or "half"), 32 (or "full"), or 64 (or "double").
            (default: :obj:`32`)
    """

    def __init__(self,
                 target: FrameArray,
                 mask: OptFrameArray = None,
                 covariates: Optional[Mapping[str, Union[FrameArray, Mapping,
                                                         Tuple]]] = None,
                 similarity_score: Optional[str] = None,
                 temporal_aggregation: str = 'sum',
                 spatial_aggregation: str = 'sum',
                 default_splitting_method: Optional[str] = 'temporal',
                 force_synchronization: bool = True,
                 name: str = None,
                 precision: Union[int, str] = 32):
        super().__init__(name=name,
                         similarity_score=similarity_score,
                         temporal_aggregation=temporal_aggregation,
                         spatial_aggregation=spatial_aggregation,
                         default_splitting_method=default_splitting_method)
        # Set data precision before parsing objects
        self.precision = precision
        self.force_synchronization = force_synchronization

        # Set dataset's main signal
        self.target = self._parse_target(target)

        from .datetime_dataset import DatetimeDataset
        if not isinstance(self, DatetimeDataset) \
                and casting.is_datetime_like_index(self.index):
            logger.warn("It seems you have timestamped data. You may "
                        "consider using tsl.datasets.DatetimeDataset "
                        "instead.")

        self.mask: Optional[np.ndarray] = None
        self.set_mask(mask)

        # Store covariates (e.g., exogenous signals and attributes)
        self._covariates = dict()
        if covariates is not None:
            for name, value in covariates.items():
                self.add_covariate(name, **self._value_to_kwargs(value))

    def __getattr__(self, item):
        if '_covariates' in self.__dict__ and item in self._covariates:
            return self._covariates[item]['value']
        raise AttributeError("'{}' object has no attribute '{}'".format(
            self.__class__.__name__, item))

    def __delattr__(self, item):
        if item == 'mask':
            self.set_mask(None)
        elif item in self._covariates:
            del self._covariates[item]
        else:
            super(TabularDataset, self).__delattr__(item)
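
    # Illustrative usage (a sketch, assuming the class is importable from
    # tsl.datasets.prototypes): build a dataset from a 3-dimensional array
    # with axes (time, nodes, channels).
    #
    #     import numpy as np
    #     from tsl.datasets.prototypes import TabularDataset
    #
    #     target = np.random.rand(100, 5, 2)  # 100 steps, 5 nodes, 2 channels
    #     ds = TabularDataset(target, mask=np.isfinite(target))
    #     ds.shape  # (100, 5, 2)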
    # Dataset properties ######################################################

    @property
    def length(self) -> int:
        """Number of time steps in the dataset."""
        return self.target.shape[0]

    @property
    def n_nodes(self) -> int:
        """Number of nodes in the dataset."""
        if self.is_target_dataframe:
            return len(self.nodes)
        return self.target.shape[1]

    @property
    def n_channels(self) -> int:
        """Number of channels in the dataset's target."""
        if self.is_target_dataframe:
            return len(self.channels)
        return self.target.shape[2]

    @property
    def shape(self) -> tuple:
        return self.length, self.n_nodes, self.n_channels

    @property
    def index(self) -> Union[pd.Index, TemporalIndex, np.ndarray]:
        if self.is_target_dataframe:
            return self.target.index
        return np.arange(self.length)

    @property
    def nodes(self) -> Union[pd.Index, np.ndarray]:
        if self.is_target_dataframe:
            return self.target.columns.unique(0)
        return np.arange(self.n_nodes)

    @property
    def channels(self) -> Union[pd.Index, np.ndarray]:
        if self.is_target_dataframe:
            return self.target.columns.unique(1)
        return np.arange(self.n_channels)

    @property
    def patterns(self) -> dict:
        """Shows the dimensions of the data in the dataset in a more
        informative way.

        The pattern mapping can be useful to glimpse at how data are
        arranged. The convention we use is the following:

        * 't' stands for "number of time steps"
        * 'n' stands for "number of nodes"
        * 'f' stands for "number of features" (per node)
        """
        patterns = {'target': 't n f'}
        if self.mask is not None:
            patterns['mask'] = 't n f'
        patterns.update({
            name: attr['pattern']
            for name, attr in self._covariates.items()
        })
        return patterns

    # Covariates properties

    @property
    def covariates(self) -> dict:
        return {
            name: attr['value']
            for name, attr in self._covariates.items()
        }

    @property
    def exogenous(self):
        """Time-varying covariates of the dataset's target."""
        return {
            name: attr['value']
            for name, attr in self._covariates.items()
            if 't' in attr['pattern']
        }

    @property
    def attributes(self):
        """Static features related to the dataset."""
        return {
            name: attr['value']
            for name, attr in self._covariates.items()
            if 't' not in attr['pattern']
        }

    @property
    def n_covariates(self) -> int:
        """Number of covariates in the dataset."""
        return len(self._covariates)

    # Flags

    @property
    def is_target_dataframe(self) -> bool:
        return isinstance(self.target, pd.DataFrame)

    @property
    def has_mask(self) -> bool:
        return self.mask is not None

    @property
    def has_covariates(self) -> bool:
        return self.n_covariates > 0
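
    # Illustrative sketch (hypothetical covariates): for a dataset with a
    # node-level exogenous 'u' and a pairwise distance matrix 'dist',
    #
    #     ds.patterns  # {'target': 't n f', 'mask': 't n f',
    #                  #  'u': 't n f', 'dist': 'n n'}
    #
    # 'u' shows up in ``ds.exogenous`` (its pattern contains 't'), while
    # 'dist' shows up in ``ds.attributes``.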

    # Setters #################################################################

    def set_target(self, value: FrameArray):
        r"""Set the sequence of target channels at :obj:`self.target`."""
        self.target = self._parse_target(value)

    def set_mask(self, mask: OptFrameArray):
        r"""Set the mask of the target channels, i.e., a boolean for each
        (time step, node, feature) triplet denoting whether the corresponding
        value in the target is observed (:obj:`True`) or not
        (:obj:`False`)."""
        if mask is not None:
            mask = self._parse_target(mask).astype('bool')
            with self.synchronize(True):
                mask, _ = self._parse_covariate(mask, 't n f')
            mask = framearray_to_numpy(mask)
            # check that mask features are broadcastable to target's features
            if mask.shape[-1] not in [1, self.n_channels]:
                raise RuntimeError(f"Mask features ({mask.shape[-1]}) cannot "
                                   "be broadcast to the target's number of "
                                   f"features ({self.n_channels}).")
        self.mask = mask
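
    # Illustrative sketch: a mask with a single channel is accepted and
    # broadcast to all target channels (shapes are hypothetical).
    #
    #     ds.set_mask(np.ones((ds.length, ds.n_nodes, 1), dtype=bool))
    #
    # A channel count other than 1 or ``n_channels`` raises a RuntimeError.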

    # Setters for covariates

    def add_covariate(self,
                      name: str,
                      value: FrameArray,
                      pattern: Optional[str] = None):
        r"""Add a covariate to the dataset. Examples of covariates are
        exogenous signals (in the form of dynamic, multidimensional data) or
        static attributes (e.g., graph/node metadata). The parameter
        :obj:`pattern` specifies what each axis refers to:

        - 't': temporal dimension;
        - 'n': node dimension;
        - 'c'/'f': channels/features dimension.

        For instance, the pattern of a node-level covariate is 't n f', while
        a pairwise metric between nodes has pattern 'n n'.

        Args:
            name (str): the name of the object. You can then access the added
                object as :obj:`dataset.{name}`.
            value (FrameArray): the object to be added.
            pattern (str, optional): the pattern of the object. A pattern
                specifies what each axis refers to:

                - 't': temporal dimension;
                - 'n': node dimension;
                - 'c'/'f': channels/features dimension.

                If :obj:`None`, the pattern is inferred from the shape.
                (default :obj:`None`)
        """
        # name cannot be an existing attribute of self
        invalid_names = set(dir(self))
        if name in invalid_names:
            raise ValueError(f"Cannot add object with name '{name}', "
                             f"{self.__class__.__name__} already contains "
                             f"an attribute named '{name}'.")
        value, pattern = self._parse_covariate(value, pattern)
        self._covariates[name] = dict(value=value, pattern=pattern)
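
    # Illustrative usage (hypothetical objects): attach a pairwise distance
    # matrix and a graph-level, time-varying covariate.
    #
    #     ds.add_covariate('dist', dist_matrix, pattern='n n')
    #     ds.add_covariate('holiday', holiday_df, pattern='t f')
    #     ds.dist  # added covariates are accessible as attributes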

    def add_exogenous(self,
                      name: str,
                      value: FrameArray,
                      node_level: bool = True):
        """Shortcut method to add a time-varying covariate. If :obj:`name`
        starts with 'global_', the prefix is stripped and the covariate is
        added at the graph level (i.e., with pattern 't f')."""
        if name.startswith('global_'):
            name = name[7:]
            node_level = False
        pattern = 't n f' if node_level else 't f'
        self.add_covariate(name, value, pattern)
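
    # Illustrative usage (hypothetical objects): the 'global_' prefix strips
    # the node dimension from the covariate's pattern.
    #
    #     ds.add_exogenous('temperature', temp_df)        # pattern 't n f'
    #     ds.add_exogenous('global_holiday', holiday_df)  # 'holiday', 't f'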

    # Getters #################################################################

    def get_mask(self,
                 dtype: Union[type, str, np.dtype] = None,
                 as_dataframe: bool = False) -> FrameArray:
        mask = self.mask if self.has_mask else ~np.isnan(self.numpy())
        if dtype is not None:
            assert dtype in ['bool', 'uint8', bool, np.uint8]
            mask = mask.astype(dtype)
        if as_dataframe:
            data = mask.reshape(self.length, -1)
            mask = pd.DataFrame(data,
                                index=self.index,
                                columns=self._columns_multiindex())
        return mask

    def expand_frame(self,
                     key: str,
                     pattern: str,
                     time_index: Union[List, np.ndarray] = None,
                     node_index: Union[List, np.ndarray] = None,
                     channel_index: Union[List, np.ndarray] = None) \
            -> np.ndarray:
        obj = getattr(self, key)
        x = framearray_to_numpy(obj)
        in_pattern = self.patterns[key]
        if channel_index is not None:
            assert in_pattern.count('f') == 1, \
                "Can select channels only in frames with just one " \
                "channel dimension."
            dim = in_pattern.strip().split(' ').index('f')
            if isinstance(obj, pd.DataFrame):
                axis = 'columns' if dim > 0 else 'index'
                level = dim - 1 if dim > 0 else 0
                channels = getattr(obj, axis).unique(level)
                channel_indexer = channels.get_indexer(channel_index)
                if any(channel_indexer < 0):
                    unmatch = channel_index[channel_indexer < 0]
                    raise KeyError(f"Channels {unmatch} not in {key}.")
                channel_index = channel_indexer
            x = x.take(channel_index, dim)
        pattern = in_pattern + ' -> ' + pattern
        x = broadcast(x,
                      pattern,
                      t=self.length,
                      n=self.n_nodes,
                      time_index=time_index,
                      node_index=node_index)
        return x

    def get_frame(self,
                  channels: Union[str, List,
                                  Dict[str, Union[str, int, List,
                                                  None]]] = None,
                  node_index: Union[List, np.ndarray] = None,
                  time_index: Union[List, np.ndarray] = None,
                  cat_dim: Optional[int] = -1,
                  return_pattern: bool = True,
                  as_numpy: bool = True):
        # parse channels
        if channels is None:
            # defaults to all data
            channels = list(self.patterns.keys())
        elif isinstance(channels, str):
            channels = [channels]
        # build a channel index for each queried key
        if not isinstance(channels, dict):
            channels = {key: None for key in channels}
        else:
            channels = {
                key: ensure_list(chn)
                for key, chn in channels.items()
            }

        time_index = self._get_time_index(time_index)
        node_index = self._get_node_index(node_index)

        pattern = outer_pattern([self.patterns[key] for key in channels])
        frames = [
            self.expand_frame(key,
                              pattern,
                              time_index,
                              node_index,
                              channel_index=channel_index)
            for key, channel_index in channels.items()
        ]

        if cat_dim is not None:
            frames = np.concatenate(frames, axis=cat_dim)

        if not as_numpy:
            time_index = self._get_time_index(time_index, layout="slice")
            node_index = self._get_node_index(node_index, layout="slice")
            assert self.is_target_dataframe
            idxs, names = [], []
            for dim in pattern.replace(' ', ''):
                if dim == 't':
                    idxs.append(self.index[time_index])
                    names.append('index')
                elif dim == 'n':
                    idxs.append(self.nodes[node_index])
                    names.append('nodes')
                else:  # dim == 'f'
                    channel_index = []
                    for key, chn in channels.items():
                        if chn is None:
                            obj = getattr(self, key)
                            f_dim = self.patterns[key].split(' ').index('f')
                            if isinstance(obj, pd.DataFrame):
                                axis = 'columns' if f_dim > 0 else 'index'
                                level = f_dim - 1 if f_dim > 0 else 0
                                chn = getattr(obj, axis).unique(level)
                            else:
                                chn = np.arange(obj.shape[f_dim])
                        channel_index.extend([f"{key}/{c}" for c in chn])
                    idxs.append(channel_index)
                    names.append('channels')
            index = pd.Index(idxs.pop(0), name=names.pop(0))
            columns = pd.MultiIndex.from_product(idxs, names=names)
            frames = pd.DataFrame(frames.reshape(frames.shape[0], -1),
                                  index=index,
                                  columns=columns)

        if return_pattern:
            return frames, pattern
        return frames
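
    # Illustrative usage (hypothetical covariate 'u'): stack the target and
    # a covariate into a single array aligned to their outer pattern.
    #
    #     frame, pattern = ds.get_frame(['target', 'u'])
    #     # pattern == 't n f'; channels are concatenated on the last axis
    #
    # Channels can also be selected per key (labels here assume 'u' is
    # stored as a DataFrame with a channel named 'u0'):
    #
    #     df = ds.get_frame({'u': ['u0']}, as_numpy=False,
    #                       return_pattern=False)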

    # Private getters #########################################################

    def _get_time_index(self, time_index=None, layout='index'):
        if time_index is None:
            return slice(None) if layout == 'slice' else None
        if isinstance(time_index, slice):
            if layout == 'slice':
                return time_index
            time_index = np.arange(
                max(time_index.start or 0, 0),
                min(time_index.stop or len(self), len(self)),
                time_index.step or 1)
        elif isinstance(time_index, pd.Index):
            assert self.is_target_dataframe
            time_indexer = self.index.get_indexer(time_index)
            if any(time_indexer < 0):
                unmatch = time_index[time_indexer < 0]
                raise KeyError(f"Indices {unmatch} not in index.")
            time_index = time_indexer
        time_index = np.asarray(time_index)
        if layout == 'mask':
            mask = np.zeros_like(self.index, dtype=bool)
            mask[time_index] = True
            return mask
        return time_index

    def _get_node_index(self, node_index=None, layout='index'):
        if node_index is None:
            return slice(None) if layout == 'slice' else None
        if isinstance(node_index, slice):
            if layout == 'slice':
                return node_index
            node_index = np.arange(
                max(node_index.start or 0, 0),
                min(node_index.stop or self.n_nodes, self.n_nodes),
                node_index.step or 1)
        elif isinstance(node_index, pd.Index):
            assert self.is_target_dataframe
            node_indexer = self.nodes.get_indexer(node_index)
            if any(node_indexer < 0):
                unmatch = node_index[node_indexer < 0]
                raise KeyError(f"Indices {unmatch} not in nodes.")
            node_index = node_indexer
        node_index = np.asarray(node_index)
        if layout == 'mask':
            mask = np.zeros_like(self.nodes, dtype=bool)
            mask[node_index] = True
            return mask
        return node_index

    # Aggregation methods #####################################################

    def aggregate_(self,
                   node_index: Optional[Union[Index, Mapping]] = None,
                   aggr: str = None,
                   mask_tolerance: float = 0.):
        # get the aggregation function among numpy functions
        aggr = aggr if aggr is not None else self.spatial_aggregation
        aggr_fn = getattr(np, aggr)
        # node_index parsing: eventually it must be an n_nodes-sized array
        # where the value at position i is the cluster id of the i-th node
        if node_index is None:
            # if not provided, aggregate all nodes together (cluster id 0)
            node_index = np.zeros(self.n_nodes)
        # otherwise, node_index can be a mapping {cluster_id: [nodes]};
        # the set of all nodes in the mapping values must be equal to the
        # dataset's nodes
        elif isinstance(node_index, Mapping):
            ids, groups = [], []
            for group_id, group in node_index.items():
                ids += [group_id] * len(group)
                groups += list(group)
            assert set(groups) == set(self.nodes)
            # reorder node_index according to the nodes' order in the dataset
            ids, groups = np.array(ids), np.array(groups)
            _, order = np.where(self.nodes[:, None] == groups)
            node_index = ids[order]
        else:
            node_index = np.asarray(node_index)
        assert len(node_index) == self.n_nodes

        # aggregate the main dataframe
        self.target = aggregate(self.target, node_index, aggr_fn)

        # aggregate the mask and threshold the aggregated values
        if self.has_mask:
            mask = aggregate(self.mask, node_index, np.mean)
            mask = mask >= (1. - mask_tolerance)
            self.set_mask(mask)

        # aggregate all node-level covariates
        for name, attr in self._covariates.items():
            value, pattern = attr['value'], attr['pattern']
            dims = pattern.strip().split(' ')
            if dims[0] == 'n':
                value = aggregate(value, node_index, aggr_fn, axis=0)
            for lvl, dim in enumerate(dims[1:]):
                if dim == 'n':
                    value = aggregate(value,
                                      node_index,
                                      aggr_fn,
                                      axis=1,
                                      level=lvl)
            self._covariates[name]['value'] = value

        return self

    def aggregate(self,
                  node_index: Optional[Union[Index, Mapping]] = None,
                  aggr: str = None,
                  mask_tolerance: float = 0.):
        ds = deepcopy(self)
        ds.aggregate_(node_index, aggr, mask_tolerance)
        return ds
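
    # Illustrative usage (hypothetical node labels): merge nodes into two
    # clusters; the mapping values must cover all dataset nodes exactly.
    #
    #     coarse = ds.aggregate({0: ['A', 'B'], 1: ['C']}, aggr='mean')
    #
    # ``aggregate`` returns an aggregated deep copy, while ``aggregate_``
    # modifies the dataset in place.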

    def reduce_(self, time_index=None, node_index=None):
        time_index = self._get_time_index(time_index, layout='mask')
        node_index = self._get_node_index(node_index, layout='mask')
        self.target = reduce(self.target, time_index, axis=0)
        self.target = reduce(self.target, node_index, axis=1, level=0)
        if self.has_mask:
            self.mask = reduce(self.mask, time_index, axis=0)
            self.mask = reduce(self.mask, node_index, axis=1, level=0)
        for name, attr in self._covariates.items():
            value, pattern = attr['value'], attr['pattern']
            dims = pattern.strip().split(' ')
            if dims[0] == 't':
                value = reduce(value, time_index, axis=0)
            elif dims[0] == 'n':
                value = reduce(value, node_index, axis=0)
            for lvl, dim in enumerate(dims[1:]):
                if dim == 't':
                    value = reduce(value, time_index, axis=1, level=lvl)
                elif dim == 'n':
                    value = reduce(value, node_index, axis=1, level=lvl)
            self._covariates[name]['value'] = value
        return self

    def reduce(self, time_index=None, node_index=None):
        self_copy = deepcopy(self)
        self_copy.reduce_(time_index, node_index)
        return self_copy

    @contextmanager
    def synchronize(self, flag=True):
        # temporarily set the force_synchronization flag, restoring the
        # previous value on exit
        try:
            is_synch = self.force_synchronization
            self.force_synchronization = flag
            yield self
        finally:
            self.force_synchronization = is_synch

    # Preprocessing

    def fill_nan_(self,
                  value: Optional[Union[Scalar, FrameArray]] = None,
                  method: FillOptions = None,
                  axis: int = 0):
        self.target = fill_nan(self.target, value, method, axis)
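
    # Illustrative usage (hypothetical object): temporarily override the
    # force_synchronization flag, e.g., to add a time-varying covariate
    # without aligning it to the target's index.
    #
    #     with ds.synchronize(False):
    #         ds.add_covariate('u', unaligned_df, pattern='t f')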

    # Representations

    def dataframe(self) -> pd.DataFrame:
        if self.is_target_dataframe:
            return self.target.copy()
        data = self.target.reshape(self.length, -1)
        df = pd.DataFrame(data, columns=self._columns_multiindex())
        return df

    def numpy(self,
              return_idx=False) -> Union[ndarray, Tuple[ndarray, Index]]:
        if return_idx:
            return self.numpy(return_idx=False), self.index
        if self.is_target_dataframe:
            return self.dataframe().values.reshape(self.shape)
        return self.target
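
    # Illustrative usage: the target can be retrieved as a 3-dimensional
    # array or as a DataFrame with two-level (nodes, channels) columns.
    #
    #     x = ds.numpy()                      # (length, n_nodes, n_channels)
    #     x, idx = ds.numpy(return_idx=True)  # array plus temporal index
    #     df = ds.dataframe()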

    def copy(self) -> 'TabularDataset':
        return deepcopy(self)