Skip to content

[BUG] CLI forecast command fails with pipeline ensembles #1331

Merged
merged 5 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Remove upper limitation on version of numba ([#1321](https://github.com/tinkoff-ai/etna/pull/1321))

### Fixed
-
- Pipeline ensembles fail in `etna forecast` CLI ([#1331](https://github.com/tinkoff-ai/etna/pull/1331))
-
- Fix performance of `DeepARModel` and `TFTModel` ([#1322](https://github.com/tinkoff-ai/etna/pull/1322))
- `mrmr` feature selection working with categoricals ([#1311](https://github.com/tinkoff-ai/etna/pull/1311))
Expand Down
12 changes: 9 additions & 3 deletions etna/commands/forecast_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ def compute_horizon(horizon: int, forecast_params: Dict[str, Any], tsdataset: TS
return horizon


def update_horizon(pipeline_configs: Dict[str, Any], forecast_params: Dict[str, Any], tsdataset: TSDataset):
"""Update the ``horizon`` parameter in the pipeline config if ``start_timestamp`` is set."""
for config in pipeline_configs.get("pipelines", [pipeline_configs]):
horizon: int = config["horizon"] # type: ignore
horizon = compute_horizon(horizon=horizon, forecast_params=forecast_params, tsdataset=tsdataset)
config["horizon"] = horizon # type: ignore


def filter_forecast(forecast_ts: TSDataset, forecast_params: Dict[str, Any]) -> TSDataset:
"""Filter out forecasts before `start_timestamp` if `start_timestamp` presented in `forecast_params`.."""
if "start_timestamp" in forecast_params:
Expand Down Expand Up @@ -122,9 +130,7 @@ def forecast(

tsdataset = TSDataset(df=df_timeseries, freq=freq, df_exog=df_exog, known_future=k_f)

horizon: int = pipeline_configs["horizon"] # type: ignore
horizon = compute_horizon(horizon=horizon, forecast_params=forecast_params, tsdataset=tsdataset)
pipeline_configs["horizon"] = horizon # type: ignore
update_horizon(pipeline_configs=pipeline_configs, forecast_params=forecast_params, tsdataset=tsdataset)

pipeline_args = remove_params(params=pipeline_configs, to_remove=ADDITIONAL_PIPELINE_PARAMETERS)
pipeline: Pipeline = hydra_slayer.get_from_params(**pipeline_args)
Expand Down
36 changes: 36 additions & 0 deletions tests/test_commands/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,42 @@ def base_pipeline_with_context_size_yaml_path():
tmp.close()


@pytest.fixture
def base_ensemble_yaml_path():
tmp = NamedTemporaryFile("w")
tmp.write(
"""
_target_: etna.ensembles.VotingEnsemble
pipelines:
- _target_: etna.pipeline.Pipeline
horizon: 4
model:
_target_: etna.models.SeasonalMovingAverageModel
seasonality: 4
window: 1
transforms: []
- _target_: etna.pipeline.Pipeline
horizon: 4
model:
_target_: etna.models.SeasonalMovingAverageModel
seasonality: 7
window: 2
transforms: []
- _target_: etna.pipeline.Pipeline
horizon: 4
model:
_target_: etna.models.SeasonalMovingAverageModel
seasonality: 7
window: 7
transforms: []
context_size: 49
"""
)
tmp.flush()
yield Path(tmp.name)
tmp.close()


@pytest.fixture
def elementary_linear_model_pipeline():
tmp = NamedTemporaryFile("w")
Expand Down
24 changes: 15 additions & 9 deletions tests/test_commands/test_backtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ def backtest_with_stride_yaml_path():
tmp.close()


def test_dummy_run(base_pipeline_yaml_path, base_backtest_yaml_path, base_timeseries_path):
@pytest.mark.parametrize("pipeline_path_name", ("base_pipeline_yaml_path", "base_ensemble_yaml_path"))
def test_dummy_run(pipeline_path_name, base_backtest_yaml_path, base_timeseries_path, request):
tmp_output = TemporaryDirectory()
tmp_output_path = Path(tmp_output.name)
pipeline_path = request.getfixturevalue(pipeline_path_name)
run(
[
"etna",
"backtest",
str(base_pipeline_yaml_path),
str(pipeline_path),
str(base_backtest_yaml_path),
str(base_timeseries_path),
"D",
Expand All @@ -85,16 +87,18 @@ def test_dummy_run(base_pipeline_yaml_path, base_backtest_yaml_path, base_timese
assert Path.exists(tmp_output_path / file_name)


@pytest.mark.parametrize("pipeline_path_name", ("base_pipeline_yaml_path", "base_ensemble_yaml_path"))
def test_dummy_run_with_exog(
base_pipeline_yaml_path, base_backtest_yaml_path, base_timeseries_path, base_timeseries_exog_path
pipeline_path_name, base_backtest_yaml_path, base_timeseries_path, base_timeseries_exog_path, request
):
tmp_output = TemporaryDirectory()
tmp_output_path = Path(tmp_output.name)
pipeline_path = request.getfixturevalue(pipeline_path_name)
run(
[
"etna",
"backtest",
str(base_pipeline_yaml_path),
str(pipeline_path),
str(base_backtest_yaml_path),
str(base_timeseries_path),
"D",
Expand Down Expand Up @@ -126,24 +130,26 @@ def test_forecast_format(base_pipeline_yaml_path, base_backtest_yaml_path, base_


@pytest.mark.parametrize(
"backtest_config_path_name,expected",
"pipeline_path_name,backtest_config_path_name,expected",
(
("backtest_with_folds_estimation_yaml_path", 24),
("backtest_with_stride_yaml_path", 1),
("base_pipeline_with_context_size_yaml_path", "backtest_with_folds_estimation_yaml_path", 24),
("base_ensemble_yaml_path", "backtest_with_folds_estimation_yaml_path", 12),
("base_pipeline_with_context_size_yaml_path", "backtest_with_stride_yaml_path", 1),
),
)
def test_backtest_estimate_n_folds(
base_pipeline_with_context_size_yaml_path, backtest_config_path_name, base_timeseries_path, expected, request
pipeline_path_name, backtest_config_path_name, base_timeseries_path, expected, request
):
backtest_config_path = request.getfixturevalue(backtest_config_path_name)
pipeline_path = request.getfixturevalue(pipeline_path_name)

tmp_output = TemporaryDirectory()
tmp_output_path = Path(tmp_output.name)
run(
[
"etna",
"backtest",
str(base_pipeline_with_context_size_yaml_path),
str(pipeline_path),
str(backtest_config_path),
str(base_timeseries_path),
"D",
Expand Down
64 changes: 49 additions & 15 deletions tests/test_commands/test_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@
from subprocess import run
from tempfile import NamedTemporaryFile

import hydra_slayer
import numpy as np
import pandas as pd
import pytest
from omegaconf import OmegaConf

from etna.commands.forecast_command import ADDITIONAL_PIPELINE_PARAMETERS
from etna.commands.forecast_command import compute_horizon
from etna.commands.forecast_command import filter_forecast
from etna.commands.forecast_command import update_horizon
from etna.commands.utils import remove_params
from etna.datasets import TSDataset


Expand Down Expand Up @@ -59,14 +64,16 @@ def base_forecast_with_folds_estimation_omegaconf_path():
tmp.close()


def test_dummy_run_with_exog(base_pipeline_yaml_path, base_timeseries_path, base_timeseries_exog_path):
@pytest.mark.parametrize("pipeline_path_name", ("base_pipeline_yaml_path", "base_ensemble_yaml_path"))
def test_dummy_run_with_exog(pipeline_path_name, base_timeseries_path, base_timeseries_exog_path, request):
tmp_output = NamedTemporaryFile("w")
tmp_output_path = Path(tmp_output.name)
pipeline_path = request.getfixturevalue(pipeline_path_name)
run(
[
"etna",
"forecast",
str(base_pipeline_yaml_path),
str(pipeline_path),
str(base_timeseries_path),
"D",
str(tmp_output_path),
Expand Down Expand Up @@ -103,16 +110,18 @@ def test_dummy_run(base_pipeline_yaml_path, base_timeseries_path):
assert len(df_output) == 2 * 4


@pytest.mark.parametrize("pipeline_path_name", ("base_pipeline_yaml_path", "base_ensemble_yaml_path"))
def test_run_with_predictive_intervals(
base_pipeline_yaml_path, base_timeseries_path, base_timeseries_exog_path, base_forecast_omegaconf_path
pipeline_path_name, base_timeseries_path, base_timeseries_exog_path, base_forecast_omegaconf_path, request
):
tmp_output = NamedTemporaryFile("w")
tmp_output_path = Path(tmp_output.name)
pipeline_path = request.getfixturevalue(pipeline_path_name)
run(
[
"etna",
"forecast",
str(base_pipeline_yaml_path),
str(pipeline_path),
str(base_timeseries_path),
"D",
str(tmp_output_path),
Expand Down Expand Up @@ -213,24 +222,45 @@ def test_filter_forecast(forecast_params, expected, example_tsds):


@pytest.mark.parametrize(
"model_pipeline",
[
"elementary_linear_model_pipeline",
"elementary_boosting_model_pipeline",
],
"forecast_params,pipeline_path_name,expected",
(
({"start_timestamp": "2020-04-10"}, "base_pipeline_with_context_size_yaml_path", 4),
({"start_timestamp": "2020-04-12"}, "base_pipeline_with_context_size_yaml_path", 6),
({"start_timestamp": "2020-04-11"}, "base_ensemble_yaml_path", 5),
),
)
def test_update_horizon(pipeline_path_name, forecast_params, example_tsds, expected, request):
pipeline_path = request.getfixturevalue(pipeline_path_name)
pipeline_conf = OmegaConf.to_object(OmegaConf.load(pipeline_path))

update_horizon(pipeline_configs=pipeline_conf, forecast_params=forecast_params, tsdataset=example_tsds)

pipeline_conf = remove_params(params=pipeline_conf, to_remove=ADDITIONAL_PIPELINE_PARAMETERS)
pipeline = hydra_slayer.get_from_params(**pipeline_conf)

assert pipeline.horizon == expected


@pytest.mark.parametrize(
"pipeline_path_name",
("base_pipeline_with_context_size_yaml_path", "base_ensemble_yaml_path"),
)
def test_forecast_start_timestamp(
model_pipeline, base_timeseries_path, base_timeseries_exog_path, start_timestamp_forecast_omegaconf_path, request
pipeline_path_name,
base_timeseries_path,
base_timeseries_exog_path,
start_timestamp_forecast_omegaconf_path,
request,
):
tmp_output = NamedTemporaryFile("w")
tmp_output_path = Path(tmp_output.name)
model_pipeline = request.getfixturevalue(model_pipeline)
pipeline_path = request.getfixturevalue(pipeline_path_name)

run(
[
"etna",
"forecast",
str(model_pipeline),
str(pipeline_path),
str(base_timeseries_path),
"D",
str(tmp_output_path),
Expand All @@ -240,24 +270,28 @@ def test_forecast_start_timestamp(
)
df_output = pd.read_csv(tmp_output_path)

assert len(df_output) == 3 * 2 # 3 predictions for 2 segments
assert len(df_output) == 4 * 2 # 4 predictions for 2 segments
assert df_output["timestamp"].min() == "2021-09-10" # start_timestamp
assert not np.any(df_output.isna().values)


@pytest.mark.parametrize("pipeline_path_name", ("base_pipeline_with_context_size_yaml_path", "base_ensemble_yaml_path"))
def test_forecast_estimate_n_folds(
base_pipeline_with_context_size_yaml_path,
pipeline_path_name,
base_forecast_with_folds_estimation_omegaconf_path,
base_timeseries_path,
base_timeseries_exog_path,
request,
):
tmp_output = NamedTemporaryFile("w")
tmp_output_path = Path(tmp_output.name)
pipeline_path = request.getfixturevalue(pipeline_path_name)

run(
[
"etna",
"forecast",
str(base_pipeline_with_context_size_yaml_path),
str(pipeline_path),
str(base_timeseries_path),
"D",
str(tmp_output_path),
Expand Down