diff --git a/python/chronos/src/bigdl/chronos/data/tsdataset.py b/python/chronos/src/bigdl/chronos/data/tsdataset.py index 3b8c6a8201d..4041cd999f1 100644 --- a/python/chronos/src/bigdl/chronos/data/tsdataset.py +++ b/python/chronos/src/bigdl/chronos/data/tsdataset.py @@ -832,6 +832,7 @@ def to_tf_dataset(self, batch_size=32, shuffle=False): "Please call 'roll' method " "before transform a TSDataset to tf dataset!") data = tf.data.Dataset.from_tensor_slices((self.numpy_x, self.numpy_y)) + batch_size = 32 if batch_size is None else batch_size if shuffle: data = data.cache().shuffle(self.numpy_x.shape[0]).batch(batch_size) else: diff --git a/python/chronos/src/bigdl/chronos/forecaster/tf/base_forecaster.py b/python/chronos/src/bigdl/chronos/forecaster/tf/base_forecaster.py index 16cfd210fe7..440707e4264 100644 --- a/python/chronos/src/bigdl/chronos/forecaster/tf/base_forecaster.py +++ b/python/chronos/src/bigdl/chronos/forecaster/tf/base_forecaster.py @@ -15,8 +15,11 @@ # from bigdl.chronos.forecaster.abstract import Forecaster +from bigdl.chronos.data import TSDataset from bigdl.chronos.metric.forecast_metrics import Evaluator import keras +import tensorflow as tf +import numpy as np class BaseTF2Forecaster(Forecaster): @@ -44,11 +47,22 @@ def fit(self, data, epochs=1, batch_size=32): | A TFDataset instance which contains x and y with same shape as the tuple. | x's shape is (num_samples, lookback, feature_dim), | y's shape is (num_samples, horizon, target_dim). + | + | 3. A bigdl.chronos.data.tsdataset.TSDataset instance. + | Forecaster will automatically process the TSDataset. + | By default, TSDataset will be transformed to a tfdataset, + | Users may call `roll` on the TSDataset before calling `fit` + | Then the training speed will be faster but will consume more memory. :params epochs: Number of epochs you want to train. The value defaults to 1. :params batch_size: Number of batch size you want to train. The value defaults to 32. 
Do not specify the batch_size, if your data in the form of tf.data datasets. """ + if isinstance(data, TSDataset): + if data.lookback is None: + data.roll(lookback=self.model_config['past_seq_len'], + horizon=self.model_config['future_seq_len']) + data = data.to_tf_dataset(shuffle=True, batch_size=batch_size) if isinstance(data, tuple): self.internal.fit(x=data[0], y=data[1], epochs=epochs, batch_size=batch_size) else: @@ -62,6 +76,18 @@ def predict(self, data, batch_size=32): | 1. a numpy ndarray x: | x's shape is (num_samples, lookback, feature_dim) where lookback and feature_dim | should be the same as past_seq_len and input_feature_num. + | 2. a tfdataset + | A TFDataset instance which contains x and y with the same shape as the tuple. + | The tfdataset needs to return at least x in each iteration + | with the following shape: + | x's shape is (num_samples, lookback, feature_dim) where lookback and feature_dim + | should be the same as past_seq_len and input_feature_num. + | If it returns both x and y, only x is used. + | 3. A bigdl.chronos.data.tsdataset.TSDataset instance + | Forecaster will automatically process the TSDataset. + | By default, TSDataset will be transformed to a tfdataset. + | Users may call `roll` on the TSDataset before calling `predict`; + | then the prediction speed will be faster but will consume more memory. :params batch_size: predict batch size. The value will not affect evaluate result but will affect resources cost(e.g. memory and time). 
@@ -74,7 +100,13 @@ def predict(self, data, batch_size=32): if not self.fitted: invalidInputError(False, "You must call fit or restore first before calling predict!") - if batch_size: + if isinstance(data, TSDataset): + if data.lookback is None: + data.roll(lookback=self.model_config['past_seq_len'], + horizon=self.model_config['future_seq_len']) + data = data.to_tf_dataset(shuffle=False, batch_size=batch_size) + + if batch_size or isinstance(data, tf.data.Dataset): yhat = self.internal.predict(data, batch_size=batch_size) else: yhat = self.internal(data, training=False).numpy() @@ -99,6 +131,15 @@ def evaluate(self, data, batch_size=32, multioutput="raw_values"): | should be the same as past_seq_len and input_feature_num. | y's shape is (num_samples, horizon, target_dim), where horizon and target_dim | should be the same as future_seq_len and output_feature_num. + | 2. a tfdataset + | A TFDataset instance which contains x and y with the same shape as the tuple. + | x's shape is (num_samples, lookback, feature_dim), + | y's shape is (num_samples, horizon, target_dim). + | 3. A bigdl.chronos.data.tsdataset.TSDataset instance + | Forecaster will automatically process the TSDataset. + | By default, TSDataset will be transformed to a tfdataset. + | Users may call `roll` on the TSDataset before calling `evaluate`; + | then the evaluation speed will be faster but will consume more memory. :params batch_size: evaluate batch size. The value will not affect evaluate result but will affect resources cost(e.g. memory and time). 
@@ -113,10 +154,21 @@ def evaluate(self, data, batch_size=32, multioutput="raw_values"): if not self.fitted: invalidInputError(False, "You must call fit or restore first before calling evaluate!") - yhat = self.internal.predict(data[0], batch_size=batch_size) + if isinstance(data, TSDataset): + if data.lookback is None: + data.roll(lookback=self.model_config['past_seq_len'], + horizon=self.model_config['future_seq_len']) + data = data.to_tf_dataset(shuffle=False, batch_size=batch_size) + + if isinstance(data, tuple): + input_data, target = data + else: + input_data = data + target = np.asarray(tuple(map(lambda x: x[1], data.as_numpy_iterator()))) + yhat = self.internal.predict(input_data, batch_size=batch_size) aggregate = 'mean' if multioutput == 'uniform_average' else None - return Evaluator.evaluate(self.metrics, y_true=data[1], y_pred=yhat, aggregate=aggregate) + return Evaluator.evaluate(self.metrics, y_true=target, y_pred=yhat, aggregate=aggregate) def save(self, checkpoint_file): """ @@ -139,3 +191,66 @@ def load(self, checkpoint_file): self.internal = keras.models.load_model(checkpoint_file, custom_objects=self.custom_objects_config) self.fitted = True + + @classmethod + def from_tsdataset(cls, tsdataset, past_seq_len=None, future_seq_len=None, **kwargs): + """ + Build a Forecaster Model + + :param tsdataset: A bigdl.chronos.data.tsdataset.TSDataset instance. + :param past_seq_len: Specify history time step (i.e. lookback) + Do not specify the 'past_seq_len' if your tsdataset has called + the 'TSDataset.roll' method or 'TSDataset.to_tf_dataset'. + :param future_seq_len: Specify output time step (i.e. horizon) + Do not specify the 'future_seq_len' if your tsdataset has called + the 'TSDataset.roll' method or 'TSDataset.to_tf_dataset'. + :param kwargs: Specify parameters of Forecaster, + e.g. loss and optimizer, etc. + More info, please refer to Forecaster.__init__ methods. 
+ + :return: A Forecaster Model + """ + from bigdl.nano.utils.log4Error import invalidInputError + + def check_time_steps(tsdataset, past_seq_len, future_seq_len): + if tsdataset.lookback and past_seq_len: + future_seq_len = future_seq_len if isinstance(future_seq_len, int)\ + else max(future_seq_len) + return tsdataset.lookback == past_seq_len and tsdataset.horizon == future_seq_len + return True + + invalidInputError(not tsdataset._has_generate_agg_feature, + "We will add support for 'gen_rolling_feature' method later.") + + if tsdataset.lookback: + past_seq_len = tsdataset.lookback + future_seq_len = tsdataset.horizon if isinstance(tsdataset.horizon, int) \ + else max(tsdataset.horizon) + output_feature_num = len(tsdataset.roll_target) + input_feature_num = len(tsdataset.roll_feature) + output_feature_num + elif past_seq_len and future_seq_len: + past_seq_len = past_seq_len if isinstance(past_seq_len, int)\ + else tsdataset.get_cycle_length() + future_seq_len = future_seq_len if isinstance(future_seq_len, int) \ + else max(future_seq_len) + output_feature_num = len(tsdataset.target_col) + input_feature_num = len(tsdataset.feature_col) + output_feature_num + else: + invalidInputError(False, + "Forecaster needs 'past_seq_len' and 'future_seq_len' " + "to specify the history time step of training.") + + invalidInputError(check_time_steps(tsdataset, past_seq_len, future_seq_len), + "tsdataset already has history time steps and " + "differs from the given past_seq_len and future_seq_len " + "Expected past_seq_len and future_seq_len to be " + f"{tsdataset.lookback, tsdataset.horizon}, " + f"but found {past_seq_len, future_seq_len}.", + fixMsg="Do not specify past_seq_len and future seq_len " + "or call tsdataset.roll method again and specify time step") + + return cls(past_seq_len=past_seq_len, + future_seq_len=future_seq_len, + input_feature_num=input_feature_num, + output_feature_num=output_feature_num, + **kwargs) diff --git 
a/python/chronos/src/bigdl/chronos/forecaster/tf/lstm_forecaster.py b/python/chronos/src/bigdl/chronos/forecaster/tf/lstm_forecaster.py index b69a8246d36..c11b4091f21 100644 --- a/python/chronos/src/bigdl/chronos/forecaster/tf/lstm_forecaster.py +++ b/python/chronos/src/bigdl/chronos/forecaster/tf/lstm_forecaster.py @@ -126,3 +126,55 @@ def __init__(self, # self.quantize_available = True # self.checkpoint_callback = False super(LSTMForecaster, self).__init__() + + @classmethod + def from_tsdataset(cls, tsdataset, past_seq_len=None, **kwargs): + """ + Build an LSTMForecaster model. + + :param tsdataset: A bigdl.chronos.data.tsdataset.TSDataset instance. + :param past_seq_len: Specify the history time steps (i.e. lookback). + Do not specify the 'past_seq_len' if your tsdataset has called + the 'TSDataset.roll' method or 'TSDataset.to_tf_dataset'. + :param kwargs: Specify parameters of Forecaster, + e.g. loss and optimizer, etc. For more info, please refer to + the LSTMForecaster.__init__ method. 
+ + :return: A LSTMForecaster Model + """ + from bigdl.nano.utils.log4Error import invalidInputError + + def check_time_steps(tsdataset, past_seq_len): + if tsdataset.lookback and past_seq_len: + return tsdataset.lookback == past_seq_len + return True + + invalidInputError(not tsdataset._has_generate_agg_feature, + "We will add support for 'gen_rolling_feature' method later.") + + if tsdataset.lookback: + past_seq_len = tsdataset.lookback + output_feature_num = len(tsdataset.roll_target) + input_feature_num = len(tsdataset.roll_feature) + output_feature_num + elif past_seq_len: + past_seq_len = past_seq_len if isinstance(past_seq_len, int)\ + else tsdataset.get_cycle_length() + output_feature_num = len(tsdataset.target_col) + input_feature_num = len(tsdataset.feature_col) + output_feature_num + else: + invalidInputError(False, + "Forecaster needs 'past_seq_len' to specify " + "the history time step of training.") + + invalidInputError(check_time_steps(tsdataset, past_seq_len), + "tsdataset already has history time steps and " + "differs from the given past_seq_len " + f"Expected past_seq_len to be {tsdataset.lookback}, " + f"but found {past_seq_len}.", + fixMsg="Do not specify past_seq_len " + "or call tsdataset.roll method again and specify time step.") + + return cls(past_seq_len=past_seq_len, + input_feature_num=input_feature_num, + output_feature_num=output_feature_num, + **kwargs) diff --git a/python/chronos/test/bigdl/chronos/forecaster/tf/test_lstm_keras_forecaster.py b/python/chronos/test/bigdl/chronos/forecaster/tf/test_lstm_keras_forecaster.py index fd80a8092ea..5eeed0abc62 100644 --- a/python/chronos/test/bigdl/chronos/forecaster/tf/test_lstm_keras_forecaster.py +++ b/python/chronos/test/bigdl/chronos/forecaster/tf/test_lstm_keras_forecaster.py @@ -18,6 +18,7 @@ import tempfile import os +from bigdl.chronos.forecaster.tf.lstm_forecaster import LSTMForecaster from unittest import TestCase import numpy as np import tensorflow as tf @@ -50,28 +51,48 @@ def 
get_x_y(num_sample): return train_data, test_data +def create_tsdataset(roll=True): + from bigdl.chronos.data import TSDataset + import pandas as pd + timeseries = pd.date_range(start='2020-01-01', freq='D', periods=1000) + df = pd.DataFrame(np.random.rand(1000, 2), + columns=['value1', 'value2'], + index=timeseries, + dtype=np.float32) + df.reset_index(inplace=True) + df.rename(columns={'index': 'timeseries'}, inplace=True) + train, _, test = TSDataset.from_pandas(df=df, + dt_col='timeseries', + target_col=['value1', 'value2'], + with_split=True) + if roll: + for tsdata in [train, test]: + tsdata.roll(lookback=24, horizon=1) + return train, test + + @pytest.mark.skipif(tf.__version__ < '2.0.0', reason="Run only when tf > 2.0.0.") class TestLSTMForecaster(TestCase): def setUp(self): from bigdl.chronos.forecaster.tf.lstm_forecaster import LSTMForecaster - self. forecaster = LSTMForecaster(past_seq_len=10, - input_feature_num=10, - output_feature_num=2) + self.forecaster = LSTMForecaster(past_seq_len=10, + input_feature_num=10, + output_feature_num=2) def tearDown(self): - pass + del self.forecaster def test_lstm_forecaster_fit_predict_evaluate(self): train_data, test_data = create_data() self.forecaster.fit(train_data, - epochs=2, - batch_size=32) + epochs=2, + batch_size=32) yhat = self.forecaster.predict(test_data[0], - batch_size=32) + batch_size=32) assert yhat.shape == (400, 1, 2) mse = self.forecaster.evaluate(test_data, - batch_size=32, - multioutput="raw_values") + batch_size=32, + multioutput="raw_values") assert mse[0].shape == test_data[1].shape[1:] def test_lstm_forecaster_fit_tf_data(self): @@ -121,5 +142,35 @@ def customized_metric(y_true, y_pred): assert yhat.shape == (400, 1, 2) np.testing.assert_almost_equal(yhat, load_model_yhat, decimal=5) + def test_lstm_from_tsdataset(self): + train, test = create_tsdataset(roll=True) + lstm = LSTMForecaster.from_tsdataset(train, + hidden_dim=16, + layer_num=2) + lstm.fit(train, + epochs=2, + batch_size=32) + 
yhat = lstm.predict(test, batch_size=32) + test.roll(lookback=lstm.model_config['past_seq_len'], + horizon=lstm.model_config['future_seq_len']) + _, y_test = test.to_numpy() + assert yhat.shape == y_test.shape + + del lstm + + train, test = create_tsdataset(roll=False) + lstm = LSTMForecaster.from_tsdataset(train, + past_seq_len=24, + hidden_dim=16, + layer_num=2) + lstm.fit(train, + epochs=2, + batch_size=32) + yhat = lstm.predict(test, batch_size=None) + test.roll(lookback=lstm.model_config['past_seq_len'], + horizon=lstm.model_config['future_seq_len']) + _, y_test = test.to_numpy() + assert yhat.shape == y_test.shape + if __name__ == '__main__': pytest.main([__file__]) diff --git a/python/chronos/test/bigdl/chronos/forecaster/tf/test_seq2seq_keras_forecaster.py b/python/chronos/test/bigdl/chronos/forecaster/tf/test_seq2seq_keras_forecaster.py index 8ad140f421d..9352b05635a 100644 --- a/python/chronos/test/bigdl/chronos/forecaster/tf/test_seq2seq_keras_forecaster.py +++ b/python/chronos/test/bigdl/chronos/forecaster/tf/test_seq2seq_keras_forecaster.py @@ -20,6 +20,7 @@ from unittest import TestCase import numpy as np import tensorflow as tf +from bigdl.chronos.forecaster.tf.seq2seq_forecaster import Seq2SeqForecaster def create_data(tf_data=False, batch_size=32): @@ -49,6 +50,25 @@ def get_x_y(num_sample): .prefetch(tf.data.AUTOTUNE) return train_data, test_data +def create_tsdataset(roll=True): + from bigdl.chronos.data import TSDataset + import pandas as pd + timeseries = pd.date_range(start='2020-01-01', freq='D', periods=1000) + df = pd.DataFrame(np.random.rand(1000, 2), + columns=['value1', 'value2'], + index=timeseries, + dtype=np.float32) + df.reset_index(inplace=True) + df.rename(columns={'index': 'timeseries'}, inplace=True) + train, _, test = TSDataset.from_pandas(df=df, + dt_col='timeseries', + target_col=['value1', 'value2'], + with_split=True) + if roll: + for tsdata in [train, test]: + tsdata.roll(lookback=24, horizon=2) + return train, test + 
@pytest.mark.skipif(tf.__version__ < '2.0.0', reason="Run only when tf > 2.0.0.") class TestSeq2SeqForecaster(TestCase): @@ -61,7 +81,8 @@ def setUp(self): output_feature_num=2) def tearDown(self): - pass + del self.forecaster + def test_seq2seq_fit_predict_evaluate(self): train_data, test_data = create_data() self.forecaster.fit(train_data, @@ -121,5 +142,37 @@ def customized_metric(y_true, y_pred): assert yhat.shape == (400, 2, 2) np.testing.assert_almost_equal(yhat, load_model_yhat, decimal=5) + def test_s2s_from_tsdataset(self): + train, test = create_tsdataset(roll=True) + s2s = Seq2SeqForecaster.from_tsdataset(train, + lstm_hidden_dim=16, + lstm_layer_num=2) + s2s.fit(train, + epochs=2, + batch_size=32) + yhat = s2s.predict(test, batch_size=32) + test.roll(lookback=s2s.model_config['past_seq_len'], + horizon=s2s.model_config['future_seq_len']) + _, y_test = test.to_numpy() + assert yhat.shape == y_test.shape + + del s2s + + train, test = create_tsdataset(roll=False) + s2s = Seq2SeqForecaster.from_tsdataset(train, + past_seq_len=24, + future_seq_len=2, + lstm_hidden_dim=16, + lstm_layer_num=2) + s2s.fit(train, + epochs=2, + batch_size=32) + yhat = s2s.predict(test, batch_size=None) + test.roll(lookback=s2s.model_config['past_seq_len'], + horizon=s2s.model_config['future_seq_len']) + _, y_test = test.to_numpy() + assert yhat.shape == y_test.shape + + if __name__ == '__main__': pytest.main([__file__]) diff --git a/python/chronos/test/bigdl/chronos/forecaster/tf/test_tcn_keras_forecaster.py b/python/chronos/test/bigdl/chronos/forecaster/tf/test_tcn_keras_forecaster.py index 6b58c59608e..88d95edd601 100644 --- a/python/chronos/test/bigdl/chronos/forecaster/tf/test_tcn_keras_forecaster.py +++ b/python/chronos/test/bigdl/chronos/forecaster/tf/test_tcn_keras_forecaster.py @@ -21,6 +21,7 @@ from unittest import TestCase import numpy as np import tensorflow as tf +from bigdl.chronos.forecaster.tf.tcn_forecaster import TCNForecaster def create_data(tf_data=False, 
batch_size=32): @@ -51,6 +52,26 @@ def get_x_y(num_sample): return train_data, test_data +def create_tsdataset(roll=True): + from bigdl.chronos.data import TSDataset + import pandas as pd + timeseries = pd.date_range(start='2020-01-01', freq='D', periods=1000) + df = pd.DataFrame(np.random.rand(1000, 2), + columns=['value1', 'value2'], + index=timeseries, + dtype=np.float32) + df.reset_index(inplace=True) + df.rename(columns={'index': 'timeseries'}, inplace=True) + train, _, test = TSDataset.from_pandas(df=df, + dt_col='timeseries', + target_col=['value1', 'value2'], + with_split=True) + if roll: + for tsdata in [train, test]: + tsdata.roll(lookback=24, horizon=5) + return train, test + + @pytest.mark.skipif(tf.__version__ < '2.0.0', reason="Run only when tf > 2.0.0.") class TestTCNForecaster(TestCase): def setUp(self): @@ -62,7 +83,7 @@ def setUp(self): num_channels=[15]*7) def tearDown(self): - pass + del self.forecaster def test_tcn_forecaster_fit_predict_evaluate(self): train_data, test_data = create_data() @@ -126,5 +147,36 @@ def customized_metric(y_true, y_pred): assert yhat.shape == (400, 2, 2) np.testing.assert_almost_equal(yhat, load_model_yhat, decimal=5) + def test_tcn_from_tsdataset(self): + train, test = create_tsdataset(roll=True) + + tcn = TCNForecaster.from_tsdataset(train, + num_channels=[16]*2) + tcn.fit(train, + epochs=2, + batch_size=32) + yhat = tcn.predict(test, batch_size=32) + test.roll(lookback=tcn.model_config['past_seq_len'], + horizon=tcn.model_config['future_seq_len']) + _, y_test = test.to_numpy() + assert yhat.shape == y_test.shape + + del tcn + + train, test = create_tsdataset(roll=False) + tcn = TCNForecaster.from_tsdataset(train, + past_seq_len=24, + future_seq_len=5, + num_channels=[16]*2) + tcn.fit(train, + epochs=2, + batch_size=32) + yhat = tcn.predict(test, batch_size=None) + test.roll(lookback=tcn.model_config['past_seq_len'], + horizon=tcn.model_config['future_seq_len']) + _, y_test = test.to_numpy() + assert yhat.shape == 
y_test.shape + + if __name__ == '__main__': pytest.main([__file__])