"""
=====================
Time series data sets
=====================

* `PhysioNet2012 <#torchtime.data.PhysioNet2012>`_
* `PhysioNet2019 <#torchtime.data.PhysioNet2019>`_
* `PhysioNet2019Binary <#torchtime.data.PhysioNet2019Binary>`_
* `UEA <#torchtime.data.UEA>`_
"""

import csv
import pathlib
from typing import Callable, Dict, List, Union

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from sktime.datasets import load_from_tsfile_to_dataframe
from torch import Tensor
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset
from tqdm import tqdm

from torchtime.constants import (
    EPS,
    OBJ_EXT,
    PHYSIONET_2012_DATASETS,
    PHYSIONET_2012_OUTCOMES,
    PHYSIONET_2012_VARS,
    PHYSIONET_2019_DATASETS,
    TQDM_FORMAT,
    UEA_DOWNLOAD_URL,
)
from torchtime.impute import forward_impute, replace_missing
from torchtime.utils import (
    _cache_data,
    _cache_exists,
    _download_archive,
    _download_to_directory,
    _get_file_list,
    _nanmode,
    _physionet_download,
    _simulate_missing,
    _validate_cache,
)


class _TimeSeriesDataset(Dataset):
    """**Generic time series PyTorch Dataset.**

    .. warning::
        Overload the ``_get_data()`` method to define a data set.

    The proportions of data in the training, validation and (optional) test data
    sets are specified by the ``train_prop`` and ``val_prop`` arguments. For a
    training/validation split, specify ``train_prop`` only. For a
    training/validation/test split, specify both ``train_prop`` and ``val_prop``.

    For example, ``train_prop=0.8`` generates an 80/20% train/validation split,
    while ``train_prop=0.8, val_prop=0.1`` generates an 80/10/10%
    train/validation/test split. Splits are formed using stratified sampling.

    When passed to a PyTorch DataLoader, each batch is a named dictionary with
    ``X``, ``y`` and ``length`` entries. The ``split`` argument determines
    whether training, validation or test data are returned.
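
    For example, a minimal sketch of batch access via a DataLoader (the
    :class:`UEA` subclass and the ``ArrowHead`` data set are used for
    illustration only):

    .. code-block:: python

        from torch.utils.data import DataLoader
        from torchtime.data import UEA

        data = UEA("ArrowHead", split="train", train_prop=0.7, seed=123)
        dataloader = DataLoader(data, batch_size=32)
        batch = next(iter(dataloader))
        batch["X"]       # time series, shape (32, s, c + 1)
        batch["y"]       # one-hot labels
        batch["length"]  # unpadded trajectory lengths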

    Missing data can be simulated by dropping data at random. Support is also provided
    to impute missing data. These options are controlled by the ``missing`` and
    ``impute`` arguments. See the `missing data tutorial
    <https://philipdarke.com/torchtime/tutorials/missing_data.html>`_ for more
    information.
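
    For example, a sketch of simulating then imputing missing data (the
    ``CharacterTrajectories`` data set and proportions are illustrative only):

    .. code-block:: python

        from torchtime.data import UEA

        # drop 40% of observations in every channel, then forward impute
        data = UEA("CharacterTrajectories", split="train", train_prop=0.7,
                   missing=0.4, impute="forward", seed=123)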

    .. warning::
        Mean imputation is unsuitable for categorical variables. To impute missing
        values for a categorical variable with the channel mode (rather than the channel
        mean), pass the channel indices to the ``categorical`` argument. Note this is
        also required for forward imputation to appropriately impute initial missing
        values.

        Alternatively, the calculated channel mean/mode can be overridden using the
        ``channel_means`` argument. This can be used to impute missing data with a fixed
        value.
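
    For example, a sketch of these arguments (the ``CharacterTrajectories`` data
    set and the channel choices are illustrative only):

    .. code-block:: python

        from torchtime.data import UEA

        data = UEA(
            "CharacterTrajectories",
            split="train",
            train_prop=0.7,
            impute="forward",
            categorical=[2],         # impute initial gaps in channel 2 with its mode
            channel_means={0: 0.0},  # impute channel 0 with a fixed value instead
            seed=123,
        )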

    Args:
        dataset: Name of the cache directory for the data set.
        split: The data split to return, either *train*, *val* (validation) or *test*.
        train_prop: Proportion of data in the training set.
        val_prop: Proportion of data in the validation set (optional, see above).
        missing: The proportion of data to drop at random. If ``missing`` is a single
            value, data are dropped from all channels. To drop data independently across
            each channel, pass a list of the proportion missing for each channel e.g.
            ``[0.5, 0.2, 0.8]``. Default 0 i.e. no missing data simulation.
        impute: Method used to impute missing data, either *none*, *zero*, *mean*,
            *forward* or a custom imputation function (default "none"). See warning
            above.
        categorical: List with channel indices of categorical variables. Only required
            if imputing data. Default ``[]`` i.e. no categorical variables.
        channel_means: Override the calculated channel mean/mode when imputing data.
            Only used if imputing data. Dictionary with channel indices and values e.g.
            ``{1: 4.5, 3: 7.2}`` (default ``{}`` i.e. no overridden channel mean/modes).
        time: Append time stamp in the first channel (default True).
        mask: Append missing data mask for each channel (default False).
        delta: Append time since previous observation for each channel calculated as in
            `Che et al (2018) <https://doi.org/10.1038/s41598-018-24271-9>`_. Default
            False.
        standardise: Standardise the time series (default False).
        overwrite_cache: Overwrite saved cache (default False).
        path: Location of the ``.torchtime`` cache directory (default ".").
        seed: Random seed for reproducibility (optional).

    Attributes:
        X (Tensor): A tensor of default shape (*n*, *s*, *c* + 1) where *n* = number of
            trajectories, *s* = (longest) trajectory length and *c* = number of
            channels. By default, a time stamp is appended as the first channel. If
            ``time`` is False, the time stamp is omitted and the tensor has shape
            (*n*, *s*, *c*).

            A missing data mask and/or time delta channels can be appended with the
            ``mask`` and ``delta`` arguments. These each have the same number of
            channels as the data set. For example, if ``time``, ``mask`` and
            ``delta`` are all True, ``X`` has shape (*n*, *s*, 3 * *c* + 1) and the
            channels are in the order: time stamp, time series, missing data mask, time
            deltas.

            Where trajectories are of unequal lengths they are padded with ``NaNs`` to
            the length of the longest trajectory in the data.
        y (Tensor): One-hot encoded label data. A tensor of shape (*n*, *l*) where *l*
            is the number of classes.
        length (Tensor): Length of each trajectory prior to padding. A tensor of shape
            (*n*).

    .. note::
        ``X``, ``y`` and ``length`` are available for the training, validation and test
        splits by appending ``_train``, ``_val`` and ``_test`` respectively. For
        example, ``y_val`` returns the labels for the validation data set. These
        attributes are available regardless of the ``split`` argument.
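
    For example, a sketch of the resulting shapes (the univariate ``ArrowHead``
    data set, i.e. *c* = 1, is used for illustration):

    .. code-block:: python

        from torchtime.data import UEA

        data = UEA("ArrowHead", split="train", train_prop=0.7, val_prop=0.2,
                   mask=True, delta=True, seed=123)
        data.X.shape       # (n_train, s, 3 * c + 1)
        data.X_val.shape   # validation split, whatever the 'split' argument
        data.y_test.shape  # (n_test, l) one-hot test labels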

    Returns:
        A PyTorch Dataset object which can be passed to a DataLoader.
    """

    def __init__(
        self,
        dataset: str,
        split: str,
        train_prop: float,
        val_prop: float = None,
        missing: Union[float, List[float]] = 0.0,
        impute: Union[str, Callable[[Tensor], Tensor]] = "none",
        categorical: List[int] = [],
        channel_means: Dict[int, float] = {},
        time: bool = True,
        mask: bool = False,
        delta: bool = False,
        standardise: bool = False,
        overwrite_cache: bool = False,
        path: str = ".",
        seed: int = None,
    ) -> None:
        self.dataset = dataset
        self.split = split
        self.train_prop = train_prop
        self.val_prop = val_prop
        self.test_prop = 0
        self.missing = missing
        self.impute = impute
        self.categorical = categorical
        self.channel_means = channel_means
        self.time = time
        self.mask = mask
        self.delta = delta
        self.standardise = standardise
        self.overwrite_cache = overwrite_cache
        self.path = pathlib.Path() / path / ".torchtime" / self.dataset
        self.seed = seed

        # Constants
        self.IMPUTE_FUNCTIONS = {
            "none": self._no_imputation,
            "zero": self._zero_imputation,
            "mean": self._mean_imputation,
            "forward": self._forward_imputation,
        }

        # Validate arguments and set data splits
        self._validate_arguments()

        # 1. Get data from cache or, if no cache, call _get_data() and cache results
        if _cache_exists(self.path) and not self.overwrite_cache:
            if _validate_cache(self.path):
                X_all = torch.load(self.path / ("X" + OBJ_EXT))
                y_all = torch.load(self.path / ("y" + OBJ_EXT))
                length_all = torch.load(self.path / ("length" + OBJ_EXT))
            else:
                raise Exception(
                    "Cache is corrupted! Use 'overwrite_cache' = True to rebuild."
                )
        else:
            X_all, y_all, length_all = self._get_data()
            X_all = X_all.float()  # float32 precision
            y_all = y_all.float()  # float32 precision
            length_all = length_all.long()  # int64 precision
            _cache_data(self.path, X_all, y_all, length_all)

        # 2. Simulate missing data
        if (type(self.missing) is list and sum(self.missing) > EPS) or (
            type(self.missing) is float and self.missing > EPS
        ):
            _simulate_missing(X_all, self.missing, seed=self.seed)

        # 3. Add time stamp/mask/time delta channels
        if self.time:
            X_all = torch.cat([self._time_stamp(X_all), X_all], dim=2)
        if self.mask:
            X_all = torch.cat([X_all, self._missing_mask(X_all)], dim=2)
        if self.delta:
            X_all = torch.cat([X_all, self._time_delta(X_all)], dim=2)

        # 4. Form train/validation/test splits
        stratify = torch.nansum(y_all, dim=1) > 0
        (
            self.X_train,
            self.y_train,
            self.length_train,
            self.X_val,
            self.y_val,
            self.length_val,
            self.X_test,
            self.y_test,
            self.length_test,
        ) = self._split_data(
            X_all,
            y_all,
            length_all,
            stratify,
        )

        # Set up for standardisation/imputation
        if self.standardise or self.impute != "none":
            # Number of channels
            n_channels = int(
                (self.X_train.size(2) - self.time) / (1 + self.mask + self.delta)
            )
            # Time series channels
            data_idx = torch.arange(self.time, self.time + n_channels)
            X_train_data = self.X_train[:, :, data_idx]
            # Training data channel means
            train_means = torch.nanmean(X_train_data, dim=(0, 1), keepdim=True)
            fill = train_means.flatten()
        else:
            # Null values to pass to imputer()
            fill = None
            data_idx = None

        # 5. Standardise data
        if self.standardise:
            # Training data channel standard deviations
            train_stds = torch.full((1, 1, n_channels), fill_value=float("nan"))
            for c, Xc in enumerate(X_train_data.unbind(dim=-1)):
                train_stds[:, :, c] = torch.std(Xc[~torch.isnan(Xc)])
            # Standardise data
            self.X_train[:, :, self.time : (self.time + n_channels)] = (
                self.X_train[:, :, self.time : (self.time + n_channels)] - train_means
            ) / (train_stds + EPS)
            self.X_val[:, :, self.time : (self.time + n_channels)] = (
                self.X_val[:, :, self.time : (self.time + n_channels)] - train_means
            ) / (train_stds + EPS)
            if self.test_prop > EPS:
                self.X_test[:, :, self.time : (self.time + n_channels)] = (
                    self.X_test[:, :, self.time : (self.time + n_channels)]
                    - train_means
                ) / (train_stds + EPS)

        # Additional set up for imputation
        if self.impute != "none":
            if self.categorical != []:
                # Impute using mode if categorical variable
                assert (
                    all([type(cat) is int for cat in self.categorical])
                    and min(self.categorical) >= 0
                    and max(self.categorical) < n_channels
                ), "indices in 'categorical' should be between 0 and {}".format(
                    n_channels - 1
                )
                train_modes = [
                    _nanmode(X_train_data[:, :, channel])
                    for channel in self.categorical
                ]
                for i, idx in enumerate(self.categorical):
                    fill[idx] = train_modes[i]
            # Override mean/mode if required
            if self.channel_means != {}:
                for x, y in self.channel_means.items():
                    assert (
                        type(x) is int and x >= 0 and x < n_channels
                    ), "keys in 'channel_means' should be between 0 and {}".format(
                        n_channels - 1
                    )
                    fill[x] = y

        # 6. Impute missing data
        self.X_train, self.y_train = self.imputer(
            self.X_train, self.y_train, fill, data_idx
        )
        self.X_val, self.y_val = self.imputer(self.X_val, self.y_val, fill, data_idx)
        if self.test_prop > EPS:
            self.X_test, self.y_test = self.imputer(
                self.X_test, self.y_test, fill, data_idx
            )
        else:
            del self.X_test, self.y_test, self.length_test

        # 7. Return data split
        if split == "test":
            self.X = self.X_test
            self.y = self.y_test
            self.length = self.length_test
        elif split == "train":
            self.X = self.X_train
            self.y = self.y_train
            self.length = self.length_train
        else:
            self.X = self.X_val
            self.y = self.y_val
            self.length = self.length_val

    def __str__(self):
        """Print data set details."""
        return """TimeSeriesDataset: {}
 - cache location = {}
 - data split = {:.0f}/{:.0f}/{:.0f} (training/validation/test)
 - time/mask/delta channels = {}/{}/{}
 - random seed = {}
 - X, y, length attributes return the {} split""".format(
            self.dataset,
            self.path,
            100 * self.train_prop,
            # undo the rescaling applied in _validate_arguments
            100 * self.val_prop * (1 - self.test_prop),
            100 * self.test_prop,
            self.time,
            self.mask,
            self.delta,
            self.seed,
            self.split,
        )

    def _validate_arguments(self):
        """Validate arguments and set imputation function/data splits."""
        # Validate impute arguments
        impute_options = list(self.IMPUTE_FUNCTIONS.keys())
        impute_error = "argument 'impute' must be a string in {} or a function".format(
            impute_options
        )
        if self.impute != "none":
            assert (
                type(self.categorical) is list
            ), "argument 'categorical' must be a list"
            assert (
                type(self.channel_means) is dict
            ), "argument 'channel_means' must be a dictionary"
        # Set impute function
        if type(self.impute) is str:
            assert self.impute in impute_options, impute_error
            self.imputer = self.IMPUTE_FUNCTIONS.get(self.impute)
        elif callable(self.impute):
            self.imputer = self.impute
        else:
            raise Exception(impute_error)
        # Validate/set data splits
        assert (
            self.train_prop > EPS and self.train_prop < 1
        ), "argument 'train_prop' must be in range (0, 1)"
        if self.val_prop is None:
            self.val_prop = 1 - self.train_prop
        else:
            assert (
                self.val_prop > EPS and self.val_prop < 1 - self.train_prop
            ), "argument 'val_prop' must be in range (0, {})".format(
                1 - self.train_prop
            )
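            # 'val_prop' is specified as a share of ALL data but the validation
            # split is later taken from the non-test data, so rescale it e.g.
            # train/val/test of 0.7/0.2/0.1 gives val_prop = 0.2 / 0.9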
            self.test_prop = 1 - self.train_prop - self.val_prop
            self.val_prop = self.val_prop / (1 - self.test_prop)
        splits = ["train", "val"]
        if self.test_prop > EPS:
            splits.append("test")
        assert self.split in splits, "argument 'split' must be one of {}".format(splits)

    @staticmethod
    def _no_imputation(X, y, fill, select):
        """No imputation."""
        return X, y

    @staticmethod
    def _zero_imputation(X, y, fill, select):
        """Zero imputation. Replace missing values with zeros."""
        X_imputed = replace_missing(X, fill=torch.zeros(select.size(-1)), select=select)
        y_imputed = replace_missing(y, fill=torch.zeros(y.size(-1)))
        return X_imputed, y_imputed

    @staticmethod
    def _mean_imputation(X, y, fill, select):
        """Mean imputation. Replace missing values in ``X`` from ``fill``. Replace
        missing values in ``y`` with zeros."""
        X_imputed = replace_missing(X, fill=fill, select=select)
        y_imputed = replace_missing(y, fill=torch.zeros(y.size(-1)))
        return X_imputed, y_imputed

    @staticmethod
    def _forward_imputation(X, y, fill, select):
        """Forward imputation. Replace missing values with previous observation. Replace
        any initial missing values in ``X`` from ``fill``. Assume no missing initial
        values in ``y`` but there may be trailing missing values due to padding."""
        X_imputed = forward_impute(X, fill=fill, select=select)
        y_imputed = forward_impute(y)
        return X_imputed, y_imputed

    def _get_data(self):
        """Overload this function to return ``X``, ``y`` and ``length`` tensors."""
        raise NotImplementedError

    @staticmethod
    def _time_stamp(X):
        """Calculate time stamp."""
        time_stamp = torch.arange(X.size(1)).unsqueeze(0)
        time_stamp = time_stamp.tile((X.size(0), 1)).unsqueeze(2)
        return time_stamp

    def _missing_mask(self, X):
        """Calculate missing data mask."""
        mask = torch.logical_not(torch.isnan(X[:, :, self.time :]))
        return mask

    def _time_delta(self, X):
        """Calculate time delta calculated as in Che et al, 2018, see
        https://www.nature.com/articles/s41598-018-24271-9."""
        # Add time and mask channels
        if not self.time:
            X = torch.cat([self._time_stamp(X), X], dim=2)
        if not self.mask:
            X = torch.cat([X, self._missing_mask(X)], dim=2)
        # Time of each observation by channel
        n_channels = int((X.size(-1) - 1) / 2)
        X = X.transpose(1, 2)  # shape (n, c, s)
        time_stamp = X[:, 0].unsqueeze(1).repeat(1, n_channels, 1)
        # Time delta/mask are 0/1 at time 0 by definition
        time_delta = time_stamp.clone()
        time_delta[:, :, 0] = 0
        time_mask = X[:, -n_channels:].clone()
        time_mask[:, :, 0] = 1
        # Time of previous observation if data missing
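        # (cummax over the 0/1 mask tracks the index of the most recent observed
        # time step; gather then looks up its time stamp)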
        time_delta = time_delta.gather(-1, torch.cummax(time_mask, -1)[1])
        # Calculate time delta
        time_delta = torch.cat(
            (
                time_delta[:, :, 0].unsqueeze(2),  # t = 0
                time_stamp[:, :, 1:]
                - time_delta[:, :, :-1],  # i.e. time minus time of previous observation
            ),
            dim=2,
        )
        return time_delta.transpose(1, 2)

    def _split_data(self, X, y, length, stratify):
        """Split data (``X``, ``y``, ``length``) into training, validation and
        (optional) test sets using stratified sampling."""
        random_state = np.random.RandomState(self.seed)
        if self.test_prop > EPS:
            # Test split: the first train_test_split partition (of size
            # test_prop) becomes the test set
            test_nontest_split = train_test_split(
                X,
                y,
                length,
                stratify,
                train_size=self.test_prop,
                random_state=random_state,
                shuffle=True,
                stratify=stratify,
            )
            (
                X_test,
                X_nontest,
                y_test,
                y_nontest,
                length_test,
                length_nontest,
                _,
                stratify_nontest,
            ) = test_nontest_split
            # Validation/train split
            val_train_split = train_test_split(
                X_nontest,
                y_nontest,
                length_nontest,
                train_size=self.val_prop,
                random_state=random_state,
                shuffle=True,
                stratify=stratify_nontest,
            )
            X_val, X_train, y_val, y_train, length_val, length_train = val_train_split
        else:
            # Validation/train split
            val_train_split = train_test_split(
                X,
                y,
                length,
                train_size=self.val_prop,
                random_state=random_state,
                shuffle=True,
                stratify=stratify,
            )
            X_val, X_train, y_val, y_train, length_val, length_train = val_train_split
            X_test, y_test, length_test = float("nan"), float("nan"), float("nan")
        return (
            X_train,
            y_train,
            length_train,
            X_val,
            y_val,
            length_val,
            X_test,
            y_test,
            length_test,
        )

    def __len__(self):
        return self.X.size(0)

    def __getitem__(self, idx):
        return {"X": self.X[idx], "y": self.y[idx], "length": self.length[idx]}


class PhysioNet2012(_TimeSeriesDataset):
    r"""**Returns the PhysioNet Challenge 2012 data as a PyTorch Dataset.**

    See the PhysioNet `website
    <https://physionet.org/content/challenge-2012/1.0.0/>`_ for a description of
    the data set.

    The proportions of data in the training, validation and (optional) test data
    sets are specified by the ``train_prop`` and ``val_prop`` arguments. For a
    training/validation split, specify ``train_prop`` only. For a
    training/validation/test split, specify both ``train_prop`` and ``val_prop``.

    For example, ``train_prop=0.8`` generates an 80/20% train/validation split,
    while ``train_prop=0.8, val_prop=0.1`` generates an 80/10/10%
    train/validation/test split. Splits are formed using stratified sampling.

    When passed to a PyTorch DataLoader, each batch is a named dictionary with
    ``X``, ``y`` and ``length`` entries. The ``split`` argument determines
    whether training, validation or test data are returned.

    Missing data can be imputed using the ``impute`` argument. See the `missing
    data tutorial
    <https://philipdarke.com/torchtime/tutorials/missing_data.html>`_ for more
    information.

    Data channels are in the following order:

    :0. Mins: Minutes since ICU admission. Derived from the PhysioNet time stamp.
    :1. Albumin: Albumin (g/dL)
    :2. ALP: Alkaline phosphatase (IU/L)
    :3. ALT: Alanine transaminase (IU/L)
    :4. AST: Aspartate transaminase (IU/L)
    :5. Bilirubin: Bilirubin (mg/dL)
    :6. BUN: Blood urea nitrogen (mg/dL)
    :7. Cholesterol: Cholesterol (mg/dL)
    :8. Creatinine: Serum creatinine (mg/dL)
    :9. DiasABP: Invasive diastolic arterial blood pressure (mmHg)
    :10. FiO2: Fractional inspired O\ :sub:`2` (0-1)
    :11. GCS: Glasgow Coma Score (3-15)
    :12. Glucose: Serum glucose (mg/dL)
    :13. HCO3: Serum bicarbonate (mmol/L)
    :14. HCT: Hematocrit (%)
    :15. HR: Heart rate (bpm)
    :16. K: Serum potassium (mEq/L)
    :17. Lactate: Lactate (mmol/L)
    :18. Mg: Serum magnesium (mmol/L)
    :19. MAP: Invasive mean arterial blood pressure (mmHg)
    :20. MechVent: Mechanical ventilation respiration (0:false, or 1:true)
    :21. Na: Serum sodium (mEq/L)
    :22. NIDiasABP: Non-invasive diastolic arterial blood pressure (mmHg)
    :23. NIMAP: Non-invasive mean arterial blood pressure (mmHg)
    :24. NISysABP: Non-invasive systolic arterial blood pressure (mmHg)
    :25. PaCO2: Partial pressure of arterial CO\ :sub:`2` (mmHg)
    :26. PaO2: Partial pressure of arterial O\ :sub:`2` (mmHg)
    :27. pH: Arterial pH (0-14)
    :28. Platelets: Platelets (cells/nL)
    :29. RespRate: Respiration rate (bpm)
    :30. SaO2: O\ :sub:`2` saturation in hemoglobin (%)
    :31. SysABP: Invasive systolic arterial blood pressure (mmHg)
    :32. Temp: Temperature (°C)
    :33. TroponinI: Troponin-I (μg/L). Note this is labelled *TropI* in the
        PhysioNet data dictionary.
    :34. TroponinT: Troponin-T (μg/L). Note this is labelled *TropT* in the
        PhysioNet data dictionary.
    :35. Urine: Urine output (mL)
    :36. WBC: White blood cell count (cells/nL)
    :37. Weight: Weight (kg)
    :38. Age: Age (years) at ICU admission
    :39. Gender: Gender (0: female, or 1: male)
    :40. Height: Height (cm) at ICU admission
    :41. ICUType1: Type of ICU unit (1: Coronary Care Unit)
    :42. ICUType2: Type of ICU unit (2: Cardiac Surgery Recovery Unit)
    :43. ICUType3: Type of ICU unit (3: Medical ICU)
    :44. ICUType4: Type of ICU unit (4: Surgical ICU)

    .. note::
        Channels 38 to 44 (Age, Gender, Height and the one-hot encoded ICUType)
        do not vary with time.

        Variables 11 (GCS) and 27 (pH) are assumed to be ordinal and are imputed
        using the same method as a continuous variable.

        Variable 20 (MechVent) has value ``NaN`` (the majority of values) or 1.
        It is assumed that value 1 indicates that mechanical ventilation has
        been used and ``NaN`` indicates either missing data or no mechanical
        ventilation. Accordingly, the channel mode is assumed to be zero.

        Variables 41-44 are the one-hot encoded value of ICUType.

    Args:
        split: The data split to return, either *train*, *val* (validation) or
            *test*.
        train_prop: Proportion of data in the training set.
        val_prop: Proportion of data in the validation set (optional, see
            above).
        impute: Method used to impute missing data, either *none*, *zero*,
            *mean*, *forward* or a custom imputation function (default "none").
        time: Append time stamp in the first channel (default True).
        mask: Append missing data mask for each channel (default False).
        delta: Append time since previous observation for each channel
            calculated as in `Che et al (2018)
            <https://doi.org/10.1038/s41598-018-24271-9>`_. Default False.
        standardise: Standardise the time series (default False).
        overwrite_cache: Overwrite saved cache (default False).
        path: Location of the ``.torchtime`` cache directory (default ".").
        seed: Random seed for reproducibility (optional).

    Attributes:
        X (Tensor): A tensor of default shape (*n*, *s*, *c* + 1) where *n* =
            number of trajectories, *s* = (longest) trajectory length and *c* =
            number of channels in the PhysioNet data (*including* the time since
            admission in minutes). See above for the order of the PhysioNet
            channels. By default, a time stamp is appended as the first channel.
            If ``time`` is False, the time stamp is omitted and the tensor has
            shape (*n*, *s*, *c*).

            A missing data mask and/or time delta channels can be appended with
            the ``mask`` and ``delta`` arguments. These each have the same
            number of channels as the PhysioNet data. For example, if ``time``,
            ``mask`` and ``delta`` are all True, ``X`` has shape
            (*n*, *s*, 3 * *c* + 1 = 127) and the channels are in the order:
            time stamp, time series, missing data mask, time deltas.

            Note that PhysioNet trajectories are of unequal length and are
            therefore padded with ``NaNs`` to the length of the longest
            trajectory in the data.
        y (Tensor): In-hospital survival (the ``In-hospital_death`` variable)
            for each patient. *y* = 1 indicates an in-hospital death. A tensor
            of shape (*n*, 1).
        length (Tensor): Length of each trajectory prior to padding. A tensor of
            shape (*n*).

    .. note::
        ``X``, ``y`` and ``length`` are available for the training, validation
        and test splits by appending ``_train``, ``_val`` and ``_test``
        respectively. For example, ``y_val`` returns the labels for the
        validation data set. These attributes are available regardless of the
        ``split`` argument.

    Returns:
        A PyTorch Dataset object which can be passed to a DataLoader.
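
    For example, a minimal usage sketch (the arguments shown are illustrative):

    .. code-block:: python

        from torch.utils.data import DataLoader
        from torchtime.data import PhysioNet2012

        physionet2012 = PhysioNet2012(
            split="train", train_prop=0.7, val_prop=0.2, impute="forward", seed=456
        )
        dataloader = DataLoader(physionet2012, batch_size=32)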
""" def __init__( self, split: str, train_prop: float, val_prop: float = None, impute: Union[str, Callable[[Tensor], Tensor]] = "none", time: bool = True, mask: bool = False, delta: bool = False, standardise: bool = False, overwrite_cache: bool = False, path: str = ".", seed: int = None, ) -> None: self.dataset_path = pathlib.Path() / path / ".torchtime" / "physionet_2012" super(PhysioNet2012, self).__init__( dataset="physionet_2012", split=split, train_prop=train_prop, val_prop=val_prop, impute=impute, categorical=[20], channel_means={20: 0.0}, time=time, mask=mask, delta=delta, standardise=standardise, overwrite_cache=overwrite_cache, path=path, seed=seed, ) def _get_data(self): """Download data and form ``X``, ``y``, ``length`` tensors.""" outcome_path = self.dataset_path / "outcomes" all_X = [None for _ in PHYSIONET_2012_DATASETS] # Download and extract data _physionet_download( PHYSIONET_2012_DATASETS, self.dataset_path, self.overwrite_cache ) [ _download_to_directory(url, outcome_path, self.overwrite_cache) for url in PHYSIONET_2012_OUTCOMES ] # Prepare time series data print("Processing data...") data_directories = [ self.dataset_path / directory for directory in PHYSIONET_2012_DATASETS ] data_files = _get_file_list(data_directories) length = self._get_lengths(data_files) for i, files in enumerate(data_files): all_X[i] = self._process_files(files, max(length), PHYSIONET_2012_VARS) # Prepare labels outcome_files = _get_file_list(outcome_path) all_y = self._get_labels(outcome_files, data_files) # Form tensors X = torch.cat(all_X) X[X == -1] = float("nan") # replace -1 missing data indicator with NaNs y = torch.cat(all_y) length = torch.tensor(length) return X, y, length @staticmethod def _get_lengths(data_files): """Get length of each time series.""" lengths = [] for files in data_files: for file_j in tqdm( files, total=len(files), bar_format=TQDM_FORMAT, ): with open(file_j) as file: reader = csv.reader(file, delimiter=",") lengths_j = [] for k, row in enumerate(reader): if k > 0 and row[1] != "": # ignore head and rows without data lengths_j.append(row[0]) lengths_j = set(lengths_j) lengths.append(len(lengths_j)) return lengths @staticmethod def _process_files(files, max_length, channels): """Process ``.txt`` files.""" X = np.full((len(files), max_length, len(channels)), float("nan")) template_dataframe = pd.DataFrame(columns=channels) for i, file_i in tqdm( enumerate(files), total=len(files), bar_format=TQDM_FORMAT, ): with open(file_i) as file: Xi = pd.read_csv(file) Xi = Xi.pivot_table(index="Time", columns="Parameter", values="Value") Xi["Mins"] = [int(t[:2]) * 60 + int(t[3:]) for t in Xi.index] Xi = pd.concat([template_dataframe, Xi]) Xi = Xi.apply(pd.to_numeric, downcast="float") # Add static variables Xi["Age"] = Xi.loc["00:00", "Age"] Xi["Gender"] = Xi.loc["00:00", "Gender"] Xi["Height"] = Xi.loc["00:00", "Height"] # One-hot encode ICUType icu_classes = 4 icu_onehot = np.eye(icu_classes)[int(Xi.loc["00:00", "ICUType"]) - 1] for j in range(icu_classes): Xi["ICUType" + str(j + 1)] = icu_onehot[j] X[i, : Xi.shape[0], :] = Xi[channels] # TODO: only include time 0 if a weight is provided return torch.tensor(X) @staticmethod def _get_labels(outcome_files, data_files): """Process outcome files.""" y = [] for i, file_i in enumerate(outcome_files): ids = data_files[i] with open(file_i) as file: y_i = pd.read_csv( file, index_col=0, usecols=["RecordID", "In-hospital_death"] ) ids_i = [int(id.stem) for id in ids] y_i = torch.tensor(y_i.loc[ids_i].values) y.append(y_i) return y
class PhysioNet2019(_TimeSeriesDataset):
    """**Returns the PhysioNet Challenge 2019 data as a PyTorch Dataset.**

    See the PhysioNet `website
    <https://physionet.org/content/challenge-2019/1.0.0/>`_ for a description of
    the data set.

    The proportions of data in the training, validation and (optional) test data
    sets are specified by the ``train_prop`` and ``val_prop`` arguments. For a
    training/validation split, specify ``train_prop`` only. For a
    training/validation/test split, specify both ``train_prop`` and ``val_prop``.

    For example, ``train_prop=0.8`` generates an 80/20% train/validation split,
    while ``train_prop=0.8, val_prop=0.1`` generates an 80/10/10%
    train/validation/test split. Splits are formed using stratified sampling.

    When passed to a PyTorch DataLoader, each batch is a named dictionary with
    ``X``, ``y`` and ``length`` entries. The ``split`` argument determines
    whether training, validation or test data are returned.

    Missing data can be imputed using the ``impute`` argument. See the `missing
    data tutorial
    <https://philipdarke.com/torchtime/tutorials/missing_data.html>`_ for more
    information.

    Args:
        split: The data split to return, either *train*, *val* (validation) or
            *test*.
        train_prop: Proportion of data in the training set.
        val_prop: Proportion of data in the validation set (optional, see
            above).
        impute: Method used to impute missing data, either *none*, *zero*,
            *mean*, *forward* or a custom imputation function (default "none").
        time: Append time stamp in the first channel (default True).
        mask: Append missing data mask for each channel (default False).
        delta: Append time since previous observation for each channel
            calculated as in `Che et al (2018)
            <https://doi.org/10.1038/s41598-018-24271-9>`_. Default False.
        standardise: Standardise the time series (default False).
        overwrite_cache: Overwrite saved cache (default False).
        path: Location of the ``.torchtime`` cache directory (default ".").
        seed: Random seed for reproducibility (optional).

    Attributes:
        X (Tensor): A tensor of default shape (*n*, *s*, *c* + 1) where *n* =
            number of trajectories, *s* = (longest) trajectory length and *c* =
            number of channels in the PhysioNet data (*including* the ``ICULOS``
            time stamp). The channels are ordered as set out on the PhysioNet
            `website <https://physionet.org/content/challenge-2019/1.0.0/>`_. By
            default, a time stamp is appended as the first channel. If ``time``
            is False, the time stamp is omitted and the tensor has shape
            (*n*, *s*, *c*).

            A missing data mask and/or time delta channels can be appended with
            the ``mask`` and ``delta`` arguments. These each have the same
            number of channels as the PhysioNet data. For example, if ``time``,
            ``mask`` and ``delta`` are all True, ``X`` has shape
            (*n*, *s*, 3 * *c* + 1 = 121) and the channels are in the order:
            time stamp, time series, missing data mask, time deltas.

            Note that PhysioNet trajectories are of unequal length and are
            therefore padded with ``NaNs`` to the length of the longest
            trajectory in the data.
        y (Tensor): ``SepsisLabel`` at each time point. A tensor of shape
            (*n*, *s*, 1).
        length (Tensor): Length of each trajectory prior to padding. A tensor of
            shape (*n*).

    .. note::
        ``X``, ``y`` and ``length`` are available for the training, validation
        and test splits by appending ``_train``, ``_val`` and ``_test``
        respectively. For example, ``y_val`` returns the labels for the
        validation data set. These attributes are available regardless of the
        ``split`` argument.

    Returns:
        A PyTorch Dataset object which can be passed to a DataLoader.
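
    For example, a minimal usage sketch (the arguments shown are illustrative):

    .. code-block:: python

        from torch.utils.data import DataLoader
        from torchtime.data import PhysioNet2019

        physionet2019 = PhysioNet2019(
            split="train", train_prop=0.7, val_prop=0.2, mask=True, seed=456
        )
        dataloader = DataLoader(physionet2019, batch_size=32)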
""" def __init__( self, split: str, train_prop: float, val_prop: float = None, impute: Union[str, Callable[[Tensor], Tensor]] = "none", time: bool = True, mask: bool = False, delta: bool = False, standardise: bool = False, overwrite_cache: bool = False, path: str = ".", seed: int = None, ) -> None: super(PhysioNet2019, self).__init__( dataset="physionet_2019", split=split, train_prop=train_prop, val_prop=val_prop, impute=impute, time=time, mask=mask, delta=delta, standardise=standardise, overwrite_cache=overwrite_cache, path=path, seed=seed, ) def _get_data(self): """Download data and form ``X``, ``y``, ``length`` tensors.""" # Download and extract data _physionet_download(PHYSIONET_2019_DATASETS, self.path, self.overwrite_cache) # Prepare data print("Processing data...") data_directories = [self.path / dataset for dataset in PHYSIONET_2019_DATASETS] data_files = _get_file_list(data_directories) length, channels = self._get_lengths_channels(data_files) all_X = [None for _ in PHYSIONET_2019_DATASETS] all_y = [None for _ in PHYSIONET_2019_DATASETS] for i, files in enumerate(data_files): all_X[i], all_y[i] = self._process_files(files, max(length), channels) # Form tensors X = torch.cat(all_X) y = torch.cat(all_y) length = torch.tensor(length) return X, y, length @staticmethod def _get_lengths_channels(data_files, max_time=None): """Get length of each time series and number of channels. Time series can be truncated at a specific hour with the ``max_time`` argument.""" lengths = [] # sequence lengths channels = [] # number of channels for files in data_files: for file_j in tqdm(files, total=len(files), bar_format=TQDM_FORMAT): with open(file_j) as file: reader = csv.reader(file, delimiter="|") lengths_j = [] for k, Xijk in enumerate(reader): channels.append(len(Xijk)) if k > 0: # ignore header if max_time: if int(Xijk[39]) <= max_time: lengths_j.append(1) else: lengths_j.append(1) lengths.append(sum(lengths_j)) channels = list(set(channels)) assert len(channels) == 1, "corrupt file, delete data and re-run" return lengths, channels[0] @staticmethod def _process_files(files, max_length, channels): """Process ``.psv`` files.""" X = np.full((len(files), max_length, channels - 1), float("nan")) y = np.full((len(files), max_length, 1), float("nan")) for i, file_i in tqdm( enumerate(files), total=len(files), bar_format=TQDM_FORMAT, ): with open(file_i) as file: reader = csv.reader(file, delimiter="|") for j, Xij in enumerate(reader): if j > 0: # ignore header X[i, j - 1] = Xij[:-1] y[i, j - 1, 0] = Xij[-1] return torch.tensor(X), torch.tensor(y)
class PhysioNet2019Binary(_TimeSeriesDataset):
    """**Returns a binary prediction variant of the PhysioNet Challenge 2019
    data as a PyTorch Dataset.**

    In contrast with the full challenge, the first 72 hours of data are used to
    predict whether a patient develops sepsis at any point during the period of
    hospitalisation, as in `Kidger et al (2020)
    <https://arxiv.org/abs/2005.08926>`_. See the PhysioNet `website
    <https://physionet.org/content/challenge-2019/1.0.0/>`_ for a description of
    the data set.

    The proportions of data in the training, validation and (optional) test data
    sets are specified by the ``train_prop`` and ``val_prop`` arguments. For a
    training/validation split, specify ``train_prop`` only. For a
    training/validation/test split, specify both ``train_prop`` and ``val_prop``.

    For example, ``train_prop=0.8`` generates an 80/20% train/validation split,
    while ``train_prop=0.8, val_prop=0.1`` generates an 80/10/10%
    train/validation/test split. Splits are formed using stratified sampling.

    When passed to a PyTorch DataLoader, each batch is a named dictionary with
    ``X``, ``y`` and ``length`` entries. The ``split`` argument determines
    whether training, validation or test data are returned.

    Missing data can be imputed using the ``impute`` argument. See the `missing
    data tutorial
    <https://philipdarke.com/torchtime/tutorials/missing_data.html>`_ for more
    information.

    Args:
        split: The data split to return, either *train*, *val* (validation) or
            *test*.
        train_prop: Proportion of data in the training set.
        val_prop: Proportion of data in the validation set (optional, see
            above).
        impute: Method used to impute missing data, either *none*, *zero*,
            *mean*, *forward* or a custom imputation function (default "none").
        time: Append time stamp in the first channel (default True).
        mask: Append missing data mask for each channel (default False).
        delta: Append time since previous observation for each channel
            calculated as in `Che et al (2018)
            <https://doi.org/10.1038/s41598-018-24271-9>`_. Default False.
        standardise: Standardise the time series (default False).
        overwrite_cache: Overwrite saved cache (default False).
        path: Location of the ``.torchtime`` cache directory (default ".").
        seed: Random seed for reproducibility (optional).

    Attributes:
        X (Tensor): A tensor of default shape (*n*, *s*, *c* + 1) where *n* =
            number of trajectories, *s* = (longest) trajectory length and *c* =
            number of channels in the PhysioNet data (*including* the ``ICULOS``
            time stamp). The channels are ordered as set out on the PhysioNet
            `website <https://physionet.org/content/challenge-2019/1.0.0/>`_. By
            default, a time stamp is appended as the first channel. If ``time``
            is False, the time stamp is omitted and the tensor has shape
            (*n*, *s*, *c*).

            A missing data mask and/or time delta channels can be appended with
            the ``mask`` and ``delta`` arguments. These each have the same
            number of channels as the PhysioNet data. For example, if ``time``,
            ``mask`` and ``delta`` are all True, ``X`` has shape
            (*n*, *s*, 3 * *c* + 1 = 121) and the channels are in the order:
            time stamp, time series, missing data mask, time deltas.

            Note that PhysioNet trajectories are of unequal length and are
            therefore padded with ``NaNs`` to the length of the longest
            trajectory in the data.
        y (Tensor): Whether the patient is diagnosed with sepsis at any time
            during hospitalisation. A tensor of shape (*n*, 1).
        length (Tensor): Length of each trajectory prior to padding. A tensor of
            shape (*n*).

    .. note::
        ``X``, ``y`` and ``length`` are available for the training, validation
        and test splits by appending ``_train``, ``_val`` and ``_test``
        respectively. For example, ``y_val`` returns the labels for the
        validation data set. These attributes are available regardless of the
        ``split`` argument.

    Returns:
        A PyTorch Dataset object which can be passed to a DataLoader.
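
    For example, a minimal usage sketch (the arguments shown are illustrative):

    .. code-block:: python

        from torch.utils.data import DataLoader
        from torchtime.data import PhysioNet2019Binary

        binary = PhysioNet2019Binary(
            split="train", train_prop=0.7, val_prop=0.2, impute="mean", seed=456
        )
        dataloader = DataLoader(binary, batch_size=32)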
    """

    def __init__(
        self,
        split: str,
        train_prop: float,
        val_prop: float = None,
        impute: Union[str, Callable[[Tensor], Tensor]] = "none",
        time: bool = True,
        mask: bool = False,
        delta: bool = False,
        standardise: bool = False,
        overwrite_cache: bool = False,
        path: str = ".",
        seed: int = None,
    ) -> None:
        self.DATASET_NAME = "physionet_2019binary"
        self.path_arg = path
        self.max_time = 72  # hours
        super(PhysioNet2019Binary, self).__init__(
            dataset=self.DATASET_NAME,
            split=split,
            train_prop=train_prop,
            val_prop=val_prop,
            impute=impute,
            time=time,
            mask=mask,
            delta=delta,
            standardise=standardise,
            overwrite_cache=overwrite_cache,
            path=path,
            seed=seed,
        )

    def _get_data(self):
        """Download data and form ``X``, ``y``, ``length`` tensors."""
        # Download and extract data to the "physionet_2019" directory to avoid
        # duplicating the raw data
        cache_path = pathlib.Path() / self.path_arg / ".torchtime" / "physionet_2019"
        _physionet_download(PHYSIONET_2019_DATASETS, cache_path, self.overwrite_cache)
        # Prepare data
        print("Processing data...")
        data_directories = [
            cache_path / dataset for dataset in PHYSIONET_2019_DATASETS
        ]
        data_files = _get_file_list(data_directories)
        length, channels = PhysioNet2019._get_lengths_channels(
            data_files, max_time=self.max_time
        )
        all_X = [None for _ in PHYSIONET_2019_DATASETS]
        all_y = [None for _ in PHYSIONET_2019_DATASETS]
        for i, files in enumerate(data_files):
            all_X[i], all_y[i] = self._process_files(files, max(length), channels)
        # Form tensors
        X = torch.cat(all_X)
        y = torch.cat(all_y)
        length = torch.tensor(length)
        # Drop patients with zero length sequences
        patient_index = torch.arange(X.size(0)).masked_select(length != 0).int()
        X = X.index_select(index=patient_index, dim=0)
        y = y.index_select(index=patient_index, dim=0)
        length = length.index_select(index=patient_index, dim=0)
        # Save cached files to the "physionet_2019binary" directory
        self.path = pathlib.Path() / self.path_arg / ".torchtime" / self.DATASET_NAME
        return X, y, length

    def _process_files(self, files, max_length, channels):
        """Process ``.psv`` files."""
        X = np.full((len(files), max_length, channels - 1), float("nan"))
        y = np.full((len(files), 1), 0.0)
        for i, file_i in tqdm(
            enumerate(files),
            total=len(files),
            bar_format=TQDM_FORMAT,
        ):
            with open(file_i) as file:
                reader = csv.reader(file, delimiter="|")
                for j, Xij in enumerate(reader):
                    if j > 0:  # ignore header
                        # only the first max_time hours of data are kept...
                        if int(Xij[39]) <= self.max_time:
                            X[i, j - 1] = Xij[:-1]
                        # ...but the label reflects sepsis at any point
                        y[i, 0] = max(y[i, 0], int(Xij[-1]))
        return torch.tensor(X), torch.tensor(y)
class UEA(_TimeSeriesDataset):
    """**Returns a time series classification data set from the UEA/UCR
    repository as a PyTorch Dataset.**

    See the UEA/UCR repository `website
    <https://www.timeseriesclassification.com/>`_ for the data sets.

    The proportions of data in the training, validation and (optional) test data
    sets are specified by the ``train_prop`` and ``val_prop`` arguments. For a
    training/validation split, specify ``train_prop`` only. For a
    training/validation/test split, specify both ``train_prop`` and ``val_prop``.

    For example, ``train_prop=0.8`` generates an 80/20% train/validation split,
    while ``train_prop=0.8, val_prop=0.1`` generates an 80/10/10%
    train/validation/test split. Splits are formed using stratified sampling.

    When passed to a PyTorch DataLoader, each batch is a named dictionary with
    ``X``, ``y`` and ``length`` entries. The ``split`` argument determines
    whether training, validation or test data are returned.

    Missing data can be simulated by dropping data at random. Support is also
    provided to impute missing data. These options are controlled by the
    ``missing`` and ``impute`` arguments. See the `missing data tutorial
    <https://philipdarke.com/torchtime/tutorials/missing_data.html>`_ for more
    information.

    .. warning::
        Mean imputation is unsuitable for categorical variables. To impute
        missing values for a categorical variable with the channel mode (rather
        than the channel mean), pass the channel indices to the ``categorical``
        argument. Note this is also required for forward imputation to
        appropriately impute initial missing values.

        Alternatively, the calculated channel mean/mode can be overridden using
        the ``channel_means`` argument. This can be used to impute missing data
        with a fixed value.

    Args:
        dataset: The name of the UEA/UCR data set, from the list `here
            <https://timeseriesclassification.com/dataset.php>`_.
        split: The data split to return, either *train*, *val* (validation) or
            *test*.
        train_prop: Proportion of data in the training set.
        val_prop: Proportion of data in the validation set (optional, see
            above).
        missing: The proportion of data to drop at random. If ``missing`` is a
            single value, data are dropped from all channels. To drop data
            independently across each channel, pass a list of the proportion
            missing for each channel e.g. ``[0.5, 0.2, 0.8]``. Default 0 i.e. no
            missing data simulation.
        impute: Method used to impute missing data, either *none*, *zero*,
            *mean*, *forward* or a custom imputation function (default "none").
            See warning above.
        categorical: List with channel indices of categorical variables. Only
            required if imputing data. Default ``[]`` i.e. no categorical
            variables.
        channel_means: Override the calculated channel mean/mode when imputing
            data. Only used if imputing data. Dictionary with channel indices
            and values e.g. ``{1: 4.5, 3: 7.2}`` (default ``{}`` i.e. no
            overridden channel mean/modes).
        time: Append time stamp in the first channel (default True).
        mask: Append missing data mask for each channel (default False).
        delta: Append time since previous observation for each channel
            calculated as in `Che et al (2018)
            <https://doi.org/10.1038/s41598-018-24271-9>`_. Default False.
        standardise: Standardise the time series (default False).
        overwrite_cache: Overwrite saved cache (default False).
        path: Location of the ``.torchtime`` cache directory (default ".").
        seed: Random seed for reproducibility (optional).

    Attributes:
        X (Tensor): A tensor of default shape (*n*, *s*, *c* + 1) where *n* =
            number of trajectories, *s* = (longest) trajectory length and *c* =
            number of channels. By default, a time stamp is appended as the
            first channel. If ``time`` is False, the time stamp is omitted and
            the tensor has shape (*n*, *s*, *c*).

            A missing data mask and/or time delta channels can be appended with
            the ``mask`` and ``delta`` arguments. These each have the same
            number of channels as the data set. For example, if ``time``,
            ``mask`` and ``delta`` are all True, ``X`` has shape
            (*n*, *s*, 3 * *c* + 1) and the channels are in the order: time
            stamp, time series, missing data mask, time deltas.

            Where trajectories are of unequal lengths they are padded with
            ``NaNs`` to the length of the longest trajectory in the data.
        y (Tensor): One-hot encoded label data. A tensor of shape (*n*, *l*)
            where *l* is the number of classes.
        length (Tensor): Length of each trajectory prior to padding. A tensor of
            shape (*n*).

    .. note::
        ``X``, ``y`` and ``length`` are available for the training, validation
        and test splits by appending ``_train``, ``_val`` and ``_test``
        respectively. For example, ``y_val`` returns the labels for the
        validation data set. These attributes are available regardless of the
        ``split`` argument.

    Returns:
        A PyTorch Dataset object which can be passed to a DataLoader.
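
    For example, a minimal usage sketch (the ``CharacterTrajectories`` data set
    and all arguments are illustrative):

    .. code-block:: python

        from torch.utils.data import DataLoader
        from torchtime.data import UEA

        char_traj = UEA(
            "CharacterTrajectories",
            split="train",
            train_prop=0.7,
            missing=0.5,       # drop 50% of data at random...
            impute="forward",  # ...then forward impute
            seed=456,
        )
        dataloader = DataLoader(char_traj, batch_size=32)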
    """

    def __init__(
        self,
        dataset: str,
        split: str,
        train_prop: float,
        val_prop: float = None,
        missing: Union[float, List[float]] = 0.0,
        impute: Union[str, Callable[[Tensor], Tensor]] = "none",
        categorical: List[int] = [],
        channel_means: Dict[int, float] = {},
        time: bool = True,
        mask: bool = False,
        delta: bool = False,
        standardise: bool = False,
        overwrite_cache: bool = False,
        path: str = ".",
        seed: int = None,
    ) -> None:
        self.dataset_name = dataset
        self.raw_path = (
            pathlib.Path() / path / ".torchtime" / ("uea_" + self.dataset_name) / "raw"
        )
        super(UEA, self).__init__(
            dataset="uea_" + self.dataset_name,
            split=split,
            train_prop=train_prop,
            val_prop=val_prop,
            missing=missing,
            impute=impute,
            categorical=categorical,
            channel_means=channel_means,
            time=time,
            mask=mask,
            delta=delta,
            standardise=standardise,
            overwrite_cache=overwrite_cache,
            path=path,
            seed=seed,
        )

    def _download_uea_data(self, url, path):
        """Download UEA data if not already downloaded."""
        train_path = path / (self.dataset_name + "_TRAIN.ts")
        test_path = path / (self.dataset_name + "_TEST.ts")
        # Download and extract archive
        if not path.is_dir():
            _download_archive(url, path)
        else:
            downloaded_files = _get_file_list(path)
            if (
                train_path not in downloaded_files
                or test_path not in downloaded_files
            ):
                _download_archive(url, path)
        # Verify download
        downloaded_files = _get_file_list(path)
        assert (
            train_path in downloaded_files
        ), "{} not in downloaded archive, check {}".format(train_path, url)
        assert (
            test_path in downloaded_files
        ), "{} not in downloaded archive, check {}".format(test_path, url)
        return [train_path, test_path]

    @staticmethod
    def _extract_ts_files(data_files):
        """Extract ``.ts`` data based on ``sktime.datasets.load_UCR_UEA_dataset()``."""
        X = pd.DataFrame(dtype="object")
        y = pd.Series(dtype="object")
        for split in data_files:
            contents = load_from_tsfile_to_dataframe(split)
            X = pd.concat([X, pd.DataFrame(contents[0])])
            y = pd.concat([y, pd.Series(contents[1])])
        y = pd.Series.to_numpy(y, dtype=str)
        return X, y

    def _get_data(self):
        """Download data and form ``X``, ``y`` and ``length`` tensors."""
        data_files = self._download_uea_data(
            UEA_DOWNLOAD_URL + self.dataset_name + ".zip", self.raw_path
        )
        print("Processing data...")
        X_raw, y_raw = self._extract_ts_files(data_files)
        # Length of each trajectory
        channel_lengths = X_raw.apply(lambda Xi: Xi.apply(len), axis=1)
        length = torch.tensor(channel_lengths.apply(max, axis=1).values)
        # Form tensor with padded trajectories
        X = torch.stack(
            [self._pad(X_raw.iloc[i], length.max()) for i in range(len(X_raw))],
            dim=0,
        )
        # One-hot encode labels (start from zero)
        y = torch.tensor(y_raw.astype(int))
        if all(y != 0):
            y -= 1
        y = F.one_hot(y)
        return X, y, length

    def _pad(self, Xi, max_length):
        """Pad trajectories to length ``max_length``."""
        Xi = pad_sequence([torch.tensor(Xij) for Xij in Xi])
        out = torch.full((max_length, Xi.size(1)), float("nan"))  # shape (s, c)
        out[0 : Xi.size(0)] = Xi
        return out