import os
from typing import List, Optional, Sequence
import numpy as np
import pandas as pd
from tsl.data.datamodule.splitters import Splitter, disjoint_months
from tsl.data.synch_mode import HORIZON
from tsl.datasets.prototypes import DatetimeDataset
from tsl.datasets.prototypes.mixin import MissingValuesMixin
from tsl.utils import download_url, extract_zip
def infer_mask(df, infer_from='next'):
    """Infer evaluation mask from DataFrame. In the evaluation mask a value is
    1 if it is present in the DataFrame and absent in the :obj:`infer_from`
    month.

    Args:
        df (pd.DataFrame): The DataFrame.
        infer_from (str): Denotes from which month the evaluation value must be
            inferred. Can be either :obj:`previous` or :obj:`next`.

    Returns:
        pd.DataFrame: The evaluation mask for the DataFrame.

    Raises:
        ValueError: If :obj:`infer_from` is neither ``'previous'`` nor
            ``'next'``.
    """
    mask = (~df.isna()).astype('uint8')  # 1 where a reading is present
    eval_mask = pd.DataFrame(index=mask.index, columns=mask.columns,
                             data=0).astype('uint8')
    if infer_from == 'previous':
        offset = -1
    elif infer_from == 'next':
        offset = 1
    else:
        raise ValueError('`infer_from` can only be one of {}'.format(
            ['previous', 'next']))
    months = sorted(set(zip(mask.index.year, mask.index.month)))
    length = len(months)
    for i in range(length):
        # month j is the source month from which values are inferred;
        # the modulo makes the first/last month wrap around
        j = (i + offset) % length
        year_i, month_i = months[i]
        year_j, month_j = months[j]
        # select the rows belonging to month j
        cond_j = (mask.index.year == year_j) & (mask.index.month == month_j)
        mask_j = mask[cond_j]
        # shift month j's timestamps onto month i
        offset_i = 12 * (year_i - year_j) + (month_i - month_j)
        mask_i = mask_j.shift(1, pd.DateOffset(months=offset_i))
        # shifting by months clips day-of-month overflow to the last day of
        # the target month, possibly creating duplicate timestamps: keep the
        # first occurrence only
        mask_i = mask_i[~mask_i.index.duplicated(keep='first')]
        # drop shifted timestamps that do not exist in the original index
        # (Index.isin replaces the deprecated np.in1d)
        mask_i = mask_i[mask_i.index.isin(mask.index)]
        i_idx = mask_i.index
        # 1 iff absent in (shifted) month j but present in month i; with
        # uint8 bitwise ops: (~1 & 1) == 0 and (~0 & 1) == 1
        eval_mask.loc[i_idx] = ~mask_i.loc[i_idx] & mask.loc[i_idx]
    return eval_mask
class AirQualitySplitter(Splitter):
    """Splitter for the air quality datasets: test samples fall inside
    :obj:`test_months`, and validation samples are taken from the time steps
    immediately preceding each testing month.

    Args:
        val_len (int or float, optional): Number of validation samples
            (if ``>= 1``) or fraction of the non-test samples (if ``< 1``).
            Must be provided before calling :obj:`fit`.
        test_months (Sequence): Months (1-12) whose samples are reserved for
            testing. (default: ``(3, 6, 9, 12)``)
    """

    def __init__(self,
                 val_len: int = None,
                 test_months: Sequence = (3, 6, 9, 12)):
        super(AirQualitySplitter, self).__init__()
        self._val_len = val_len
        self.test_months = test_months

    def fit(self, dataset):
        """Compute and store train/validation/test indices for ``dataset``."""
        nontest_idxs, test_idxs = disjoint_months(dataset,
                                                  months=self.test_months,
                                                  synch_mode=HORIZON)
        # take equal number of samples before each month of testing
        val_len = self._val_len
        if val_len is None:
            # fail early with a clear message instead of the opaque TypeError
            # that the `val_len < 1` comparison would raise on None
            raise ValueError('`val_len` must be an int (number of samples) or '
                             'a float in (0, 1) (fraction of samples).')
        if val_len < 1:
            # interpret as a fraction of the non-test samples
            val_len = int(val_len * len(nontest_idxs))
        val_len = val_len // len(self.test_months)
        # get indices of first day of each testing month: a jump larger than
        # the minimum step in test_idxs marks a month boundary
        delta = np.diff(test_idxs)
        delta_idxs = np.flatnonzero(delta > delta.min())
        end_month_idxs = test_idxs[1:][delta_idxs]
        if len(end_month_idxs) < len(self.test_months):
            # the first testing month has no gap before it: prepend its start
            end_month_idxs = np.insert(end_month_idxs, 0, test_idxs[0])
        # expand month indices to the `val_len` samples preceding each month
        month_val_idxs = [
            np.arange(v_idx - val_len, v_idx) - dataset.window
            for v_idx in end_month_idxs
        ]
        val_idxs = np.concatenate(month_val_idxs) % len(dataset)
        # remove indices overlapping with validation from the training set
        ovl_idxs, _ = dataset.overlapping_indices(nontest_idxs,
                                                  val_idxs,
                                                  synch_mode=HORIZON,
                                                  as_mask=True)
        train_idxs = nontest_idxs[~ovl_idxs]
        self.set_indices(train_idxs, val_idxs, test_idxs)
