import os
from copy import deepcopy
from functools import wraps
from typing import List, Optional, Tuple, Union
import numpy as np
import torch
from einops import rearrange
from scipy import stats
from torch import Tensor
from torch.nn import Module
from torch_geometric.data.storage import recursive_apply
import tsl
from tsl.ops.pattern import check_pattern, outer_pattern, take
from tsl.typing import TensArray
__all__ = [
'Scaler',
'StandardScaler',
'MinMaxScaler',
'RobustScaler',
'ScalerModule',
]
def zeros_to_one_(scale):
    """Set to 1, in place, the scales of near-constant features, detected as
    scales close to machine precision. Scalar inputs are returned instead,
    as they cannot be modified in place.

    Adapted from :func:`sklearn.preprocessing._data._handle_zeros_in_scale`.
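
    Example:
        >>> zeros_to_one_(np.array([1.5, 0., 2.]))
        array([1.5, 1. , 2. ])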
"""
if np.isscalar(scale):
return 1.0 if np.isclose(scale, 0.) else scale
eps = 10 * np.finfo(scale.dtype).eps
zeros = np.isclose(scale, 0., atol=eps, rtol=eps)
scale[zeros] = 1.0
return scale
def fit_wrapper(fit_function):
    """Decorate a :obj:`fit` method to run on a :class:`~numpy.ndarray` view
    of the input, casting the fitted parameters back to
    :class:`~torch.Tensor` when the input is a tensor."""

    @wraps(fit_function)
    def fit(obj: "Scaler", x, *args, **kwargs) -> "Scaler":
        x_type = type(x)
        x = np.asarray(x)
        fit_function(obj, x, *args, **kwargs)
        if x_type is Tensor:
            obj.torch()
        return obj

    return fit
class Scaler:
    r"""Base class for linear :class:`~tsl.data.SpatioTemporalDataset` scalers.

    A :class:`~tsl.data.preprocessing.Scaler` applies a linear transformation
    to the input using parameters `bias` :math:`\mu` and `scale`
    :math:`\sigma`:

    .. math::
        f(x) = (x - \mu) / \sigma.

    Args:
        bias (float): the offset of the linear transformation.
            (default: 0.)
        scale (float): the scale of the linear transformation.
            (default: 1.)
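
    Example (a minimal sketch; :meth:`transform`, invoked through
    :meth:`__call__`, applies the transformation :math:`f` defined above):

        >>> scaler = Scaler(bias=10., scale=2.)
        >>> scaler
        Scaler(bias=10.0, scale=2.0)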
"""
def __init__(self, bias=0., scale=1.):
self.bias = bias
self.scale = scale
super(Scaler, self).__init__()
def __repr__(self) -> str:
sizes = []
for k, v in self.params().items():
param = f"{k}={tuple(v.shape) if hasattr(v, 'shape') else v}"
sizes.append(param)
return "{}({})".format(self.__class__.__name__, ', '.join(sizes))
def __call__(self, *args, **kwargs):
return self.transform(*args, **kwargs)
    def params(self) -> dict:
        """Dictionary of the scaler parameters `bias` and `scale`.

        Returns:
            dict: Scaler's parameters `bias` and `scale`.
"""
return dict(bias=self.bias, scale=self.scale)
    def torch(self, inplace=True):
        r"""Transform all parameters to :class:`torch.Tensor`."""
scaler = self
if not inplace:
scaler = deepcopy(self)
for name, param in scaler.params().items():
if not isinstance(param, Tensor):
param = torch.as_tensor(param, dtype=torch.float32)
param = torch.atleast_1d(param)
setattr(scaler, name, param)
return scaler
    def numpy(self, inplace=True):
r"""Transform all tensors to numpy arrays."""
scaler = self
if not inplace:
scaler = deepcopy(self)
for name, param in scaler.params().items():
if isinstance(param, Tensor):
param = param.detach().cpu().numpy()
setattr(scaler, name, param)
return scaler
@fit_wrapper
def fit(self, x: TensArray, *args, **kwargs):
"""Fit scaler's parameters using input :obj:`x`."""
raise NotImplementedError()
    def save(self, filename: str, make_dir: bool = True) -> str:
        """Save the scaler's parameters to disk.

        Args:
            filename (str): The destination path for the saved file.
            make_dir (bool): If :obj:`True`, then create the non-existing
                directories in :attr:`filename`.
                (default: :obj:`True`)

        Returns:
            str: The absolute path to the saved file.
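
        Example (a sketch of a save/load round trip; the path is
        illustrative, and note that :meth:`load` forwards the stored
        parameters to the constructor of the class it is called on):

            >>> scaler = StandardScaler().fit(np.random.rand(10, 2))
            >>> path = scaler.save('scaler')  # stored as 'scaler.npz'
            >>> restored = Scaler.load(path)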
"""
params = self.params()
        is_torch = any(
            isinstance(param, Tensor) for param in params.values())
filename = os.path.abspath(filename)
if make_dir:
os.makedirs(os.path.dirname(filename), exist_ok=True)
if is_torch:
if not filename.endswith('.pt'):
filename = filename + '.pt'
torch.save(params, filename)
        else:
            # np.savez_compressed appends '.npz' itself; append it here too,
            # so that the returned path matches the file actually written
            if not filename.endswith('.npz'):
                filename = filename + '.npz'
            np.savez_compressed(filename, **params)
return os.path.abspath(filename)
    @classmethod
    def load(cls, filename: str) -> "Scaler":
        """Load an instance of this scaler class from disk.

        Args:
            filename (str): The path to the scaler file.
        """
ext = filename.split('.')[-1]
if ext == 'npz':
params = np.load(filename)
elif ext == 'pt':
params = torch.load(filename)
else:
raise RuntimeError(
f"Filename {filename} is not in a valid format.")
return cls(**params)
class StandardScaler(Scaler):
    """Apply standardization to the data by removing the mean and scaling to
    unit variance.

    Args:
        axis (int or tuple): dimensions of the input to fit the parameters on.
            (default: 0)
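
    Example:
        >>> import numpy as np
        >>> x = np.array([[1., 2.], [3., 4.], [5., 6.]])
        >>> scaler = StandardScaler(axis=0).fit(x)
        >>> scaler.bias  # mean over axis 0, with kept dims
        array([[3., 4.]])
        >>> scaler.scale.round(2)  # standard deviation over axis 0
        array([[1.63, 1.63]])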
"""
def __init__(self, axis: Union[int, Tuple] = 0):
super(StandardScaler, self).__init__()
self.axis = axis
@fit_wrapper
def fit(self, x: TensArray, mask=None, keepdims=True):
r"""Fit scaler's parameters `bias` :math:`\mu` and `scale`
:math:`\sigma` as the mean and the standard deviation of :obj:`x`,
respectively.
Args:
x: array-like input
mask (optional): boolean mask to denote elements of :obj:`x` on
which to fit the parameters.
(default: :obj:`None`)
keepdims (bool): whether to keep the same dimensions as :obj:`x` in
the parameters.
(default: :obj:`True`)
"""
if mask is not None:
x = np.where(mask, x, np.nan)
self.bias = np.nanmean(x.astype(np.float32),
axis=self.axis,
keepdims=keepdims).astype(x.dtype)
self.scale = np.nanstd(x.astype(np.float32),
axis=self.axis,
keepdims=keepdims).astype(x.dtype)
else:
self.bias = x.mean(axis=self.axis, keepdims=keepdims)
self.scale = x.std(axis=self.axis, keepdims=keepdims)
self.scale = zeros_to_one_(self.scale)
return self
class MinMaxScaler(Scaler):
    """Rescale the data so that all values lie in the specified range (by
    default, :math:`[0,1]`).

    Args:
        axis (int or tuple): dimensions of the input to fit the parameters on.
            (default: 0)
        out_range (tuple): output range of the transformed data.
            (default: :obj:`(0, 1)`)
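
    Example:
        >>> import numpy as np
        >>> x = np.array([[0.], [5.], [10.]])
        >>> scaler = MinMaxScaler(axis=0).fit(x)
        >>> scaler.bias, scaler.scale  # f(x) = (x - 0) / 10 maps onto [0, 1]
        (array([[0.]]), array([[10.]]))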
"""
def __init__(self,
axis: Union[int, Tuple] = 0,
out_range: Tuple[float, float] = (0., 1.)):
super(MinMaxScaler, self).__init__()
self.axis = axis
self.out_range = out_range
@fit_wrapper
def fit(self, x: TensArray, mask=None, keepdims=True):
r"""Fit scaler's parameters `bias` :math:`\mu` and `scale`
:math:`\sigma` as the mean and the standard deviation of :obj:`x`.
Args:
x: array-like input
mask (optional): boolean mask to denote elements of :obj:`x` on
which to fit the parameters.
(default: :obj:`None`)
keepdims (bool): whether to keep the same dimensions as :obj:`x` in
the parameters.
(default: :obj:`True`)
"""
out_min, out_max = self.out_range
if out_min >= out_max:
raise ValueError(
"Output range minimum must be smaller than maximum. Got {}.".
format(self.out_range))
if mask is not None:
x = np.where(mask, x, np.nan)
x_min = np.nanmin(x.astype(np.float32),
axis=self.axis,
keepdims=keepdims).astype(x.dtype)
x_max = np.nanmax(x.astype(np.float32),
axis=self.axis,
keepdims=keepdims).astype(x.dtype)
else:
x_min = x.min(axis=self.axis, keepdims=keepdims)
x_max = x.max(axis=self.axis, keepdims=keepdims)
scale = (x_max - x_min) / (out_max - out_min)
scale = zeros_to_one_(scale)
bias = x_min - out_min * scale
self.bias, self.scale = bias, scale
return self
class RobustScaler(Scaler):
    r"""Remove the median and scale the data according to the quantile range.

    The default range is the interquartile range (IQR), i.e., the range
    between the 1st quartile (25th percentile) and the 3rd quartile (75th
    percentile).

    Args:
        axis (int or tuple): dimensions of the input to fit the parameters on.
            (default: 0)
        quantile_range (tuple): quantile range :math:`(q_{\min}, q_{\max})`,
            with :math:`0.0 < q_{\min} < q_{\max} < 100.0`, used to compute
            :obj:`scale`.
            (default: :obj:`(25.0, 75.0)`)
        unit_variance (bool): if :obj:`True`, normalize the scale so that
            standard normally distributed features have unit variance.
            (default: :obj:`False`)
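
    Example:
        >>> import numpy as np
        >>> x = np.array([1., 2., 3., 4., 100.])  # note the outlier
        >>> scaler = RobustScaler(axis=0).fit(x)
        >>> scaler.bias, scaler.scale  # median and IQR ignore the outlier
        (array([3.]), array([2.]))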
"""
def __init__(self,
axis: Union[int, Tuple] = 0,
quantile_range: Tuple[float, float] = (25.0, 75.0),
unit_variance: bool = False):
super(RobustScaler, self).__init__()
self.axis = axis
self.quantile_range = quantile_range
self.unit_variance = unit_variance
@fit_wrapper
def fit(self, x: TensArray, mask=None, keepdims=True):
r"""Fit scaler's parameters `bias` :math:`\mu` and `scale`
:math:`\sigma` as the median and difference between quantiles of
:obj:`x`, respectively.
Args:
x: array-like input
mask (optional): boolean mask to denote elements of :obj:`x` on
which to fit the parameters.
(default: :obj:`None`)
keepdims (bool): whether to keep the same dimensions as :obj:`x` in
the parameters.
(default: :obj:`True`)
"""
q_min, q_max = self.quantile_range
if not 0 <= q_min <= q_max <= 100:
raise ValueError("Invalid quantile range: {}".format(
self.quantile_range))
dtype = x.dtype
if mask is not None:
x = np.where(mask, x, np.nan).astype(np.float32)
self.bias = np.nanmedian(x, axis=self.axis,
keepdims=keepdims).astype(dtype)
min_q, max_q = np.nanpercentile(x,
self.quantile_range,
axis=self.axis,
keepdims=keepdims)
else:
self.bias = np.median(x, axis=self.axis, keepdims=keepdims)
min_q, max_q = np.percentile(x,
self.quantile_range,
axis=self.axis,
keepdims=keepdims)
self.scale = (max_q - min_q).astype(dtype)
self.scale = zeros_to_one_(self.scale)
if self.unit_variance:
adjust = stats.norm.ppf(q_max / 100.0) - stats.norm.ppf(
q_min / 100.0)
self.scale = self.scale / adjust
return self
class ScalerModule(Module):
    r"""Converts a :class:`Scaler` to a :class:`torch.nn.Module`, to insert
    transformation parameters and functions into the minibatch.
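
    Example (a minimal sketch; assumes ``scaler`` is a fitted
    :class:`~tsl.data.preprocessing.Scaler`, whose parameters become buffers
    of the module, and that calling the module delegates to
    :meth:`transform`):

        >>> module = ScalerModule(scaler)
        >>> x_scaled = module(x)
    """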
def __init__(self,
scaler: Optional[Union[Scaler, "ScalerModule"]] = None,
*,
bias: Union[Tensor, float] = 0.,
scale: Union[Tensor, float] = 1.,
pattern: Optional[str] = None):
super(ScalerModule, self).__init__()
self.training = False
self.inherited_from = None
self.t_axis = None # axis of time dimension
self.n_axis = None # axis of node dimension
# initialize from scaler (if any)
if isinstance(scaler, Scaler):
scaler = scaler.torch()
self.inherited_from = scaler.__class__
elif isinstance(scaler, ScalerModule):
self.inherited_from = scaler.inherited_from
pattern = scaler.pattern
if scaler is not None:
bias = scaler.bias
scale = scaler.scale
        # register scaling params as (non-trainable) buffers via __setattr__
self.bias = bias
self.scale = scale
self.pattern = pattern
def __call__(self, *args, **kwargs):
return self.transform(*args, **kwargs)
def __setattr__(self, key, value):
if key in ['bias', 'scale']:
if isinstance(value, Tensor):
value = value.clone().detach()
else:
value = torch.tensor(value)
value = torch.atleast_1d(value)
self.register_buffer(key, value)
elif key == 'pattern':
self.set_pattern(value)
else:
super(ScalerModule, self).__setattr__(key, value)
def _get_name(self):
if self.inherited_from is not None:
return self.inherited_from.__name__ + 'Module'
return self.__class__.__name__
    def params(self) -> dict:
        """Dictionary of the scaler parameters `bias` and `scale`.

        Returns:
            dict: Scaler's parameters `bias` and `scale`.
"""
return dict(bias=self.bias, scale=self.scale)
    @property
    def t(self) -> Optional[int]:
        """Size of the temporal dimension (:obj:`None` if time-invariant)."""
        if self.pattern is not None and 't' in self.pattern:
            # 't' is always the first dimension
            return max(self.scale.size(0), self.bias.size(0))

    @property
    def n(self) -> Optional[int]:
        """Size of the node dimension (:obj:`None` if node-invariant)."""
        if self.pattern is not None and 'n' in self.pattern:
            return max(self.scale.size(self.n_axis),
                       self.bias.size(self.n_axis))
def set_pattern(self, value, check: bool = False):
if value is not None:
value = value.strip()
if value.count('n') > 1:
raise RuntimeError("ScalerModule does not support data with "
"multiple 'n' dimensions.")
if check:
value = check_pattern(value,
ndim=self.bias.ndim,
include_batch=True)
self.t_axis = 0 if 't' in value else None
if 'n' in value:
self.n_axis = value.split(' ').index('n')
self.__dict__['pattern'] = value
    def numpy(self):
r"""Transform :class:`~tsl.data.preprocessing.ScalerModule` to
:class:`~tsl.data.preprocessing.Scaler`."""
b = self.bias.detach().cpu().numpy()
s = self.scale.detach().cpu().numpy()
return Scaler(bias=b, scale=s)
    def rearrange(self, pattern: str, inplace=False, **axes_lengths) \
            -> "ScalerModule":
        r"""Rearrange the parameters in the scaler according to the provided
        pattern, using `einops.rearrange <https://einops.rocks/api/rearrange/>`_.
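
        Example (a minimal sketch; assumes both parameters are 2-D with
        pattern ``'n f'``):

            >>> # swap the node and feature axes of `bias` and `scale`
            >>> transposed = scaler.rearrange('n f -> f n')
        """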
if '->' in pattern:
start_pattern, end_pattern = pattern.split('->')
start_pattern = start_pattern.strip()
if self.pattern is not None and self.pattern != start_pattern:
raise RuntimeError(
f"Starting pattern {start_pattern} does not "
f"match with scaler patter {self.pattern}.")
self.pattern = start_pattern
else:
end_pattern = pattern
pattern = self.pattern + ' -> ' + pattern
b = rearrange(self.bias, pattern, **axes_lengths)
s = rearrange(self.scale, pattern, **axes_lengths)
if inplace:
self.bias, self.scale = b, s
self.pattern = end_pattern
return self
return ScalerModule(bias=b, scale=s, pattern=end_pattern.strip())
    def slice(self,
              time_index: Union[List, Tensor] = None,
              node_index: Union[List, Tensor] = None):
        """Slice the parameters of the scaler with the given time and node
        indices.

        The scaler must have a pattern defining the dimensions of the
        parameters. This operation is not in place; it always returns a new
        :class:`~tsl.data.preprocessing.ScalerModule`. Along the slicing
        axes, the parameters of the new scaler have the same size as the
        provided indices, or size 1 for parameters with a single,
        broadcastable value.
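
        Example (a minimal sketch; assumes a scaler with pattern ``'t n f'``
        and node-varying parameters):

            >>> sliced = scaler.slice(node_index=torch.tensor([0, 3]))
            >>> sliced.n  # size of the node dimension after slicing
            2
        """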
if self.pattern is None:
raise RuntimeError("You are trying to slice a scaler with no "
"pattern.")
# move to new object
scaler = ScalerModule(self)
        # shortcut when there is nothing to slice
        if time_index is None and node_index is None:
            return scaler
        # if time indices are batched (i.e., 2-dimensional), add a leading
        # batch dimension; time-invariant params get a broadcastable 0-index
        new_axes, pattern = None, scaler.pattern
if time_index is not None and time_index.ndim == 2:
new_axes = torch.zeros(1, 1, dtype=torch.long)
pattern = 'b ' + scaler.pattern
# compute actual slicing for each param
t, n = self.t_axis, self.n_axis # axis of time and node dimensions
ti_bias = ti_scale = time_index
ni_bias = ni_scale = node_index
if self.t_axis is not None:
ti_bias = time_index if self.bias.size(t) > 1 else new_axes
ti_scale = time_index if self.scale.size(t) > 1 else new_axes
if self.n_axis is not None:
ni_bias = node_index if self.bias.size(n) > 1 else None
ni_scale = node_index if self.scale.size(n) > 1 else None
# slice params
scaler.bias = take(scaler.bias,
self.pattern,
time_index=ti_bias,
node_index=ni_bias)
scaler.scale = take(scaler.scale,
self.pattern,
time_index=ti_scale,
node_index=ni_scale)
# update pattern
scaler.pattern = pattern
return scaler
@staticmethod
def cat_tensors(scalers, sizes, key, dim, fill_value):
        # collect tensors in a dict keyed by scaler index: tensors[i] exists
        # iff the i-th scaler is not None and has a tensor at attribute `key`
tensors = {
i: getattr(s, key)
for i, s in enumerate(scalers)
if s is not None and getattr(s, key) is not None
}
# if no valid tensor return
if len(tensors) == 0:
return None
# get dtype and device of first tensor and assume equal for all
elem = next(iter(tensors.values()))
dtype, device = elem.dtype, elem.device
# for each scaler (also the ones with no tensor to be concatenated)
# retrieve the tensor (or create one if not present) and the broadcast
# shape
out, shapes = [], []
for i, scaler in enumerate(scalers):
# retrieve tensor
tensor = tensors.get(i)
if tensor is None: # i.e., if scaler is None or has key=None
shape = [1] * len(sizes[i])
tensor = torch.full(shape,
fill_value,
dtype=dtype,
device=device)
out.append(tensor)
# compute broadcast shape
shape = list(tensor.size())
shape[dim] = sizes[i][dim]
shapes.append(shape)
# compute out shape as maximum shape in all dims but concat dim
expand_dims = list(np.max(shapes, 0))
# expand each tensor for output shape
for i, shape in enumerate(shapes):
expand_dims[dim] = shape[dim]
out[i] = out[i].expand(*expand_dims)
return torch.cat(out, dim=dim)
@classmethod
def cat(cls,
scalers: Union[List, Tuple],
dim: int = -1,
sizes: Union[List, Tuple] = None):
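        """Concatenate the given scalers along dimension :attr:`dim`.

        A sketch of the semantics, as implemented below: :obj:`None` scalers
        contribute broadcastable fill values (0 for `bias`, 1 for `scale`),
        so :attr:`sizes` (the shapes of the tensors the scalers refer to)
        must be given whenever some scaler is :obj:`None`; the resulting
        pattern is the outer pattern of all the scalers' patterns.
        """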
# if all scalers are None, return None
if all([scaler is None for scaler in scalers]):
return None
        # if there is at least one scaler and at least one None, `sizes` must
        # be a list with the shapes of the tensors each scaler refers to
if None in scalers:
assert sizes is not None
# scale
scale = cls.cat_tensors(scalers, sizes, 'scale', dim, 1)
# bias
bias = cls.cat_tensors(scalers, sizes, 'bias', dim, 0)
# pattern
pattern = outer_pattern([
scaler.pattern for scaler in scalers
if scaler is not None and scaler.pattern is not None
])
return cls(bias=bias, scale=scale, pattern=pattern)