Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose tcmf num_workers to zouwu #2884

Merged
merged 11 commits into from
Sep 28, 2020
8 changes: 8 additions & 0 deletions pyzoo/dev/run-pytests-horovod-pytorch
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,11 @@ if [ $exit_status_1 -ne 0 ];
then
exit $exit_status_1
fi

echo "Running zouwu tcmf distributed test"
python -m pytest -v ../test/zoo/zouwu/model/forecast/test_tcmf_forecaster.py::TestZouwuModelTCMFForecaster::test_forecast_tcmf_distributed
exit_status_2=$?
if [ $exit_status_2 -ne 0 ];
then
exit $exit_status_2
fi
3 changes: 2 additions & 1 deletion pyzoo/dev/run-pytests-ray
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ then
fi

echo "Running zouwu tests"
python -m pytest -v ../test/zoo/zouwu
python -m pytest -v ../test/zoo/zouwu \
-k "not test_forecast_tcmf_distributed"
exit_status_4=$?
if [ $exit_status_4 -ne 0 ];
then
Expand Down
76 changes: 76 additions & 0 deletions pyzoo/test/zoo/zouwu/model/forecast/test_lstm_forecaster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pytest
import numpy as np
from zoo.automl.feature.time_sequence import TimeSequenceFeatureTransformer
import tensorflow as tf
import pandas as pd

from zoo.zouwu.model.forecast.lstm_forecaster import LSTMForecaster
from unittest import TestCase


class TestZouwuModelLSTMForecaster(TestCase):

def setUp(self):
tf.keras.backend.clear_session()
self.ft = TimeSequenceFeatureTransformer()
self.create_data()

def tearDown(self):
pass

def create_data(self):
def gen_train_sample(data, past_seq_len, future_seq_len):
data = pd.DataFrame(data)
x, y = self.ft._roll_train(data,
past_seq_len=past_seq_len,
future_seq_len=future_seq_len
)
return x, y

def gen_test_sample(data, past_seq_len):
test_data = pd.DataFrame(data)
x = self.ft._roll_test(test_data, past_seq_len=past_seq_len)
return x

self.long_num = 6
self.time_step = 2
look_back = (self.long_num + 1) * self.time_step
look_forward = 1
self.x_train, self.y_train = gen_train_sample(data=np.random.randn(
64, 4), past_seq_len=look_back, future_seq_len=look_forward)
self.x_val, self.y_val = gen_train_sample(data=np.random.randn(16, 4),
past_seq_len=look_back,
future_seq_len=look_forward)
self.x_test = gen_test_sample(data=np.random.randn(16, 4),
past_seq_len=look_back)

def test_forecast_lstm(self):
# TODO hacking to fix a bug
model = LSTMForecaster(target_dim=1, feature_dim=self.x_train.shape[-1])
model.fit(self.x_train,
self.y_train,
validation_data=(self.x_val, self.y_val),
batch_size=8,
distributed=False)
model.evaluate(self.x_val, self.y_val)
model.predict(self.x_test)


if __name__ == "__main__":
pytest.main([__file__])
84 changes: 84 additions & 0 deletions pyzoo/test/zoo/zouwu/model/forecast/test_mtnet_forecaster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pytest
import numpy as np
from zoo.automl.feature.time_sequence import TimeSequenceFeatureTransformer
import tensorflow as tf
import pandas as pd

from zoo.zouwu.model.forecast.mtnet_forecaster import MTNetForecaster
from unittest import TestCase


class TestZouwuModelMTNetForecaster(TestCase):

def setUp(self):
tf.keras.backend.clear_session()
self.ft = TimeSequenceFeatureTransformer()
self.create_data()

def tearDown(self):
pass

def create_data(self):
def gen_train_sample(data, past_seq_len, future_seq_len):
data = pd.DataFrame(data)
x, y = self.ft._roll_train(data,
past_seq_len=past_seq_len,
future_seq_len=future_seq_len
)
return x, y

def gen_test_sample(data, past_seq_len):
test_data = pd.DataFrame(data)
x = self.ft._roll_test(test_data, past_seq_len=past_seq_len)
return x

self.long_num = 6
self.time_step = 2
look_back = (self.long_num + 1) * self.time_step
look_forward = 1
self.x_train, self.y_train = gen_train_sample(data=np.random.randn(
64, 4), past_seq_len=look_back, future_seq_len=look_forward)
self.x_val, self.y_val = gen_train_sample(data=np.random.randn(16, 4),
past_seq_len=look_back,
future_seq_len=look_forward)
self.x_test = gen_test_sample(data=np.random.randn(16, 4),
past_seq_len=look_back)

def test_forecast_mtnet(self):
# TODO hacking to fix a bug
model = MTNetForecaster(target_dim=1,
feature_dim=self.x_train.shape[-1],
long_series_num=self.long_num,
series_length=self.time_step
)
x_train_long, x_train_short = model.preprocess_input(self.x_train)
x_val_long, x_val_short = model.preprocess_input(self.x_val)
x_test_long, x_test_short = model.preprocess_input(self.x_test)

model.fit([x_train_long, x_train_short],
self.y_train,
validation_data=([x_val_long, x_val_short], self.y_val),
batch_size=32,
distributed=False)
model.evaluate([x_val_long, x_val_short], self.y_val)
model.predict([x_test_long, x_test_short])


if __name__ == "__main__":
pytest.main([__file__])
Original file line number Diff line number Diff line change
Expand Up @@ -16,85 +16,14 @@