class AirQuality(DatetimeDataset, MissingValuesMixin):
    r"""Measurements of pollutant :math:`PM2.5` collected by 437 air quality
    monitoring stations spread across 43 Chinese cities from May 2014 to April
    2015.

    The dataset contains also a smaller version :obj:`AirQuality(small=True)`
    with only the subset of nodes containing the 36 sensors in Beijing.

    Data collected inside the `Urban Air
    <https://www.microsoft.com/en-us/research/project/urban-air/>`_ project.

    Dataset size:
        + Time steps: 8760
        + Nodes: 437
        + Channels: 1
        + Sampling rate: 1 hour
        + Missing values: 25.67%

    Static attributes:
        + :obj:`dist`: :math:`N \times N` matrix of node pairwise distances.

    Args:
        root (str, optional): Root folder where data are stored.
        impute_nans (bool): Whether to replace NaNs in the target with the
            temporal mean. (default: :obj:`True`)
        small (bool): Whether to load the 36-sensor Beijing subset instead of
            the full 437-sensor dataset. (default: :obj:`False`)
        test_months (Sequence): Months (1-12) reserved for testing with the
            :obj:`'air_quality'` splitting method.
            (default: ``(3, 6, 9, 12)``)
        infer_eval_from (str): From which month (:obj:`'next'` or
            :obj:`'previous'`) the evaluation mask is inferred; used only for
            the full dataset, the small one ships a precomputed mask.
            (default: :obj:`'next'`)
        freq (str, optional): Resampling frequency passed to the parent
            dataset.
        masked_sensors (Sequence, optional): Sensors whose valid readings are
            all marked as evaluation targets.
    """

    url = "https://drive.switch.ch/index.php/s/W0fRqotjHxIndPj/download"

    similarity_options = {'distance'}

    def __init__(self,
                 root: str = None,
                 impute_nans: bool = True,
                 small: bool = False,
                 test_months: Sequence = (3, 6, 9, 12),
                 infer_eval_from: str = 'next',
                 freq: Optional[str] = None,
                 masked_sensors: Optional[Sequence] = None):
        self.root = root
        self.small = small
        self.test_months = test_months
        self.infer_eval_from = infer_eval_from  # [next, previous]
        if masked_sensors is None:
            self.masked_sensors = []
        else:
            self.masked_sensors = list(masked_sensors)
        df, mask, eval_mask, dist = self.load(impute_nans=impute_nans)
        super().__init__(target=df,
                         mask=mask,
                         freq=freq,
                         similarity_score='distance',
                         temporal_aggregation='mean',
                         spatial_aggregation='mean',
                         default_splitting_method='air_quality',
                         name='AQI36' if self.small else 'AQI')
        self.add_covariate('dist', dist, pattern='n n')
        self.set_eval_mask(eval_mask)

    @property
    def raw_file_names(self) -> List[str]:
        return ['full437.h5', 'small36.h5']

    @property
    def required_file_names(self) -> List[str]:
        # the distance matrix is built locally, on top of the raw files
        return self.raw_file_names + ['aqi_dist.npy']

    def download(self):
        """Download the raw data archive and extract it in :obj:`root_dir`."""
        path = download_url(self.url, self.root_dir, 'data.zip')
        extract_zip(path, self.root_dir)
        os.unlink(path)

    def build(self):
        """Build the pairwise distance matrix from stations' coordinates."""
        self.maybe_download()
        # compute distances from latitude and longitude degrees
        path = os.path.join(self.root_dir, 'full437.h5')
        stations = pd.DataFrame(pd.read_hdf(path, 'stations'))
        st_coord = stations.loc[:, ['latitude', 'longitude']]
        from tsl.ops.similarities import geographical_distance
        dist = geographical_distance(st_coord, to_rad=True).values
        np.save(os.path.join(self.root_dir, 'aqi_dist.npy'), dist)

    def load_raw(self):
        """Load readings, distances and (for the small dataset) the
        precomputed evaluation mask from disk."""
        self.maybe_build()
        dist = np.load(os.path.join(self.root_dir, 'aqi_dist.npy'))
        if self.small:
            path = os.path.join(self.root_dir, 'small36.h5')
            eval_mask = pd.read_hdf(path, 'eval_mask')
            # the 36 Beijing sensors are the first 36 rows/cols of dist
            dist = dist[:36, :36]
        else:
            path = os.path.join(self.root_dir, 'full437.h5')
            eval_mask = None  # inferred later in load()
        df = pd.read_hdf(path, 'pm25')
        return pd.DataFrame(df), dist, eval_mask

    def load(self, impute_nans=True):
        """Return target DataFrame, validity mask, evaluation mask and
        distance matrix."""
        # load readings and stations metadata
        df, dist, eval_mask = self.load_raw()
        # compute the masks:
        mask = (~np.isnan(df.values)).astype('uint8')  # 1 if value is valid
        if eval_mask is None:
            eval_mask = infer_mask(df, infer_from=self.infer_eval_from)
        # 1 if value is ground-truth for imputation
        eval_mask = eval_mask.values.astype('uint8')
        if len(self.masked_sensors):
            # use every valid reading of the masked sensors for evaluation
            eval_mask[:, self.masked_sensors] = mask[:, self.masked_sensors]
        # eventually replace nans with weekly mean by hour
        if impute_nans:
            from tsl.ops.framearray import temporal_mean
            df = df.fillna(temporal_mean(df))
        return df, mask, eval_mask, dist

    def get_splitter(self, method: Optional[str] = None, **kwargs):
        # NOTE: returns None for any method other than 'air_quality',
        # falling back to the parent's default handling
        if method == 'air_quality':
            val_len = kwargs.get('val_len')
            return AirQualitySplitter(test_months=self.test_months,
                                      val_len=val_len)

    def compute_similarity(self, method: str, **kwargs):
        # NOTE: returns None for unsupported methods
        if method == "distance":
            from tsl.ops.similarities import gaussian_kernel
            # use same theta for both air and air36
            theta = np.std(self.dist[:36, :36])
            return gaussian_kernel(self.dist, theta=theta)