From 9f847f46c669db92c202aba2eaefa952fe4b8ea8 Mon Sep 17 00:00:00 2001
From: Avik Basu <3485425+ab93@users.noreply.github.com>
Date: Wed, 10 Apr 2024 15:29:02 -0700
Subject: [PATCH] feat: introduce stride in dataset (#360)

Useful for flattened, vectorized data.

---------

Signed-off-by: Avik Basu
Co-authored-by: Avik Basu
Signed-off-by: skondakindi
---
 numalogic/config/_config.py     |   2 +
 numalogic/tools/data.py         |  17 +++--
 numalogic/udfs/postprocess.py   |   4 +-
 numalogic/udfs/trainer/_base.py |   2 +-
 tests/tools/test_data.py        | 124 ++++++++++++++++++++------------
 5 files changed, 96 insertions(+), 53 deletions(-)

diff --git a/numalogic/config/_config.py b/numalogic/config/_config.py
index 1fd5fe43..67f37eb9 100644
--- a/numalogic/config/_config.py
+++ b/numalogic/config/_config.py
@@ -82,6 +82,7 @@ class LightningTrainerConf:
     enable_checkpointing: bool = False
     enable_progress_bar: bool = False
     enable_model_summary: bool = True
+    deterministic: bool = False
 
 
 @dataclass
@@ -94,6 +95,7 @@ class TrainerConf:
     retry_sec: int = 600  # 10 min
     batch_size: int = 64
     data_freq_sec: int = 60
+    ds_stride: int = 1
     transforms: Optional[list[ModelInfo]] = None
     pltrainer_conf: LightningTrainerConf = field(default_factory=LightningTrainerConf)
 
diff --git a/numalogic/tools/data.py b/numalogic/tools/data.py
index 8ff5222a..5ee0b599 100644
--- a/numalogic/tools/data.py
+++ b/numalogic/tools/data.py
@@ -101,6 +101,7 @@ class StreamingDataset(IterableDataset):
     ----
     data: A numpy array containing the input data in the shape of (batch, num_features).
     seq_len: Length of the sliding window sequences to be generated from the input data
+    stride: Stride to jump between sequences; defaults to 1
 
     Raises
     ------
@@ -109,12 +110,15 @@
         have a minimum dimension size of 2
     """
 
-    __slots__ = ("_seq_len", "_data")
+    __slots__ = ("_seq_len", "_data", "_stride")
 
-    def __init__(self, data: npt.NDArray[float], seq_len: int):
+    def __init__(self, data: npt.NDArray[float], seq_len: int, stride: int = 1):
         if seq_len > len(data):
             raise ValueError(f"Sequence length: {seq_len} is more than data size: {len(data)}")
 
+        if stride >= seq_len:
+            raise ValueError(f"Stride: {stride} should be less than sequence length: {seq_len}")
+
         if data.ndim != 2:
             raise InvalidDataShapeError(
                 f"Input data should have dim=2, received shape: {data.shape}"
@@ -122,6 +126,7 @@ def __init__(self, data: npt.NDArray[float], seq_len: int):
 
         self._seq_len = seq_len
         self._data = data.astype(np.float32)
+        self._stride = stride
 
     @property
     def data(self) -> npt.NDArray[float]:
@@ -148,9 +153,9 @@ def create_seq(self, input_: npt.NDArray[float]) -> Generator[npt.NDArray[float]
         A subarray of size (seq_len, num_features) from the input data.
         """
         idx = 0
-        while idx < len(self):
+        while idx < len(self._data) - self._seq_len + 1:
             yield input_[idx : idx + self._seq_len]
-            idx += 1
+            idx += self._stride
 
     def __iter__(self) -> Iterator[npt.NDArray[float]]:
         r"""Returns an iterator for the StreamingDataset object.
@@ -167,7 +172,7 @@ def __iter__(self) -> Iterator[npt.NDArray[float]]:
 
     def __len__(self) -> int:
         r"""Returns the number of sequences that can be generated from the input data."""
-        return len(self._data) - self._seq_len + 1
+        return (len(self._data) - self._seq_len) // self._stride + 1
 
     def __getitem__(self, idx: Union[int, slice]) -> npt.NDArray[float]:
         r"""Retrieves a sequence from the input data at the specified index."""
@@ -178,7 +183,7 @@ def __getitem__(self, idx: Union[int, slice]) -> npt.NDArray[float]:
             raw_data_size = len(self._data)
             start = idx.start or 0
             stop = min(idx.stop, raw_data_size) if idx.stop else raw_data_size
-            for i in range(start, stop - self._seq_len + 1):
+            for i in range(start, stop - self._seq_len + 1, self._stride):
                 output.append(self._data[i : (i + self._seq_len)])
             return np.stack(output)
         if idx >= len(self):
diff --git a/numalogic/udfs/postprocess.py b/numalogic/udfs/postprocess.py
index f3d0e850..b7b24008 100644
--- a/numalogic/udfs/postprocess.py
+++ b/numalogic/udfs/postprocess.py
@@ -328,9 +328,9 @@ def compute(
         scores = cls.compute_threshold(model, input_)  # (seqlen x nfeat)
         win_scores = cls.compute_feature_scores(
             scores, win_agg_conf=score_conf.window_agg
-        )  # (seqlen x nfeat)
+        )  # (nfeat,)
         if postproc_tx:
-            win_scores = cls.compute_postprocess(postproc_tx, win_scores)  # (seqlen x nfeat)
+            win_scores = cls.compute_postprocess(postproc_tx, win_scores)  # (nfeat,)
         return win_scores  # (nfeat, )
 
     @classmethod
diff --git a/numalogic/udfs/trainer/_base.py b/numalogic/udfs/trainer/_base.py
index da339c28..3d26916d 100644
--- a/numalogic/udfs/trainer/_base.py
+++ b/numalogic/udfs/trainer/_base.py
@@ -114,7 +114,7 @@ def compute(
                 stateful=any(_conf.stateful for _conf in numalogic_cfg.preprocess),
             )
 
-        train_ds = StreamingDataset(input_, model.seq_len)
+        train_ds = StreamingDataset(input_, model.seq_len, stride=trainer_cfg.ds_stride)
         trainer = TimeseriesTrainer(**asdict(trainer_cfg.pltrainer_conf))
         trainer.fit(
             model, train_dataloaders=DataLoader(train_ds, batch_size=trainer_cfg.batch_size)
diff --git a/tests/tools/test_data.py b/tests/tools/test_data.py
index 58fe6b8d..b600e21d 100644
--- a/tests/tools/test_data.py
+++ b/tests/tools/test_data.py
@@ -2,8 +2,9 @@
 import unittest
 
 import numpy as np
+import pytest
 import torch
-from numpy.testing import assert_allclose
+from numpy.testing import assert_allclose, assert_array_equal
 from torch.testing import assert_close
 from torch.utils.data import DataLoader
 
@@ -22,40 +23,40 @@
 RNG = np.random.default_rng(42)
 
 
-class TestStreamingDataset(unittest.TestCase):
-    data = None
-    m = None
-    n = None
+@pytest.fixture
+def setup():
+    m, n = 30, 3
+    return np.arange(30 * 3).reshape(30, 3).astype(np.float32), m, n
 
-    @classmethod
-    def setUpClass(cls) -> None:
-        cls.m = 30
-        cls.n = 3
-        cls.data = np.arange(cls.m * cls.n).reshape(30, 3)
 
-    def test_dataset(self):
-        dataset = StreamingDataset(self.data, seq_len=SEQ_LEN)
+class TestStreamingDataset:
+    def test_dataset(self, setup):
+        data, m, n = setup
+        dataset = StreamingDataset(data, seq_len=SEQ_LEN)
         for seq in dataset:
-            self.assertTupleEqual((SEQ_LEN, self.n), seq.shape)
-        self.assertEqual(self.data.shape[0] - SEQ_LEN + 1, len(dataset))
-        assert_allclose(np.ravel(dataset[0]), np.ravel(self.data[:12, :]))
-        assert_allclose(self.data, dataset.data)
-
-    def test_dataset_getitem(self):
-        ds = StreamingDataset(self.data, seq_len=SEQ_LEN)
-        self.assertEqual(len(self.data) - SEQ_LEN + 1, len(ds))
-        self.assertTupleEqual((15 - SEQ_LEN + 1, SEQ_LEN, self.n), ds[:15].shape)
-        self.assertTupleEqual((1, SEQ_LEN, self.n), ds[3:15].shape)
-        self.assertTupleEqual((self.m - SEQ_LEN + 1, SEQ_LEN, self.n), ds[:50].shape)
-
-    def test_as_array(self):
-        ds = StreamingDataset(self.data, seq_len=SEQ_LEN)
-        self.assertTupleEqual((self.m - SEQ_LEN + 1, SEQ_LEN, self.n), ds.as_array().shape)
-
-    def test_w_dataloader_01(self):
+            assert (SEQ_LEN, n) == seq.shape
+        assert (data.shape[0] - SEQ_LEN + 1) == len(dataset)
+        assert_allclose(np.ravel(dataset[0]), np.ravel(data[:12, :]))
+        assert_allclose(data, dataset.data)
+
+    def test_dataset_getitem(self, setup):
+        data, m, n = setup
+        ds = StreamingDataset(data, seq_len=SEQ_LEN)
+        assert (len(data) - SEQ_LEN + 1) == len(ds)
+        assert (15 - SEQ_LEN + 1, SEQ_LEN, n) == ds[:15].shape
+        assert (1, SEQ_LEN, n) == ds[3:15].shape
+        assert (m - SEQ_LEN + 1, SEQ_LEN, n) == ds[:50].shape
+
+    def test_as_array(self, setup):
+        data, m, n = setup
+        ds = StreamingDataset(data, seq_len=SEQ_LEN)
+        assert (m - SEQ_LEN + 1, SEQ_LEN, n) == ds.as_array().shape
+
+    def test_w_dataloader_01(self, setup):
+        data, m, n = setup
         batch_size = 4
         dl = DataLoader(
-            StreamingDataset(self.data, seq_len=SEQ_LEN),
+            StreamingDataset(data, seq_len=SEQ_LEN),
             batch_size=batch_size,
             num_workers=1,
             drop_last=True,
@@ -64,9 +65,9 @@
             assert_close(batch[0, 1, :], batch[1, 0, :])
             assert_close(batch[2, 1, :], batch[3, 0, :])
 
-        with self.assertRaises(NotImplementedError):
+        with pytest.raises(NotImplementedError):
             dl = DataLoader(
-                StreamingDataset(self.data, seq_len=SEQ_LEN),
+                StreamingDataset(data, seq_len=SEQ_LEN),
                 batch_size=batch_size,
                 drop_last=True,
                 num_workers=2,
@@ -74,18 +75,53 @@
             )
             for _ in dl:
                 pass
 
-    def test_dataset_err_01(self):
-        with self.assertRaises(ValueError):
-            StreamingDataset(self.data, seq_len=self.m + 1)
-
-    def test_dataset_err_02(self):
-        dataset = StreamingDataset(self.data, seq_len=SEQ_LEN)
-        with self.assertRaises(IndexError):
-            _ = dataset[self.m - 5]
-
-    def test_dataset_err_03(self):
-        with self.assertRaises(InvalidDataShapeError):
-            StreamingDataset(self.data.ravel(), seq_len=SEQ_LEN)
+    def test_dataset_err_01(self, setup):
+        data, m, _ = setup
+        with pytest.raises(ValueError):
+            StreamingDataset(data, seq_len=m + 1)
+
+    def test_dataset_err_02(self, setup):
+        data, m, _ = setup
+        dataset = StreamingDataset(data, seq_len=SEQ_LEN)
+        with pytest.raises(IndexError):
+            _ = dataset[m - 5]
+
+    def test_dataset_err_03(self, setup):
+        data, _, _ = setup
+        with pytest.raises(InvalidDataShapeError):
+            StreamingDataset(data.ravel(), seq_len=SEQ_LEN)
+
+    def test_ds_with_stride(self, setup):
+        data, m, n = setup
+        stride = 4
+        dataset = StreamingDataset(data, seq_len=SEQ_LEN, stride=stride)
+        assert (m - SEQ_LEN) // stride + 1 == len(dataset)
+        for idx, seq in enumerate(dataset):
+            assert (SEQ_LEN, n) == seq.shape
+            assert_array_equal(seq[0], data[idx * stride])
+
+        assert (len(dataset), SEQ_LEN, n) == dataset.as_array().shape
+        assert (len(dataset), SEQ_LEN, n) == dataset.as_tensor().shape
+
+    def test_ds_with_stride_err(self, setup):
+        data, _, _ = setup
+        with pytest.raises(ValueError):
+            StreamingDataset(data, seq_len=SEQ_LEN, stride=20)
+
+    def test_ds_with_stride_dataloader(self, setup):
+        data, m, n = setup
+        stride = 2
+        dataset = StreamingDataset(data, seq_len=SEQ_LEN, stride=stride)
+        batch_size = 5
+        dl = DataLoader(
+            dataset,
+            batch_size=batch_size,
+        )
+        total_samples = 0
+        for batch in dl:
+            total_samples += batch.shape[0]
+            assert (batch_size, SEQ_LEN, n) == batch.shape
+        assert total_samples == len(dataset)
 
 
 class TestStreamingDataLoader(unittest.TestCase):
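
A minimal usage sketch of the stride semantics introduced above (illustration
only, not part of the patch; it relies solely on the StreamingDataset API as
modified in numalogic/tools/data.py). With 30 rows, seq_len=12 and stride=4,
__len__ gives (30 - 12) // 4 + 1 == 5 windows, starting at rows 0, 4, 8, 12
and 16:

    import numpy as np

    from numalogic.tools.data import StreamingDataset

    data = np.arange(30 * 3).reshape(30, 3).astype(np.float32)
    ds = StreamingDataset(data, seq_len=12, stride=4)

    assert len(ds) == (30 - 12) // 4 + 1  # 5 strided windows
    for i, window in enumerate(ds):
        # each window is a (12, 3) slice whose first row is data[i * stride]
        assert window.shape == (12, 3)
        assert (window[0] == data[i * 4]).all()

In a deployed pipeline the same knob is driven by TrainerConf.ds_stride
(default 1), which the trainer UDF forwards to StreamingDataset as shown in
numalogic/udfs/trainer/_base.py above.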