import pytest
import numpy as np
from test.zoo.pipeline.utils.test_utils import ZooTestCase
from zoo.automl.feature.time_sequence import TimeSequenceFeatureTransformer
import tensorflow as tf
import pandas as pd
from zoo.zouwu.model.forecast.tcmf_forecaster import TCMFForecaster
from unittest import TestCase
import tempfile

from zoo.zouwu.model.forecast.lstm_forecaster import LSTMForecaster
from zoo.zouwu.model.forecast.mtnet_forecaster import MTNetForecaster


class TestZouwuModelForecast(ZooTestCase):

def setup_method(self, method):
tf.keras.backend.clear_session()
# super(TestZouwuModelForecast, self).setup_method(method)
self.ft = TimeSequenceFeatureTransformer()
self.create_data()

def teardown_method(self, method):
pass

def create_data(self):
def gen_train_sample(data, past_seq_len, future_seq_len):
data = pd.DataFrame(data)
x, y = self.ft._roll_train(data,
past_seq_len=past_seq_len,
future_seq_len=future_seq_len
)
return x, y

def gen_test_sample(data, past_seq_len):
test_data = pd.DataFrame(data)
x = self.ft._roll_test(test_data, past_seq_len=past_seq_len)
return x

self.long_num = 6
self.time_step = 2
look_back = (self.long_num + 1) * self.time_step
look_forward = 1
self.x_train, self.y_train = gen_train_sample(data=np.random.randn(
64, 4), past_seq_len=look_back, future_seq_len=look_forward)
self.x_val, self.y_val = gen_train_sample(data=np.random.randn(16, 4),
past_seq_len=look_back,
future_seq_len=look_forward)
self.x_test = gen_test_sample(data=np.random.randn(16, 4),
past_seq_len=look_back)

def test_forecast_lstm(self):
# TODO hacking to fix a bug
model = LSTMForecaster(target_dim=1, feature_dim=self.x_train.shape[-1])
model.fit(self.x_train,
self.y_train,
validation_data=(self.x_val, self.y_val),
batch_size=8,
distributed=False)
model.evaluate(self.x_val, self.y_val)
model.predict(self.x_test)

def test_forecast_mtnet(self):
# TODO hacking to fix a bug
model = MTNetForecaster(target_dim=1,
feature_dim=self.x_train.shape[-1],
long_series_num=self.long_num,
series_length=self.time_step
)
x_train_long, x_train_short = model.preprocess_input(self.x_train)
x_val_long, x_val_short = model.preprocess_input(self.x_val)
x_test_long, x_test_short = model.preprocess_input(self.x_test)

model.fit([x_train_long, x_train_short],
self.y_train,
validation_data=([x_val_long, x_val_short], self.y_val),
batch_size=32,
distributed=False)
model.evaluate([x_val_long, x_val_short], self.y_val)
model.predict([x_test_long, x_test_short])
class TestZouwuModelTCMFForecaster(TestCase):

def test_forecast_tcmf(self):
from zoo.zouwu.model.forecast.tcmf_forecaster import TCMFForecaster
import tempfile
model = TCMFForecaster(y_iters=1,
init_FX_epoch=1,
max_FX_epoch=1,
Expand Down Expand Up @@ -137,8 +66,6 @@ def test_forecast_tcmf(self):
assert model.evaluate(x=None, target_value=target_value, metric=['mse'])

def test_forecast_tcmf_without_id(self):
from zoo.zouwu.model.forecast.tcmf_forecaster import TCMFForecaster
import tempfile
model = TCMFForecaster(y_iters=1,
init_FX_epoch=1,
max_FX_epoch=1,
Expand Down Expand Up @@ -182,10 +109,9 @@ def test_forecast_tcmf_without_id(self):
model.evaluate(x=None, target_value=target_value, metric=['mse'])

def test_forecast_tcmf_xshards(self):
from zoo.zouwu.model.forecast.tcmf_forecaster import TCMFForecaster
from zoo.orca import OrcaContext
import zoo.orca.data.pandas
import tempfile
import pandas as pd
OrcaContext.pandas_read_backend = "pandas"

def preprocessing(df, id_name, y_name):
Expand Down Expand Up @@ -263,6 +189,40 @@ def get_pred(d):
assert final_df.shape == (300 * horizon, 3)
OrcaContext.pandas_read_backend = "spark"

def test_forecast_tcmf_distributed(self):
model = TCMFForecaster(y_iters=1,
init_FX_epoch=1,
max_FX_epoch=1,
max_TCN_epoch=1,
alt_iters=2)
horizon = np.random.randint(1, 50)
# construct data
id = np.arange(300)
data = np.random.rand(300, 480)
input = dict({'id': id, 'y': data})

from zoo.orca import init_orca_context, stop_orca_context

init_orca_context(cores=4, spark_log_level="INFO", init_ray_on_spark=True,
object_store_memory="1g")
model.fit(input, num_workers=4)

with tempfile.TemporaryDirectory() as tempdirname:
model.save(tempdirname)
loaded_model = TCMFForecaster.load(tempdirname, distributed=False)
yhat = model.predict(x=None, horizon=horizon, num_workers=4)
yhat_loaded = loaded_model.predict(x=None, horizon=horizon, num_workers=4)
yhat_id = yhat_loaded["id"]
assert (yhat_id == id).all()
yhat = yhat["prediction"]
yhat_loaded = yhat_loaded["prediction"]
assert yhat.shape == (300, horizon)
np.testing.assert_equal(yhat, yhat_loaded)
target_value = np.random.rand(300, horizon)
target_value = dict({"y": target_value})
assert model.evaluate(x=None, target_value=target_value, metric=['mse'])
stop_orca_context()


if __name__ == "__main__":
pytest.main([__file__])
Loading