Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce stride in dataset #360

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions numalogic/config/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class LightningTrainerConf:
enable_checkpointing: bool = False
enable_progress_bar: bool = False
enable_model_summary: bool = True
deterministic: bool = False


@dataclass
Expand All @@ -94,6 +95,7 @@ class TrainerConf:
retry_sec: int = 600 # 10 min
batch_size: int = 64
data_freq_sec: int = 60
ds_stride: int = 1
transforms: Optional[list[ModelInfo]] = None
pltrainer_conf: LightningTrainerConf = field(default_factory=LightningTrainerConf)

Expand Down
17 changes: 11 additions & 6 deletions numalogic/tools/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class StreamingDataset(IterableDataset):
----
data: A numpy array containing the input data in the shape of (batch, num_features).
seq_len: Length of the sliding window sequences to be generated from the input data
stride: Stride to jump between sequences; defaults to 1

Raises
------
Expand All @@ -109,19 +110,23 @@ class StreamingDataset(IterableDataset):
have a minimum dimension size of 2
"""

__slots__ = ("_seq_len", "_data")
__slots__ = ("_seq_len", "_data", "_stride")

def __init__(self, data: npt.NDArray[float], seq_len: int, stride: int = 1):
    r"""Validate the inputs and store them on the dataset.

    Args:
    ----
        data: 2-dimensional input array; it is stored as a float32 copy
        seq_len: Length of each sliding window sequence
        stride: Number of rows to advance between consecutive sequences;
            defaults to 1

    Raises
    ------
        ValueError: If seq_len is larger than the number of rows in data,
            if stride is smaller than 1, or if stride is not smaller
            than seq_len
        InvalidDataShapeError: If data is not 2-dimensional
    """
    if seq_len > len(data):
        raise ValueError(f"Sequence length: {seq_len} is more than data size: {len(data)}")

    # A zero stride never advances the window while iterating and divides by
    # zero when computing the dataset length; a negative stride moves the
    # window backwards. Reject both before any state is stored.
    if stride < 1:
        raise ValueError(f"Stride: {stride} should be a positive integer")

    if stride >= seq_len:
        raise ValueError(f"Stride: {stride} should be less than sequence length: {seq_len}")

    if data.ndim != 2:
        raise InvalidDataShapeError(
            f"Input data should have dim=2, received shape: {data.shape}"
        )

    self._seq_len = seq_len
    self._data = data.astype(np.float32)
    self._stride = stride

@property
def data(self) -> npt.NDArray[float]:
Expand All @@ -148,9 +153,9 @@ def create_seq(self, input_: npt.NDArray[float]) -> Generator[npt.NDArray[float]
A subarray of size (seq_len, num_features) from the input data.
"""
idx = 0
while idx < len(self):
while idx < len(self._data) - self._seq_len + 1:
yield input_[idx : idx + self._seq_len]
idx += 1
idx += self._stride

