from copy import deepcopy
import numpy as np
import pandas as pd
from tsl import logger
from tsl.datasets.prototypes import TabularDataset
from tsl.datasets.prototypes.mixin import MissingValuesMixin
from tsl.utils.python_utils import ensure_list
def sample_mask(shape,
                p: float = 0.002,
                p_noise: float = 0.,
                max_seq: int = 1,
                min_seq: int = 1,
                rng: np.random.Generator = None,
                verbose: bool = True):
    """Sample a random binary fault mask of the given shape.

    Failures are drawn i.i.d. with probability ``p`` and then extended
    along the first (time) axis into sequences of length in
    ``[min_seq, max_seq)``; independent point noise with probability
    ``p_noise`` is superimposed at the end.

    Args:
        shape: Shape of the mask to generate.
        p: Probability of a fault starting at any position.
        p_noise: Probability of an additional isolated (noise) fault.
        max_seq: Exclusive upper bound on the fault-sequence length.
        min_seq: Minimum fault-sequence length.
        rng: Optional :class:`numpy.random.Generator`; falls back to the
            legacy global NumPy RNG when ``None``.
        verbose: Whether to log the base fault probability.

    Returns:
        np.ndarray: The sampled mask as ``uint8`` (1 = fault).
    """
    # Select the sampling primitives from the provided Generator or from
    # the module-level legacy RNG.
    if rng is not None:
        uniform, draw_int = rng.random, rng.integers
    else:
        uniform, draw_int = np.random.random, np.random.randint
    if verbose:
        logger.info(f'Generating mask with base p={p}')
    # Base mask: i.i.d. Bernoulli(p) fault starts.
    mask = uniform(shape) < p
    # Extend each fault start along the first axis into a sequence.
    for node in range(mask.shape[1]):
        starts = np.flatnonzero(mask[:, node])
        if len(starts) == 0:
            continue
        seq_len = min_seq
        if max_seq > min_seq:
            # One shared random length per column, in [min_seq, max_seq).
            seq_len += int(draw_int(max_seq - min_seq))
        extended = np.concatenate(
            [np.arange(s, s + seq_len) for s in starts])
        # Deduplicate overlapping sequences and keep indices in range.
        extended = np.clip(np.unique(extended), 0, shape[0] - 1)
        mask[extended, node] = True
    # Superimpose i.i.d. Bernoulli(p_noise) point faults.
    return (mask | (uniform(mask.shape) < p_noise)).astype('uint8')
def missing_val_lens(mask):
    """Return the lengths of all contiguous missing-value runs in ``mask``.

    A value is considered missing where ``mask`` is 0 (falsy). Runs are
    computed independently per column along the first axis and collected
    into a single flat list.

    Args:
        mask: 2-D array-like where nonzero entries mark valid values.

    Returns:
        list: Lengths of the missing-value runs, column by column.
    """
    n_cols = mask.shape[1]
    pad = np.zeros((1, n_cols))
    # Zero-pad top and bottom so every missing run has both a rising and
    # a falling edge in the diff below.
    indicator = np.concatenate(
        [pad, (~mask.astype('bool')).astype('int'), pad])
    edges = np.diff(indicator, axis=0)
    run_lengths = []
    for col in range(indicator.shape[1]):
        transitions, = edges[:, col].nonzero()
        # Transitions alternate run-start, run-end; the diffs at even
        # offsets are therefore the run lengths.
        run_lengths.extend(list(np.diff(transitions)[::2]))
    return run_lengths
def to_missing_values_dataset(dataset: TabularDataset,
                              eval_mask: np.ndarray,
                              inplace: bool = True):
    """Promote ``dataset`` to a dataset with missing values.

    The instance's class is swapped for a dynamically-created subclass that
    also inherits :class:`MissingValuesMixin`, so the object gains the
    mixin's API (e.g. ``set_eval_mask``) without modifying its original
    class.

    Args:
        dataset (TabularDataset): The dataset to promote.
        eval_mask (np.ndarray): The evaluation mask to attach.
        inplace (bool): If ``False``, work on a deep copy of ``dataset``.

    Returns:
        TabularDataset: The (possibly copied) dataset, now also an
            instance of :class:`MissingValuesMixin`.
    """
    assert isinstance(dataset, TabularDataset)
    if not inplace:
        dataset = deepcopy(dataset)
    # Dynamically inherit from MissingValuesMixin
    # (fixed redundant tuple([...]) wrapped again in tuple(...)).
    bases = (dataset.__class__, MissingValuesMixin)
    cls_name = "MissingValues%s" % dataset.__class__.__name__
    dataset.__class__ = type(cls_name, bases, {})
    # Change dataset name
    dataset.name = "MissingValues%s" % dataset.name
    dataset.set_eval_mask(eval_mask)
    return dataset
