feat: introduce stride in dataset (numaproj#360)
Useful for flattened vectorized data.

---------

Signed-off-by: Avik Basu <[email protected]>
Co-authored-by: Avik Basu <[email protected]>
Signed-off-by: skondakindi <[email protected]>
2 people authored and skondakindi committed Apr 11, 2024
1 parent 4597e76 commit 9f847f4
Showing 5 changed files with 96 additions and 53 deletions.
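In short, `StreamingDataset` gains a `stride` argument so that consecutive windows can start several rows apart instead of always one, which avoids heavily overlapping windows when rows carry flattened vectorized data. A toy before/after sketch (numbers illustrative, not from this repo):

```python
import numpy as np
from numalogic.tools.data import StreamingDataset

data = np.arange(20 * 2, dtype=np.float32).reshape(20, 2)

dense = StreamingDataset(data, seq_len=6)             # old behavior: 20 - 6 + 1 = 15 windows
sparse = StreamingDataset(data, seq_len=6, stride=3)  # new: (20 - 6) // 3 + 1 = 5 windows
print(len(dense), len(sparse))  # 15 5
```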
2 changes: 2 additions & 0 deletions numalogic/config/_config.py
@@ -82,6 +82,7 @@ class LightningTrainerConf:
     enable_checkpointing: bool = False
     enable_progress_bar: bool = False
     enable_model_summary: bool = True
+    deterministic: bool = False


 @dataclass
@@ -94,6 +95,7 @@ class TrainerConf:
     retry_sec: int = 600  # 10 min
     batch_size: int = 64
     data_freq_sec: int = 60
+    ds_stride: int = 1
     transforms: Optional[list[ModelInfo]] = None
     pltrainer_conf: LightningTrainerConf = field(default_factory=LightningTrainerConf)

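For reference, the new knob sits beside the existing trainer options; a minimal sketch of setting it (values illustrative, and assuming `TrainerConf` is re-exported from `numalogic.config`):

```python
from numalogic.config import TrainerConf

# ds_stride=1 keeps the old overlapping-window behavior;
# larger values skip rows between consecutive training sequences.
conf = TrainerConf(batch_size=64, ds_stride=4)
```

With `ds_stride=4`, each training run sees roughly a quarter as many windows as before, since only every fourth start row produces a sequence.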
17 changes: 11 additions & 6 deletions numalogic/tools/data.py
@@ -101,6 +101,7 @@ class StreamingDataset(IterableDataset):
     ----
     data: A numpy array containing the input data in the shape of (batch, num_features).
     seq_len: Length of the sliding window sequences to be generated from the input data
+    stride: Stride to jump between sequences; defaults to 1

     Raises
     ------
@@ -109,19 +110,23 @@ class StreamingDataset(IterableDataset):
         have a minimum dimension size of 2
     """

-    __slots__ = ("_seq_len", "_data")
+    __slots__ = ("_seq_len", "_data", "_stride")

-    def __init__(self, data: npt.NDArray[float], seq_len: int):
+    def __init__(self, data: npt.NDArray[float], seq_len: int, stride: int = 1):
         if seq_len > len(data):
             raise ValueError(f"Sequence length: {seq_len} is more than data size: {len(data)}")

+        if stride >= seq_len:
+            raise ValueError(f"Stride: {stride} should be less than sequence length: {seq_len}")
+
         if data.ndim != 2:
             raise InvalidDataShapeError(
                 f"Input data should have dim=2, received shape: {data.shape}"
             )

         self._seq_len = seq_len
         self._data = data.astype(np.float32)
+        self._stride = stride

     @property
     def data(self) -> npt.NDArray[float]:
@@ -148,9 +153,9 @@ def create_seq(self, input_: npt.NDArray[float]) -> Generator[npt.NDArray[float]
             A subarray of size (seq_len, num_features) from the input data.
         """
         idx = 0
-        while idx < len(self):
+        while idx < len(self._data) - self._seq_len + 1:
             yield input_[idx : idx + self._seq_len]
-            idx += 1
+            idx += self._stride

     def __iter__(self) -> Iterator[npt.NDArray[float]]:
         r"""Returns an iterator for the StreamingDataset object.
@@ -167,7 +172,7 @@ def __iter__(self) -> Iterator[npt.NDArray[float]]:

     def __len__(self) -> int:
         r"""Returns the number of sequences that can be generated from the input data."""
-        return len(self._data) - self._seq_len + 1
+        return (len(self._data) - self._seq_len) // self._stride + 1

     def __getitem__(self, idx: Union[int, slice]) -> npt.NDArray[float]:
         r"""Retrieves a sequence from the input data at the specified index."""
@@ -178,7 +183,7 @@ def __getitem__(self, idx: Union[int, slice]) -> npt.NDArray[float]:
             raw_data_size = len(self._data)
             start = idx.start or 0
             stop = min(idx.stop, raw_data_size) if idx.stop else raw_data_size
-            for i in range(start, stop - self._seq_len + 1):
+            for i in range(start, stop - self._seq_len + 1, self._stride):
                 output.append(self._data[i : (i + self._seq_len)])
             return np.stack(output)
         if idx >= len(self):
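To make the new length arithmetic concrete: with 30 rows, `seq_len=12`, and `stride=4`, `__len__` returns `(30 - 12) // 4 + 1 = 5`, with windows starting at rows 0, 4, 8, 12, and 16. A standalone NumPy sketch of the same windowing (illustrative only, not code from the repo):

```python
import numpy as np

def strided_windows(data: np.ndarray, seq_len: int, stride: int = 1) -> np.ndarray:
    # Mirrors StreamingDataset's iteration: one window every `stride` rows,
    # keeping only windows that fit entirely inside the data.
    n_windows = (len(data) - seq_len) // stride + 1
    return np.stack([data[i * stride : i * stride + seq_len] for i in range(n_windows)])

data = np.arange(30 * 3, dtype=np.float32).reshape(30, 3)
print(strided_windows(data, seq_len=12, stride=4).shape)  # (5, 12, 3)
```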
4 changes: 2 additions & 2 deletions numalogic/udfs/postprocess.py
@@ -328,9 +328,9 @@ def compute(
         scores = cls.compute_threshold(model, input_)  # (seqlen x nfeat)
         win_scores = cls.compute_feature_scores(
             scores, win_agg_conf=score_conf.window_agg
-        )  # (seqlen x nfeat)
+        )  # (nfeat,)
         if postproc_tx:
-            win_scores = cls.compute_postprocess(postproc_tx, win_scores)  # (seqlen x nfeat)
+            win_scores = cls.compute_postprocess(postproc_tx, win_scores)  # (nfeat,)
         return win_scores  # (nfeat,)

     @classmethod
2 changes: 1 addition & 1 deletion numalogic/udfs/trainer/_base.py
@@ -114,7 +114,7 @@ def compute(
             stateful=any(_conf.stateful for _conf in numalogic_cfg.preprocess),
         )

-        train_ds = StreamingDataset(input_, model.seq_len)
+        train_ds = StreamingDataset(input_, model.seq_len, stride=trainer_cfg.ds_stride)
         trainer = TimeseriesTrainer(**asdict(trainer_cfg.pltrainer_conf))
         trainer.fit(
             model, train_dataloaders=DataLoader(train_ds, batch_size=trainer_cfg.batch_size
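Downstream, the trainer threads the configured stride straight into the dataset; a minimal sketch of the equivalent wiring outside the UDF (synthetic data, illustrative `seq_len`):

```python
import numpy as np
from torch.utils.data import DataLoader
from numalogic.tools.data import StreamingDataset

# Synthetic stand-in for the fetched training dataframe.
input_ = np.random.default_rng(0).normal(size=(1440, 3)).astype(np.float32)

train_ds = StreamingDataset(input_, seq_len=12, stride=4)  # (1440 - 12) // 4 + 1 = 358 windows
train_dl = DataLoader(train_ds, batch_size=64)

first = next(iter(train_dl))
print(first.shape)  # torch.Size([64, 12, 3])
```

Since `ds_stride` defaults to 1, existing pipelines that never set it keep the previous dense-window behavior unchanged.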
124 changes: 80 additions & 44 deletions tests/tools/test_data.py
@@ -2,8 +2,9 @@
 import unittest

 import numpy as np
+import pytest
 import torch
-from numpy.testing import assert_allclose
+from numpy.testing import assert_allclose, assert_array_equal
 from torch.testing import assert_close
 from torch.utils.data import DataLoader

@@ -22,40 +23,40 @@
 RNG = np.random.default_rng(42)


-class TestStreamingDataset(unittest.TestCase):
-    data = None
-    m = None
-    n = None
-
-    @classmethod
-    def setUpClass(cls) -> None:
-        cls.m = 30
-        cls.n = 3
-        cls.data = np.arange(cls.m * cls.n).reshape(30, 3)
-
-    def test_dataset(self):
-        dataset = StreamingDataset(self.data, seq_len=SEQ_LEN)
-        for seq in dataset:
-            self.assertTupleEqual((SEQ_LEN, self.n), seq.shape)
-        self.assertEqual(self.data.shape[0] - SEQ_LEN + 1, len(dataset))
-        assert_allclose(np.ravel(dataset[0]), np.ravel(self.data[:12, :]))
-        assert_allclose(self.data, dataset.data)
-
-    def test_dataset_getitem(self):
-        ds = StreamingDataset(self.data, seq_len=SEQ_LEN)
-        self.assertEqual(len(self.data) - SEQ_LEN + 1, len(ds))
-        self.assertTupleEqual((15 - SEQ_LEN + 1, SEQ_LEN, self.n), ds[:15].shape)
-        self.assertTupleEqual((1, SEQ_LEN, self.n), ds[3:15].shape)
-        self.assertTupleEqual((self.m - SEQ_LEN + 1, SEQ_LEN, self.n), ds[:50].shape)
-
-    def test_as_array(self):
-        ds = StreamingDataset(self.data, seq_len=SEQ_LEN)
-        self.assertTupleEqual((self.m - SEQ_LEN + 1, SEQ_LEN, self.n), ds.as_array().shape)
-
-    def test_w_dataloader_01(self):
+@pytest.fixture
+def setup():
+    m, n = 30, 3
+    return np.arange(30 * 3).reshape(30, 3).astype(np.float32), m, n
+
+
+class TestStreamingDataset:
+    def test_dataset(self, setup):
+        data, m, n = setup
+        dataset = StreamingDataset(data, seq_len=SEQ_LEN)
+        for seq in dataset:
+            assert (SEQ_LEN, n) == seq.shape
+        assert (data.shape[0] - SEQ_LEN + 1) == len(dataset)
+        assert_allclose(np.ravel(dataset[0]), np.ravel(data[:12, :]))
+        assert_allclose(data, dataset.data)
+
+    def test_dataset_getitem(self, setup):
+        data, m, n = setup
+        ds = StreamingDataset(data, seq_len=SEQ_LEN)
+        assert (len(data) - SEQ_LEN + 1) == len(ds)
+        assert (15 - SEQ_LEN + 1, SEQ_LEN, n) == ds[:15].shape
+        assert (1, SEQ_LEN, n) == ds[3:15].shape
+        assert (m - SEQ_LEN + 1, SEQ_LEN, n) == ds[:50].shape
+
+    def test_as_array(self, setup):
+        data, m, n = setup
+        ds = StreamingDataset(data, seq_len=SEQ_LEN)
+        assert (m - SEQ_LEN + 1, SEQ_LEN, n) == ds.as_array().shape
+
+    def test_w_dataloader_01(self, setup):
+        data, m, n = setup
         batch_size = 4
         dl = DataLoader(
-            StreamingDataset(self.data, seq_len=SEQ_LEN),
+            StreamingDataset(data, seq_len=SEQ_LEN),
             batch_size=batch_size,
             num_workers=1,
             drop_last=True,
@@ -64,28 +65,63 @@ def test_w_dataloader_01(self):
             assert_close(batch[0, 1, :], batch[1, 0, :])
             assert_close(batch[2, 1, :], batch[3, 0, :])

-        with self.assertRaises(NotImplementedError):
+        with pytest.raises(NotImplementedError):
             dl = DataLoader(
-                StreamingDataset(self.data, seq_len=SEQ_LEN),
+                StreamingDataset(data, seq_len=SEQ_LEN),
                 batch_size=batch_size,
                 drop_last=True,
                 num_workers=2,
             )
             for _ in dl:
                 pass

-    def test_dataset_err_01(self):
-        with self.assertRaises(ValueError):
-            StreamingDataset(self.data, seq_len=self.m + 1)
-
-    def test_dataset_err_02(self):
-        dataset = StreamingDataset(self.data, seq_len=SEQ_LEN)
-        with self.assertRaises(IndexError):
-            _ = dataset[self.m - 5]
-
-    def test_dataset_err_03(self):
-        with self.assertRaises(InvalidDataShapeError):
-            StreamingDataset(self.data.ravel(), seq_len=SEQ_LEN)
+    def test_dataset_err_01(self, setup):
+        data, m, _ = setup
+        with pytest.raises(ValueError):
+            StreamingDataset(data, seq_len=m + 1)
+
+    def test_dataset_err_02(self, setup):
+        data, m, _ = setup
+        dataset = StreamingDataset(data, seq_len=SEQ_LEN)
+        with pytest.raises(IndexError):
+            _ = dataset[m - 5]
+
+    def test_dataset_err_03(self, setup):
+        data, _, _ = setup
+        with pytest.raises(InvalidDataShapeError):
+            StreamingDataset(data.ravel(), seq_len=SEQ_LEN)
+
+    def test_ds_with_stride(self, setup):
+        data, m, n = setup
+        stride = 4
+        dataset = StreamingDataset(data, seq_len=SEQ_LEN, stride=stride)
+        assert (m - SEQ_LEN) // stride + 1 == len(dataset)
+        for idx, seq in enumerate(dataset):
+            assert (SEQ_LEN, n) == seq.shape
+            assert_array_equal(seq[0], data[idx * stride])
+
+        assert (len(dataset), SEQ_LEN, n) == dataset.as_array().shape
+        assert (len(dataset), SEQ_LEN, n) == dataset.as_tensor().shape
+
+    def test_ds_with_stride_err(self, setup):
+        data, _, _ = setup
+        with pytest.raises(ValueError):
+            StreamingDataset(data, seq_len=SEQ_LEN, stride=20)
+
+    def test_ds_with_stride_dataloader(self, setup):
+        data, m, n = setup
+        stride = 2
+        dataset = StreamingDataset(data, seq_len=SEQ_LEN, stride=stride)
+        batch_size = 5
+        dl = DataLoader(
+            dataset,
+            batch_size=batch_size,
+        )
+        total_samples = 0
+        for batch in dl:
+            total_samples += batch.shape[0]
+            assert (batch_size, SEQ_LEN, n) == batch.shape
+        assert total_samples == len(dataset)


 class TestStreamingDataLoader(unittest.TestCase):
