Source code for tsl.ops.framearray

from typing import Any, Callable, Optional, Union

import numpy as np
import pandas as pd
import torch

import tsl
from tsl.typing import FillOptions, FrameArray, Index, Scalar


def framearray_to_numpy(x: FrameArray) -> np.ndarray:
    if isinstance(x, pd.DataFrame):
        if x.columns.nlevels == 1:
            return x.to_numpy()
        # align the columns to the full cartesian product of the column
        # levels, then reshape to (time, *levshape), e.g., (time, nodes,
        # channels) for a two-level column index
        cols = [x.columns.unique(i) for i in range(x.columns.nlevels)]
        cols = pd.MultiIndex.from_product(cols)
        if not x.columns.equals(cols):
            x = x.reindex(columns=cols)
        return x.values.reshape((-1, *cols.levshape))
    return np.asarray(x)
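

# Usage sketch (added for illustration, not part of the original module): a
# DataFrame with two column levels (here nodes x features, with made-up
# labels) is reshaped into a 3-D array of shape (time, nodes, features).
def _example_framearray_to_numpy():
    cols = pd.MultiIndex.from_product([['n0', 'n1'], ['f0', 'f1', 'f2']])
    df = pd.DataFrame(np.arange(12.).reshape(2, 6), columns=cols)
    out = framearray_to_numpy(df)
    # one slice per time step, each of shape (n_nodes, n_features)
    assert out.shape == (2, 2, 3)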


def framearray_to_tensor(x: FrameArray) -> torch.Tensor:
    x_numpy = framearray_to_numpy(x)
    return torch.Tensor(x_numpy)


def framearray_to_dataframe(x: FrameArray, index=None, columns=None) \
        -> pd.DataFrame:
    if isinstance(x, pd.DataFrame):
        return x
    x = np.asarray(x)
    # flatten trailing dimensions into columns; for inputs with more than two
    # dimensions, columns become a MultiIndex over the trailing axes
    h, *w = x.shape
    x = x.reshape((h, -1))
    if columns is None and len(w) > 1:
        columns = pd.MultiIndex.from_product([range(size) for size in w])
    x = pd.DataFrame(x, index, columns)
    return x
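

# Usage sketch (illustrative only): the reverse mapping, from a 3-D array of
# shape (time, nodes, features) to a DataFrame whose columns are a MultiIndex
# over the trailing dimensions. Sizes below are arbitrary.
def _example_framearray_to_dataframe():
    x = np.random.rand(4, 2, 3)
    df = framearray_to_dataframe(x)
    assert df.shape == (4, 6)
    assert df.columns.nlevels == 2  # (node, feature) integer levels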


def framearray_shape(x: FrameArray) -> tuple:
    if not isinstance(x, pd.DataFrame):
        return np.asarray(x).shape
    elif x.columns.nlevels > 1:
        return (len(x), ) + x.columns.levshape
    return x.shape
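

# Usage sketch (illustrative only): `framearray_shape` returns the logical
# N-dimensional shape for both array and (MultiIndexed) DataFrame inputs.
def _example_framearray_shape():
    x = np.random.rand(4, 2, 3)
    df = framearray_to_dataframe(x)
    assert framearray_shape(x) == (4, 2, 3)
    assert framearray_shape(df) == (4, 2, 3)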


def aggregate(x: FrameArray,
              index: Index,
              aggr_fn: Callable = np.sum,
              axis: int = 1,
              level: int = 0) -> FrameArray:
    """Aggregate rows/columns in a (MultiIndexed) DataFrame according to a
    new index.

    Args:
        x (FrameArray): The :class:`~pandas.DataFrame` (or array) to be
            aggregated.
        index (Index): A sequence of :obj:`cluster_id` with length equal to
            the index over which aggregation is performed. The :obj:`i`-th
            element of the index along :obj:`axis` (and :obj:`level`) is
            mapped to the :obj:`index[i]`-th position in the new index.
        aggr_fn (Callable): Function to be used for aggregation.
            (default :obj:`np.sum`)
        axis (int): Axis over which aggregation is performed, :obj:`0` for
            index, :obj:`1` for columns. (default :obj:`1`)
        level (int): Level over which aggregation is performed, if the index
            along :obj:`axis` is a :class:`~pandas.MultiIndex`.
            (default :obj:`0`)
    """
    to_numpy = False
    if not isinstance(x, pd.DataFrame):
        x = framearray_to_dataframe(x)
        if axis > 1:
            axis, level = 1, axis - 1
        to_numpy = True
    if axis == 0:
        x = x.groupby(index, axis=0).aggregate(aggr_fn)
    elif axis == 1:
        cols = [x.columns.unique(i).values for i in range(x.columns.nlevels)]
        cols[level] = index
        grouper = pd.MultiIndex.from_product(cols, names=x.columns.names)
        x = x.groupby(grouper, axis=1).aggregate(aggr_fn)
        x.columns = pd.MultiIndex.from_tuples(x.columns, names=grouper.names)
    if to_numpy:
        x = framearray_to_numpy(x)
    return x
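

# Usage sketch (illustrative, with made-up node/feature labels and cluster
# assignments): sum the node level (level 0) of the columns, mapping the
# three nodes into two clusters.
def _example_aggregate():
    cols = pd.MultiIndex.from_product([['n0', 'n1', 'n2'], ['f0', 'f1']])
    df = pd.DataFrame(np.ones((4, 6)), columns=cols)
    # n0, n1 -> cluster 0; n2 -> cluster 1
    out = aggregate(df, index=[0, 0, 1], aggr_fn=np.sum, axis=1, level=0)
    assert out.shape == (4, 4)  # 2 clusters x 2 features

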
def reduce(x: FrameArray,
           index: Index,
           axis: int = 0,
           level: int = 0) -> FrameArray:
    if index is None:
        return x
    elif not isinstance(index, (pd.Index, slice)):
        index: np.ndarray = np.asarray(index)
    if isinstance(x, pd.DataFrame):
        if axis == 0:
            return x.loc[index]
        n_levels = x.columns.nlevels
        if n_levels > 1:
            if index.dtype == bool:
                index = x.columns.unique(level)[index]
            index = tuple([
                index if i == level else slice(None) for i in range(n_levels)
            ])
        return x.loc[:, index]
    else:
        axis = axis + level
        index = tuple(
            [index if i == axis else slice(None) for i in range(x.ndim)])
        return x[index]


def fill_nan(x: FrameArray,
             value: Optional[Union[Scalar, FrameArray]] = None,
             method: FillOptions = None,
             axis: int = 0) -> FrameArray:
    assert axis in [0, 1]
    to_numpy = False
    if not isinstance(x, pd.DataFrame):
        x = framearray_to_dataframe(x)
        to_numpy = True
    if method == 'mean':
        x = x.fillna(value=x.mean(axis=axis), axis=axis, inplace=False)
    elif method == 'linear':
        x = x.interpolate("linear", axis=axis, inplace=False)
    else:
        x = x.fillna(value=value, method=method, axis=axis, inplace=False)
    if to_numpy:
        x = framearray_to_numpy(x)
    return x
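

# Usage sketch (illustrative, toy labels): `reduce` keeps a subset of nodes
# along the columns, `fill_nan` imputes missing values, here by linear
# interpolation along the time axis.
def _example_reduce_and_fill_nan():
    cols = pd.MultiIndex.from_product([['n0', 'n1', 'n2'], ['f0', 'f1']])
    df = pd.DataFrame(np.arange(24, dtype=float).reshape(4, 6), columns=cols)
    df.iloc[1, 0] = np.nan  # inject a missing value
    sub = reduce(df, index=['n0', 'n2'], axis=1, level=0)  # keep 2 of 3 nodes
    assert sub.shape == (4, 4)
    filled = fill_nan(sub, method='linear', axis=0)
    assert not filled.isna().values.any()

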
def temporal_mean(x: FrameArray, index: pd.DatetimeIndex = None) \
        -> FrameArray:
    """Compute the mean values for each row.

    The mean is first computed for each hour over the corresponding week of
    the year. Remaining :obj:`NaN` values are imputed using the hourly mean
    over the same month, first within the same year and then across years;
    any that are still missing are replaced with the mean for the same hour
    alone. Finally, remaining missing values are filled with :obj:`ffill`
    and :obj:`bfill`.

    Args:
        x (np.ndarray | pd.DataFrame): Array-like with missing values.
        index (pd.DatetimeIndex, optional): Temporal index if :obj:`x` is not
            a :class:`~pandas.DataFrame` with a temporal index. Must have the
            same length as :obj:`x`.
            (default :obj:`None`)
    """
    if index is not None:
        if not isinstance(index, pd.DatetimeIndex):
            # try casting
            index = pd.to_datetime(index)
        assert len(index) == len(x)
        if isinstance(x, pd.DataFrame):
            # override index of x
            df_mean = x.copy().set_index(index)
        else:
            # try casting to np.ndarray
            x = np.asarray(x)
            shape = x.shape
            # x can be N-dimensional, flatten all but the first dimension
            x = x.reshape((shape[0], -1))
            df_mean = pd.DataFrame(x, index=index)
    elif isinstance(x, pd.DataFrame):
        df_mean = x.copy()
    else:
        raise TypeError("`x` must be a pd.DataFrame or a np.ndarray.")
    cond0 = [
        df_mean.index.year,
        df_mean.index.isocalendar().week, df_mean.index.hour
    ]
    cond1 = [df_mean.index.year, df_mean.index.month, df_mean.index.hour]
    conditions = [cond0, cond1, cond1[1:], cond1[2:]]
    while df_mean.isna().values.sum() and len(conditions):
        nan_mean = df_mean.groupby(conditions[0]).transform(np.nanmean)
        df_mean = df_mean.fillna(nan_mean)
        conditions = conditions[1:]
    if df_mean.isna().values.sum():
        df_mean = df_mean.fillna(method='ffill')
        df_mean = df_mean.fillna(method='bfill')
    if isinstance(x, np.ndarray):
        df_mean = df_mean.values.reshape(shape)
    return df_mean
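

# Usage sketch (illustrative): impute missing values in a toy hourly series
# with the time-conditioned historical means computed by `temporal_mean`.
def _example_temporal_mean():
    index = pd.date_range('2020-01-01', periods=24 * 14, freq='H')  # 2 weeks
    x = np.sin(np.arange(len(index)) * 2 * np.pi / 24)[:, None]
    x[5:8] = np.nan
    means = temporal_mean(x, index=index)
    assert means.shape == x.shape and not np.isnan(means).any()

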
def get_trend(df, period='week', train_len=None, valid_mask=None):
    """Perform detrending on a time series by subtracting from each value of
    the input dataframe the average value computed over the training dataset
    for each hour/weekday.

    Args:
        df: dataframe.
        period: period of the trend ('day', 'week', or 'month').
        train_len: number of initial steps used to compute the trend; values
            beyond this length are ignored.
        valid_mask: boolean mask of valid values; invalid entries are
            excluded from the trend computation.

    Returns:
        tuple: the detrended dataset and the trend values
    """
    df = df.copy()
    if train_len is not None:
        df[train_len:] = np.nan
    if valid_mask is not None:
        df[~valid_mask] = np.nan
    idx = [df.index.hour, df.index.minute]
    if period == 'week':
        idx = [df.index.weekday] + idx
    elif period == 'month':
        idx = [df.index.month, df.index.weekday] + idx
    elif period != 'day':
        raise NotImplementedError("Period must be in ('day', 'week', 'month')")
    means = df.groupby(idx).transform(np.nanmean)
    return df - means, means
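

# Usage sketch (illustrative): remove a weekly seasonal component, estimating
# the (weekday, hour, minute) averages on the first week only via `train_len`.
def _example_get_trend():
    index = pd.date_range('2020-01-01', periods=24 * 14, freq='H')
    df = pd.DataFrame({'value': np.arange(24. * 14)}, index=index)
    detrended, trend = get_trend(df, period='week', train_len=24 * 7)
    assert detrended.shape == trend.shape == df.shape

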
def normalize(x: FrameArray, by: Any = None, axis: int = 0, level: int = 0):
    r"""Normalize input :class:`~numpy.ndarray` or :class:`~pandas.DataFrame`
    using mean and standard deviation. If :obj:`x` is a
    :class:`~pandas.DataFrame`, normalization can be done on a specific group.

    Args:
        x (FrameArray): the FrameArray to be normalized.
        by: the conditions used to determine the groups for the
            :meth:`~pandas.DataFrame.groupby`.
            (default :obj:`None`)
        axis (int): axis for the function to be applied on.
            (default :obj:`0`)
        level (int): level of axis for the function to be applied on
            (for MultiIndexed DataFrames).
            (default :obj:`0`)

    Returns:
        FrameArray: the normalized FrameArray
    """
    if isinstance(x, pd.DataFrame):
        if by is not None:
            groups = x.groupby(by)
            mean = groups.transform(np.nanmean)
            std = groups.transform(np.nanstd)
            x = x[mean.columns]
        else:
            mean = x.mean(axis=axis, level=level, skipna=True)
            std = x.std(axis=axis, level=level, skipna=True)
    else:
        x = np.asarray(x)
        mean = x.mean(axis=axis, keepdims=True)
        std = x.std(axis=axis, keepdims=True)
    return (x - mean) / (std + tsl.epsilon)
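

# Usage sketch (illustrative): z-score normalization of a plain array along
# the time axis; mean ~0 and std ~1 afterwards (up to `tsl.epsilon`).
def _example_normalize():
    x = np.random.rand(100, 3) * 10 + 5
    z = normalize(x, axis=0)
    assert np.allclose(z.mean(axis=0), 0., atol=1e-6)
    assert np.allclose(z.std(axis=0), 1., atol=1e-3)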