def __iter__(self) -> Iterator[npt.NDArray[float]]:
r"""Returns an iterator for the StreamingDataset object.
Expand All @@ -167,7 +172,7 @@ def __iter__(self) -> Iterator[npt.NDArray[float]]:

def __len__(self) -> int:
r"""Returns the number of sequences that can be generated from the input data."""
return len(self._data) - self._seq_len + 1
return (len(self._data) - self._seq_len) // self._stride + 1

def __getitem__(self, idx: Union[int, slice]) -> npt.NDArray[float]:
r"""Retrieves a sequence from the input data at the specified index."""
Expand All @@ -178,7 +183,7 @@ def __getitem__(self, idx: Union[int, slice]) -> npt.NDArray[float]:
raw_data_size = len(self._data)
start = idx.start or 0
stop = min(idx.stop, raw_data_size) if idx.stop else raw_data_size
for i in range(start, stop - self._seq_len + 1):
for i in range(start, stop - self._seq_len + 1, self._stride):
output.append(self._data[i : (i + self._seq_len)])
return np.stack(output)
if idx >= len(self):
Expand Down
4 changes: 2 additions & 2 deletions numalogic/udfs/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,9 @@ def compute(
scores = cls.compute_threshold(model, input_) # (seqlen x nfeat)
win_scores = cls.compute_feature_scores(
scores, win_agg_conf=score_conf.window_agg
) # (seqlen x nfeat)
) # (nfeat,)
if postproc_tx:
win_scores = cls.compute_postprocess(postproc_tx, win_scores) # (seqlen x nfeat)
win_scores = cls.compute_postprocess(postproc_tx, win_scores) # (nfeat,)
return win_scores # (nfeat, )

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion numalogic/udfs/trainer/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def compute(
stateful=any(_conf.stateful for _conf in numalogic_cfg.preprocess),
)

train_ds = StreamingDataset(input_, model.seq_len)
train_ds = StreamingDataset(input_, model.seq_len, stride=trainer_cfg.ds_stride)
trainer = TimeseriesTrainer(**asdict(trainer_cfg.pltrainer_conf))
trainer.fit(
model, train_dataloaders=DataLoader(train_ds, batch_size=trainer_cfg.batch_size)
Expand Down
124 changes: 80 additions & 44 deletions tests/tools/test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import unittest

import numpy as np
import pytest
import torch
from numpy.testing import assert_allclose
from numpy.testing import assert_allclose, assert_array_equal
from torch.testing import assert_close
from torch.utils.data import DataLoader

Expand All @@ -22,40 +23,40 @@
RNG = np.random.default_rng(42)


class TestStreamingDataset(unittest.TestCase):
data = None
m = None
n = None
@pytest.fixture
def setup():
    """Provide a float32 ramp array of shape (m, n) together with m and n.

    The array is built from ``m`` and ``n`` themselves, so the returned
    dimensions always match the data.
    """
    m, n = 30, 3
    data = np.arange(m * n).reshape(m, n).astype(np.float32)
    return data, m, n

@classmethod
def setUpClass(cls) -> None:
cls.m = 30
cls.n = 3
cls.data = np.arange(cls.m * cls.n).reshape(30, 3)

def test_dataset(self):
dataset = StreamingDataset(self.data, seq_len=SEQ_LEN)
class TestStreamingDataset:
def test_dataset(self, setup):
data, m, n = setup
dataset = StreamingDataset(data, seq_len=SEQ_LEN)
for seq in dataset:
self.assertTupleEqual((SEQ_LEN, self.n), seq.shape)
self.assertEqual(self.data.shape[0] - SEQ_LEN + 1, len(dataset))
assert_allclose(np.ravel(dataset[0]), np.ravel(self.data[:12, :]))
assert_allclose(self.data, dataset.data)

def test_dataset_getitem(self):
ds = StreamingDataset(self.data, seq_len=SEQ_LEN)
self.assertEqual(len(self.data) - SEQ_LEN + 1, len(ds))
self.assertTupleEqual((15 - SEQ_LEN + 1, SEQ_LEN, self.n), ds[:15].shape)
self.assertTupleEqual((1, SEQ_LEN, self.n), ds[3:15].shape)
self.assertTupleEqual((self.m - SEQ_LEN + 1, SEQ_LEN, self.n), ds[:50].shape)

def test_as_array(self):
ds = StreamingDataset(self.data, seq_len=SEQ_LEN)
self.assertTupleEqual((self.m - SEQ_LEN + 1, SEQ_LEN, self.n), ds.as_array().shape)

def test_w_dataloader_01(self):
assert (SEQ_LEN, n) == seq.shape
assert (data.shape[0] - SEQ_LEN + 1) == len(dataset)
assert_allclose(np.ravel(dataset[0]), np.ravel(data[:12, :]))
assert_allclose(data, dataset.data)

def test_dataset_getitem(self, setup):
data, m, n = setup
ds = StreamingDataset(data, seq_len=SEQ_LEN)
assert (len(data) - SEQ_LEN + 1) == len(ds)
assert (15 - SEQ_LEN + 1, SEQ_LEN, n) == ds[:15].shape
assert (1, SEQ_LEN, n) == ds[3:15].shape
assert (m - SEQ_LEN + 1, SEQ_LEN, n) == ds[:50].shape

def test_as_array(self, setup):
data, m, n = setup
ds = StreamingDataset(data, seq_len=SEQ_LEN)
assert (m - SEQ_LEN + 1, SEQ_LEN, n) == ds.as_array().shape

def test_w_dataloader_01(self, setup):
data, m, n = setup
batch_size = 4
dl = DataLoader(
StreamingDataset(self.data, seq_len=SEQ_LEN),
StreamingDataset(data, seq_len=SEQ_LEN),
batch_size=batch_size,
num_workers=1,
drop_last=True,
Expand All @@ -64,28 +65,63 @@ def test_w_dataloader_01(self):
assert_close(batch[0, 1, :], batch[1, 0, :])
assert_close(batch[2, 1, :], batch[3, 0, :])

with self.assertRaises(NotImplementedError):
with pytest.raises(NotImplementedError):
dl = DataLoader(
StreamingDataset(self.data, seq_len=SEQ_LEN),
StreamingDataset(data, seq_len=SEQ_LEN),
batch_size=batch_size,
drop_last=True,
num_workers=2,
)
for _ in dl:
pass

def test_dataset_err_01(self):
with self.assertRaises(ValueError):
StreamingDataset(self.data, seq_len=self.m + 1)

def test_dataset_err_02(self):
dataset = StreamingDataset(self.data, seq_len=SEQ_LEN)
with self.assertRaises(IndexError):
_ = dataset[self.m - 5]

def test_dataset_err_03(self):
with self.assertRaises(InvalidDataShapeError):
StreamingDataset(self.data.ravel(), seq_len=SEQ_LEN)
def test_dataset_err_01(self, setup):
data, m, _ = setup
with pytest.raises(ValueError):
StreamingDataset(data, seq_len=m + 1)

def test_dataset_err_02(self, setup):
data, m, _ = setup
dataset = StreamingDataset(data, seq_len=SEQ_LEN)
with pytest.raises(IndexError):
_ = dataset[m - 5]

def test_dataset_err_03(self, setup):
data, _, _ = setup
with pytest.raises(InvalidDataShapeError):
StreamingDataset(data.ravel(), seq_len=SEQ_LEN)

def test_ds_with_stride(self, setup):
    """Check length, window shape and window start rows for stride=4."""
    data, m, n = setup
    stride = 4
    dataset = StreamingDataset(data, seq_len=SEQ_LEN, stride=stride)

    expected_count = (m - SEQ_LEN) // stride + 1
    assert expected_count == len(dataset)

    # Window number `pos` must begin at raw row `pos * stride`.
    for pos, window in enumerate(dataset):
        assert (SEQ_LEN, n) == window.shape
        assert_array_equal(window[0], data[pos * stride])

    # Materialised views must contain exactly len(dataset) windows.
    assert (len(dataset), SEQ_LEN, n) == dataset.as_array().shape
    assert (len(dataset), SEQ_LEN, n) == dataset.as_tensor().shape

def test_ds_with_stride_err(self, setup):
    """Constructing the dataset with stride=20 must raise ValueError."""
    arr = setup[0]
    with pytest.raises(ValueError):
        StreamingDataset(arr, seq_len=SEQ_LEN, stride=20)

def test_ds_with_stride_dataloader(self, setup):
    """Batches from a DataLoader over a stride=2 dataset cover every window."""
    data, m, n = setup
    stride = 2
    batch_size = 5
    dataset = StreamingDataset(data, seq_len=SEQ_LEN, stride=stride)
    loader = DataLoader(dataset, batch_size=batch_size)

    seen = 0
    for batch in loader:
        seen += batch.shape[0]
        # Every batch is expected to be full.
        assert (batch_size, SEQ_LEN, n) == batch.shape
    assert seen == len(dataset)


class TestStreamingDataLoader(unittest.TestCase):
Expand Down
Loading