import math
from typing import Dict, List, Literal, Optional, Union
import numpy as np
import pandas as pd
from tsl.data import Splitter
from tsl.data.datamodule.splitters import indices_between
from tsl.datasets.prototypes import DatetimeDataset
from tsl.utils import download_url, ensure_list
class EngRadSplitter(Splitter):
def __init__(self,
val_len: int = None,
val_seq_len: int = 7,
first_val_step=(2019, 1, 1),
first_test_step=(2020, 1, 1)):
super(EngRadSplitter, self).__init__()
self._val_len = val_len
self.val_seq_len = val_seq_len
self.first_val_step = first_val_step
self.first_test_step = first_test_step
def fit(self, dataset):
# Get test indices
test_idxs = indices_between(dataset, first_ts=self.first_test_step)
# Get validation indices
if self.first_val_step is not None:
val_idxs = indices_between(dataset,
first_ts=self.first_val_step,
last_ts=self.first_test_step)
else:
val_idxs = np.setdiff1d(np.arange(len(dataset)), test_idxs)
# Remove validation indices overlapping with test indices
ovl_idxs, _ = dataset.overlapping_indices(val_idxs,
test_idxs,
synch_mode='window',
as_mask=True)
val_idxs = val_idxs[~ovl_idxs]
ovl_idxs, _ = dataset.overlapping_indices(val_idxs,
test_idxs,
synch_mode='horizon',
as_mask=True)
val_idxs = val_idxs[~ovl_idxs]
# Sparsify validation set according to val_len
val_len = self._val_len
if val_len < 1:
val_len = int(val_len * len(val_idxs))
# Take sparse sequences of self.val_seq_len
num_seq = math.ceil(val_len / self.val_seq_len)
seq_len = len(val_idxs) // num_seq
val_seq_start = seq_len - self.val_seq_len
seq_start_idx = val_idxs[val_seq_start::seq_len]
val_seq_idxs = np.ravel(seq_start_idx[:, None] +
np.arange(self.val_seq_len))
# Remove possibly out-of-bounds indices
val_idxs = np.intersect1d(val_seq_idxs, val_idxs)
# Use all other indices for training
train_idxs = np.arange(val_idxs[-1])
ovl_idxs, _ = dataset.overlapping_indices(train_idxs,
val_idxs,
synch_mode='window',
as_mask=True)
train_idxs = train_idxs[~ovl_idxs]
ovl_idxs, _ = dataset.overlapping_indices(train_idxs,
val_idxs,
synch_mode='horizon',
as_mask=True)
train_idxs = train_idxs[~ovl_idxs]
self.set_indices(train_idxs, val_idxs, test_idxs)
[docs]class EngRad(DatetimeDataset):
r"""The EngRAD dataset from the paper `"Graph-based Forecasting with
Missing Data through Spatiotemporal Downsampling"
<https://arxiv.org/abs/2402.10634>`_ (Marisca et al., ICML 2024).
The dataset consists of weather measurements collected hourly in 722 cities
spread across England from 2018 to 2020. The dataset is available through
`Zenodo <https://zenodo.org/records/12760772>`_.
Data provider: https://open-meteo.com/
Dataset size:
+ Time steps: 26304
+ Nodes: 487
+ Channels: 5
+ Sampling rate: 1 hour
+ Missing values: 0.00%
Channels:
+ ``temperature_2m``: Air temperature at 2 meters above ground (°C).
Instant.
+ ``relative_humidity_2m``: Relative humidity at 2 meters above ground
(%). Instant.
+ ``precipitation``: Total precipitation (rain, showers, snow) sum of
the preceding hour (mm). Preceding hour sum.
+ ``cloud_cover``: Total cloud cover as an area fraction (%). Instant.
+ ``shortwave_radiation``: Global horizontal irradiation (GHI) (W/m²).
Preceding hour mean.
Static attributes:
+ :obj:`metadata`: information associated to the locations.
+ :obj:`distances`: :math:`N \times N` matrix of pairwise distances
between the locations.
"""
url = "https://zenodo.org/records/12760772/files/data.h5?download=1"
similarity_options = {'distance', 'grid'}
def __init__(self,
root: str = None,
target_channels: Optional[Union[str, List[str]]] = 'all',
covariate_channels: Optional[Union[str, List[str]]] = None,
mask_zero_radiance: bool = False,
precipitation_unit: Literal["mm", "cm"] = "mm",
freq: Optional[str] = None):
self.root = root
self.mask_zero_radiance = mask_zero_radiance
self.precipitation_unit = precipitation_unit
# Load data
df, metadata, dist, mask = self.load(self.mask_zero_radiance)
# Set covariates
covariates = dict(metadata=(metadata, 'n f'), distances=(dist, 'n n'))
# Optionally filter channels
target = df
if target_channels is not None and target_channels != 'all':
target_channels = ensure_list(target_channels)
nodes = metadata.index
columns = pd.MultiIndex.from_product([nodes, target_channels])
target = df.loc[:, columns]
if mask is not None:
mask = mask.loc[:, columns]
# Optionally add covariates
if covariate_channels == 'all':
covariates['u'] = (df, 't n f')
elif covariate_channels is not None:
covariate_channels = ensure_list(covariate_channels)
nodes = metadata.index
columns = pd.MultiIndex.from_product([nodes, covariate_channels])
covariates['u'] = (df.loc[:, columns], 't n f')
super().__init__(target=target,
mask=mask,
covariates=covariates,
freq=freq,
similarity_score='distance',
temporal_aggregation='mean',
spatial_aggregation='mean',
name='EngRad')
@property
def raw_file_names(self) -> List[str]:
return ['data.h5']
@property
def required_file_names(self) -> Dict[str, str]:
return {'data': 'data.h5', 'distances': 'dist.npy'}
[docs] def download(self):
download_url(self.url, self.root_dir, 'data.h5')
[docs] def build(self):
self.maybe_download()
# compute distances from latitude and longitude degrees
path = self.required_files_paths['data']
metadata = pd.DataFrame(pd.read_hdf(path, 'metadata'))
coords = metadata.loc[:, ['lat', 'lon']]
from tsl.ops.similarities import geographical_distance
dist = geographical_distance(coords, to_rad=True).values
np.save(self.required_files_paths['distances'], dist)
[docs] def load_raw(self):
self.maybe_build()
df = pd.read_hdf(self.required_files_paths['data'], 'data')
metadata = pd.read_hdf(self.required_files_paths['data'], 'metadata')
dist = np.load(self.required_files_paths['distances'])
return pd.DataFrame(df), metadata, dist
[docs] def load(self, mask_zero_radiance: bool = False):
df, metadata, dist = self.load_raw()
if mask_zero_radiance:
mask = pd.DataFrame(True, index=df.index, columns=df.columns)
swr = df.loc[:, (slice(None), 'shortwave_radiation')] > 0
mask.loc[swr.index, swr.columns] = swr
else:
mask = None
if self.precipitation_unit == 'cm':
df.loc[:, (slice(None), 'precipitation')] /= 10
return df, metadata, dist, mask
[docs] def get_splitter(self, method: Optional[str] = None, **kwargs):
if method == 'engrad':
return EngRadSplitter(**kwargs)
[docs] def compute_similarity(self, method: str, **kwargs):
from tsl.ops.similarities import gaussian_kernel
if method == "distance":
theta = kwargs.get('theta', np.std(self.distances))
return gaussian_kernel(self.distances, theta=theta)
if method == "grid":
dist = self.distances.copy()
dist[dist > 16] = np.inf # keep only grid edges
theta = kwargs.get('theta', 20)
return gaussian_kernel(dist, theta=theta)