def add_missing_values(dataset: TabularDataset,
                       p_noise=0.05,
                       p_fault=0.01,
                       min_seq=1,
                       max_seq=10,
                       seed=None,
                       inplace=True):
    """Inject synthetic missing values into ``dataset``.

    Samples an evaluation mask over the full (time, node, channel) grid
    with :func:`sample_mask` and promotes the dataset with
    :func:`to_missing_values_dataset`. The parameters used for mask
    generation are stored on the returned dataset for reproducibility.

    Args:
        dataset (TabularDataset): The dataset to corrupt.
        p_noise: Probability of isolated (noise) missing values.
        p_fault: Probability of a missing-sequence starting.
        min_seq: Minimum length of a missing sequence.
        max_seq: Exclusive upper bound on the missing-sequence length.
        seed: Seed for the mask RNG; drawn at random when ``None``.
        inplace: Whether to modify ``dataset`` in place.

    Returns:
        The dataset with the sampled evaluation mask attached.
    """
    # Draw a seed when none is given, so the mask is still reproducible
    # from the stored `dataset.seed`.
    if seed is None:
        seed = np.random.randint(1e9)
    generator = np.random.default_rng(seed)
    # Sample the evaluation mask over the full data grid.
    mask_shape = (dataset.length, dataset.n_nodes, dataset.n_channels)
    eval_mask = sample_mask(mask_shape,
                            p=p_fault,
                            p_noise=p_noise,
                            min_seq=min_seq,
                            max_seq=max_seq,
                            rng=generator)
    # Promote to a missing-values dataset and attach the mask.
    dataset = to_missing_values_dataset(dataset, eval_mask, inplace)
    # Record the generation parameters on the dataset.
    dataset.p_fault = p_fault
    dataset.p_noise = p_noise
    dataset.min_seq = min_seq
    dataset.max_seq = max_seq
    dataset.seed = seed
    dataset.random = generator
    return dataset
def prediction_dataframe(y, index, columns=None, aggregate_by='mean'):
    """Aggregate batched predictions in a single DataFrame.

    Args:
        y (list or np.ndarray): The list of predictions.
        index (list or np.ndarray): The list of time indexes coupled with
            the predictions.
        columns (list or pd.Index): The columns of the returned DataFrame.
        aggregate_by (str or list): How to aggregate the predictions in case
            there are more than one for a step.

            - `mean`: take the mean of the predictions;
            - `central`: take the prediction at the central position, assuming
              that the predictions are ordered chronologically;
            - `smooth_central`: average the predictions weighted by a gaussian
              signal with std=1;
            - `last`: take the first prediction in concatenation order.

    Returns:
        pd.DataFrame or list: The DataFrame with the aggregated predictions
            (a list of DataFrames if ``aggregate_by`` is a list).
    """
    # One DataFrame per batch; each prediction is flattened to its first
    # two axes (steps x features).
    dfs = [
        pd.DataFrame(data=data.reshape(data.shape[:2]),
                     index=idx,
                     columns=columns) for data, idx in zip(y, index)
    ]
    df = pd.concat(dfs)
    preds_by_step = df.groupby(df.index)
    # Normalize aggregate_by to a list of method names.
    if isinstance(aggregate_by, str):
        aggr_methods = [aggregate_by]
    else:
        aggr_methods = list(aggregate_by)
    dfs = []
    for aggr_by in aggr_methods:
        if aggr_by == 'mean':
            dfs.append(preds_by_step.mean())
        elif aggr_by == 'central':
            # Use positional indexing (.iloc): inside the aggregate lambda
            # the Series is labeled by timestamps, so x[pos] would be a
            # label lookup and fail on integer time indexes.
            dfs.append(preds_by_step.aggregate(lambda x: x.iloc[len(x) // 2]))
        elif aggr_by == 'smooth_central':
            # scipy.signal.gaussian was removed in SciPy >= 1.13; the
            # window lives in scipy.signal.windows.
            from scipy.signal.windows import gaussian
            dfs.append(
                preds_by_step.aggregate(
                    lambda x: np.average(x, weights=gaussian(len(x), 1))))
        elif aggr_by == 'last':
            # first imputation has missing value in last position
            dfs.append(preds_by_step.aggregate(lambda x: x.iloc[0]))
        else:
            raise ValueError("aggregate_by can only be one of "
                             "['mean', 'central', 'smooth_central', 'last']")
    if isinstance(aggregate_by, str):
        return dfs[0]
    return dfs