From 54408795d63e4f198a99e2287c6d79d79274c438 Mon Sep 17 00:00:00 2001 From: alex-hse-repository <55380696+alex-hse-repository@users.noreply.github.com> Date: Wed, 27 Oct 2021 18:26:58 +0300 Subject: [PATCH] Confidence intervals -> Pipeline (#221) --- CHANGELOG.md | 1 + .../outliers/confidence_interval_outliers.py | 4 +- etna/ensembles/stacking_ensemble.py | 8 -- etna/models/prophet.py | 25 +++- etna/models/sarimax.py | 33 +++--- etna/pipeline/pipeline.py | 88 +++++++++++++- tests/test_models/test_prophet.py | 8 +- tests/test_models/test_sarimax_model.py | 8 +- tests/test_pipeline/conftest.py | 101 ++++++++++++++++ tests/test_pipeline/test_pipeline.py | 109 ++++++++++++++++++ 10 files changed, 341 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 30bda200a..80b7585a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add AutoRegressivePipeline ([#209](https://github.com/tinkoff-ai/etna-ts/pull/209)) - Ensembles notebook ([#218](https://github.com/tinkoff-ai/etna-ts/pull/218)) - Function plot_backtest_interactive ([#225](https://github.com/tinkoff-ai/etna-ts/pull/225)) +- Confidence intervals in Pipeline ([#221](https://github.com/tinkoff-ai/etna-ts/pull/221)) ### Changed - Delete offset from WindowStatisticsTransform ([#111](https://github.com/tinkoff-ai/etna-ts/pull/111)) diff --git a/etna/analysis/outliers/confidence_interval_outliers.py b/etna/analysis/outliers/confidence_interval_outliers.py index dac647c11..e8709b1d5 100644 --- a/etna/analysis/outliers/confidence_interval_outliers.py +++ b/etna/analysis/outliers/confidence_interval_outliers.py @@ -44,9 +44,9 @@ def get_anomalies_confidence_interval( """ outliers_per_segment = {} time_points = np.array(ts.index.values) - model_instance = model(interval_width=interval_width, **model_params) + model_instance = model(**model_params) model_instance.fit(ts) - confidence_interval = 
model_instance.forecast(deepcopy(ts), confidence_interval=True) + confidence_interval = model_instance.forecast(deepcopy(ts), confidence_interval=True, interval_width=interval_width) for segment in ts.segments: segment_slice = confidence_interval[:, segment, :][segment] anomalies_mask = (segment_slice["target"] > segment_slice["target_upper"]) | ( diff --git a/etna/ensembles/stacking_ensemble.py b/etna/ensembles/stacking_ensemble.py index dc36c5e97..255d66892 100644 --- a/etna/ensembles/stacking_ensemble.py +++ b/etna/ensembles/stacking_ensemble.py @@ -104,14 +104,6 @@ def _get_horizon(pipelines: List[Pipeline]) -> int: raise ValueError("All the pipelines should have the same horizon.") return horizons.pop() - @staticmethod - def _validate_cv(cv: int) -> int: - """Check that given number of folds is grater than 1.""" - if cv > 1: - return cv - else: - raise ValueError("At least two folds for backtest are expected.") - def _filter_features_to_use(self, forecasts: List[TSDataset]) -> Union[None, Set[str]]: """Return all the features from `features_to_use` which can be obtained from base models' forecasts.""" features_df = pd.concat([forecast.df for forecast in forecasts], axis=1) diff --git a/etna/models/prophet.py b/etna/models/prophet.py index 1164c5624..920802d2f 100644 --- a/etna/models/prophet.py +++ b/etna/models/prophet.py @@ -95,7 +95,7 @@ def fit(self, df: pd.DataFrame) -> "_ProphetModel": self.model.fit(prophet_df) return self - def predict(self, df: pd.DataFrame, confidence_interval: bool = False): + def predict(self, df: pd.DataFrame, confidence_interval: bool, interval_width: float): """ Compute Prophet predictions. Parameters @@ -104,6 +104,8 @@ def predict(self, df: pd.DataFrame, confidence_interval: bool = False): Features dataframe confidence_interval: If True returns confidence interval for forecast + interval_width: + The significance level for the confidence interval. 
By default a 95% confidence interval is taken Returns ------- y_pred: pd.DataFrame @@ -120,11 +122,14 @@ def predict(self, df: pd.DataFrame, confidence_interval: bool = False): else: prophet_column_name = column_name prophet_df[prophet_column_name] = df[column_name] + if confidence_interval: + self.model.interval_width = interval_width forecast = self.model.predict(prophet_df) if confidence_interval: y_pred = forecast[["yhat_lower", "yhat", "yhat_upper"]] else: y_pred = pd.DataFrame(forecast["yhat"]) + self.model.interval_width = self.interval_width return y_pred @@ -301,15 +306,21 @@ def __init__( ) @staticmethod - def _forecast_segment( - model, segment: Union[str, List[str]], ts: TSDataset, confidence_interval: bool = False + def _forecast_one_segment( + model, + segment: Union[str, List[str]], + ts: TSDataset, + confidence_interval: bool, + interval_width: float, ) -> pd.DataFrame: segment_features = ts[:, segment, :] segment_features = segment_features.droplevel("segment", axis=1) segment_features = segment_features.reset_index() dates = segment_features["timestamp"] dates.reset_index(drop=True, inplace=True) - segment_predict = model.predict(df=segment_features, confidence_interval=confidence_interval) + segment_predict = model.predict( + df=segment_features, confidence_interval=confidence_interval, interval_width=interval_width + ) segment_predict = segment_predict.rename( {"yhat": "target", "yhat_lower": "target_lower", "yhat_upper": "target_upper"}, axis=1 ) @@ -318,7 +329,7 @@ def _forecast_segment( return segment_predict @log_decorator - def forecast(self, ts: TSDataset, confidence_interval: bool = False) -> TSDataset: + def forecast(self, ts: TSDataset, confidence_interval: bool = False, interval_width: float = 0.95) -> TSDataset: """Make predictions. 
Parameters @@ -327,6 +338,8 @@ def forecast(self, ts: TSDataset, confidence_interval: bool = False) -> TSDatase Dataframe with features confidence_interval: If True returns confidence interval for forecast + interval_width: + The significance level for the confidence interval. By default a 95% confidence interval is taken Returns ------- TSDataset @@ -342,7 +355,7 @@ def forecast(self, ts: TSDataset, confidence_interval: bool = False) -> TSDatase for segment in self._segments: model = self._models[segment] - segment_predict = self._forecast_segment(model, segment, ts, confidence_interval) + segment_predict = self._forecast_one_segment(model, segment, ts, confidence_interval, interval_width) result_list.append(segment_predict) # need real case to test diff --git a/etna/models/sarimax.py b/etna/models/sarimax.py index 45c3c6ad8..2c019daab 100644 --- a/etna/models/sarimax.py +++ b/etna/models/sarimax.py @@ -55,7 +55,6 @@ def __init__( freq: Optional[str] = None, missing: str = "none", validate_specification: bool = True, - interval_width: float = 0.8, **kwargs, ): """ @@ -144,8 +143,6 @@ def __init__( If 'raise', an error is raised. Default is 'none'. validate_specification: If True, validation of hyperparameters is performed. - interval_width: - Float, width of the uncertainty intervals provided for the forecast. 
""" self.order = order self.seasonal_order = seasonal_order @@ -164,7 +161,6 @@ def __init__( self.freq = freq self.missing = missing self.validate_specification = validate_specification - self.interval_width = interval_width self.kwargs = kwargs self._model: Optional[SARIMAX] = None self._result: Optional[SARIMAX] = None @@ -234,7 +230,7 @@ def fit(self, df: pd.DataFrame) -> "_SARIMAXModel": self._result = self._model.fit(start_params=start_params, disp=False) return self - def predict(self, df: pd.DataFrame, confidence_interval: bool = False) -> pd.DataFrame: + def predict(self, df: pd.DataFrame, confidence_interval: bool, interval_width: float) -> pd.DataFrame: """ Compute predictions from a SARIMAX model. @@ -244,6 +240,8 @@ def predict(self, df: pd.DataFrame, confidence_interval: bool = False) -> pd.Dat Features dataframe confidence_interval: If True returns confidence interval for forecast + interval_width: + The significance level for the confidence interval. By default a 95% confidence interval is taken Returns ------- y_pred: pd.DataFrame @@ -266,7 +264,7 @@ def predict(self, df: pd.DataFrame, confidence_interval: bool = False) -> pd.Dat forecast = self._result.get_prediction( start=df["timestamp"].min(), end=df["timestamp"].max(), dynamic=False, exog=exog_future ) - y_pred = forecast.summary_frame(alpha=1 - self.interval_width)[["mean_ci_lower", "mean", "mean_ci_upper"]] + y_pred = forecast.summary_frame(alpha=1 - interval_width)[["mean_ci_lower", "mean", "mean_ci_upper"]] else: forecast = self._result.get_prediction( start=df["timestamp"].min(), end=df["timestamp"].max(), dynamic=True, exog=exog_future @@ -337,7 +335,6 @@ def __init__( freq: Optional[str] = None, missing: str = "none", validate_specification: bool = True, - interval_width: float = 0.8, **kwargs, ): """ @@ -426,8 +423,6 @@ def __init__( If 'raise', an error is raised. Default is 'none'. validate_specification: If True, validation of hyperparameters is performed. 
- interval_width: - Float, width of the uncertainty intervals provided for the forecast. """ self.order = order self.seasonal_order = seasonal_order @@ -446,7 +441,6 @@ def __init__( self.freq = freq self.missing = missing self.validate_specification = validate_specification - self.interval_width = interval_width self.kwargs = kwargs super(SARIMAXModel, self).__init__( base_model=_SARIMAXModel( @@ -467,21 +461,26 @@ def __init__( freq=self.freq, missing=self.missing, validate_specification=self.validate_specification, - interval_width=self.interval_width, **self.kwargs, ) ) @staticmethod - def _forecast_segment( - model, segment: Union[str, List[str]], ts: TSDataset, confidence_interval: bool = False + def _forecast_one_segment( + model, + segment: Union[str, List[str]], + ts: TSDataset, + confidence_interval: bool, + interval_width: float, ) -> pd.DataFrame: segment_features = ts[:, segment, :] segment_features = segment_features.droplevel("segment", axis=1) segment_features = segment_features.reset_index() dates = segment_features["timestamp"] dates.reset_index(drop=True, inplace=True) - segment_predict = model.predict(df=segment_features, confidence_interval=confidence_interval) + segment_predict = model.predict( + df=segment_features, confidence_interval=confidence_interval, interval_width=interval_width + ) segment_predict = segment_predict.rename( {"mean": "target", "mean_ci_lower": "target_lower", "mean_ci_upper": "target_upper"}, axis=1 ) @@ -490,7 +489,7 @@ def _forecast_segment( return segment_predict @log_decorator - def forecast(self, ts: TSDataset, confidence_interval: bool = False) -> TSDataset: + def forecast(self, ts: TSDataset, confidence_interval: bool = False, interval_width: float = 0.95) -> TSDataset: """Make predictions. 
Parameters ---------- @@ -498,6 +497,8 @@ def forecast(self, ts: TSDataset, confidence_interval: bool = False) -> TSDatase Dataframe with features confidence_interval: If True returns confidence interval for forecast + interval_width: + The significance level for the confidence interval. By default a 95% confidence interval is taken Returns ------- pd.DataFrame @@ -513,7 +514,7 @@ def forecast(self, ts: TSDataset, confidence_interval: bool = False) -> TSDatase for segment in self._segments: model = self._models[segment] - segment_predict = self._forecast_segment(model, segment, ts, confidence_interval) + segment_predict = self._forecast_one_segment(model, segment, ts, confidence_interval, interval_width) result_list.append(segment_predict) # need real case to test diff --git a/etna/pipeline/pipeline.py b/etna/pipeline/pipeline.py index 75c29ddb0..f6abfa71a 100644 --- a/etna/pipeline/pipeline.py +++ b/etna/pipeline/pipeline.py @@ -1,3 +1,4 @@ +import inspect from copy import deepcopy from enum import Enum from typing import Any @@ -8,12 +9,15 @@ from typing import Tuple import pandas as pd +import scipy from joblib import Parallel from joblib import delayed +from scipy.stats import norm from etna.core import BaseMixin from etna.datasets import TSDataset from etna.loggers import tslogger +from etna.metrics import MAE from etna.metrics import Metric from etna.metrics import MetricAggregationMode from etna.models.base import Model @@ -30,7 +34,14 @@ class CrossValidationMode(Enum): class Pipeline(BaseMixin): """Pipeline of transforms with a final estimator.""" - def __init__(self, model: Model, transforms: Iterable[Transform] = (), horizon: int = 1): + def __init__( + self, + model: Model, + transforms: Iterable[Transform] = (), + horizon: int = 1, + interval_width: float = 0.95, + confidence_interval_cv: int = 3, + ): """ Create instance of Pipeline with given parameters. 
@@ -42,12 +53,47 @@ def __init__(self, model: Model, transforms: Iterable[Transform] = (), horizon: Sequence of the transforms horizon: Number of timestamps in the future for forecasting + interval_width: + The significance level for the confidence interval. By default a 95% confidence interval is taken + confidence_interval_cv: + Number of folds to use in the backtest for confidence interval estimation + + Raises + ------ + ValueError: + If the horizon is less than 1, interval_width is out of (0,1) or confidence_interval_cv is less than 2. """ self.model = model self.transforms = transforms - self.horizon = horizon + self.horizon = self._validate_horizon(horizon) + self.interval_width = self._validate_interval_width(interval_width) + self.confidence_interval_cv = self._validate_cv(confidence_interval_cv) self.ts = None + + @staticmethod + def _validate_horizon(horizon: int) -> int: + """Check that given horizon is greater than 0.""" + if horizon > 0: + return horizon + else: + raise ValueError("At least one point in the future is expected.") + + @staticmethod + def _validate_interval_width(interval_width: float) -> float: + """Check that given interval width lies in the interval (0,1).""" + if 0 < interval_width < 1: + return interval_width + else: + raise ValueError("Interval width should be a number from (0,1).") + + @staticmethod + def _validate_cv(cv: int) -> int: + """Check that given number of folds is greater than 1.""" + if cv > 1: + return cv + else: + raise ValueError("At least two folds for backtest are expected.") + def fit(self, ts: TSDataset) -> "Pipeline": """Fit the Pipeline. Fit and apply given transforms to the data, then fit the model on the transformed data. 
@@ -66,16 +112,50 @@ def fit(self, ts: TSDataset) -> "Pipeline": self.model.fit(self.ts) return self - def forecast(self) -> TSDataset: + def _forecast_confidence_interval(self, future: TSDataset) -> TSDataset: + """Forecast confidence interval for the future.""" + _, forecasts, _ = self.backtest(self.ts, metrics=[MAE()], n_folds=self.confidence_interval_cv) + forecasts = TSDataset(df=forecasts, freq=self.ts.freq) + residuals = ( + forecasts.loc[:, pd.IndexSlice[:, "target"]] + - self.ts[forecasts.index.min() : forecasts.index.max(), :, "target"] + ) + + predictions = self.model.forecast(ts=future) + se = scipy.stats.sem(residuals) + quantile = norm.ppf(q=(1 + self.interval_width) / 2) + lower_border = predictions[:, :, "target"] - se * quantile + upper_border = predictions[:, :, "target"] + se * quantile + lower_border = lower_border.rename({"target": "target_lower"}, axis=1) + upper_border = upper_border.rename({"target": "target_upper"}, axis=1) + predictions.df = pd.concat([predictions.df, lower_border, upper_border], axis=1).sort_index( + axis=1, level=(0, 1) + ) + return predictions + + def forecast(self, confidence_interval: bool = False) -> TSDataset: """Make predictions. 
+ Parameters + ---------- + confidence_interval: + If True returns confidence interval for forecast + Returns ------- TSDataset TSDataset with forecast """ future = self.ts.make_future(self.horizon) - predictions = self.model.forecast(future) + if confidence_interval: + if "confidence_interval" in inspect.signature(self.model.forecast).parameters: + predictions = self.model.forecast( + ts=future, confidence_interval=confidence_interval, interval_width=self.interval_width + ) + else: + predictions = self._forecast_confidence_interval(future=future) + else: + predictions = self.model.forecast(ts=future) return predictions def _init_backtest(self): diff --git a/tests/test_models/test_prophet.py b/tests/test_models/test_prophet.py index e4b66e95f..60f9d042d 100644 --- a/tests/test_models/test_prophet.py +++ b/tests/test_models/test_prophet.py @@ -45,9 +45,9 @@ def test_run_with_reg(new_format_df, new_format_exog): def test_confidence_interval_run_insample(example_tsds): - model = ProphetModel(interval_width=0.95) + model = ProphetModel() model.fit(example_tsds) - forecast = model.forecast(example_tsds, confidence_interval=True) + forecast = model.forecast(example_tsds, confidence_interval=True, interval_width=0.95) for segment in forecast.segments: segment_slice = forecast[:, segment, :][segment] assert {"target_lower", "target_upper", "target"}.issubset(segment_slice.columns) @@ -55,10 +55,10 @@ def test_confidence_interval_run_insample(example_tsds): def test_confidence_interval_run_infuture(example_tsds): - model = ProphetModel(interval_width=0.95) + model = ProphetModel() model.fit(example_tsds) future = example_tsds.make_future(10) - forecast = model.forecast(future, confidence_interval=True) + forecast = model.forecast(future, confidence_interval=True, interval_width=0.95) for segment in forecast.segments: segment_slice = forecast[:, segment, :][segment] assert {"target_lower", "target_upper", "target"}.issubset(segment_slice.columns) diff --git 
a/tests/test_models/test_sarimax_model.py b/tests/test_models/test_sarimax_model.py index 2ae5d01e2..b23bc3229 100644 --- a/tests/test_models/test_sarimax_model.py +++ b/tests/test_models/test_sarimax_model.py @@ -37,9 +37,9 @@ def test_sarimax_forecaster_run_with_reg(example_reg_tsds): def test_confidence_interval_run_insample(example_tsds): - model = SARIMAXModel(interval_width=0.95) + model = SARIMAXModel() model.fit(example_tsds) - forecast = model.forecast(example_tsds, confidence_interval=True) + forecast = model.forecast(example_tsds, confidence_interval=True, interval_width=0.95) for segment in forecast.segments: segment_slice = forecast[:, segment, :][segment] assert {"target_lower", "target_upper", "target"}.issubset(segment_slice.columns) @@ -47,10 +47,10 @@ def test_confidence_interval_run_insample(example_tsds): def test_confidence_interval_run_infuture(example_tsds): - model = SARIMAXModel(interval_width=0.95) + model = SARIMAXModel() model.fit(example_tsds) future = example_tsds.make_future(10) - forecast = model.forecast(future, confidence_interval=True) + forecast = model.forecast(future, confidence_interval=True, interval_width=0.95) for segment in forecast.segments: segment_slice = forecast[:, segment, :][segment] assert {"target_lower", "target_upper", "target"}.issubset(segment_slice.columns) diff --git a/tests/test_pipeline/conftest.py b/tests/test_pipeline/conftest.py index 9fe46e197..c1516fd43 100644 --- a/tests/test_pipeline/conftest.py +++ b/tests/test_pipeline/conftest.py @@ -1,9 +1,18 @@ +from typing import Tuple + +import pandas as pd import pytest +import scipy +from numpy.random import RandomState +from scipy.stats import norm +from etna.datasets import TSDataset from etna.models import CatBoostModelPerSegment from etna.pipeline import Pipeline from etna.transforms import LagTransform +INTERVAL_WIDTH = 0.95 + @pytest.fixture def catboost_pipeline() -> Pipeline: @@ -25,3 +34,95 @@ def catboost_pipeline_big() -> Pipeline: horizon=24, ) 
return pipeline + + +@pytest.fixture +def weekly_period_ts(n_repeats: int = 15, horizon: int = 7) -> Tuple["TSDataset", "TSDataset"]: + segment_1 = [7.0, 7.0, 3.0, 1.0] + segment_2 = [40.0, 70.0, 20.0, 10.0] + ts_range = list(pd.date_range("2020-01-03", freq="1D", periods=n_repeats * len(segment_1))) + df = pd.DataFrame( + { + "timestamp": ts_range * 2, + "target": segment_1 * n_repeats + segment_2 * n_repeats, + "segment": ["segment_1"] * n_repeats * len(segment_1) + ["segment_2"] * n_repeats * len(segment_2), + } + ) + ts_start = sorted(set(df.timestamp))[-horizon] + train, test = ( + df[lambda x: x.timestamp < ts_start], + df[lambda x: x.timestamp >= ts_start], + ) + train = TSDataset(TSDataset.to_dataset(train), "D") + test = TSDataset(TSDataset.to_dataset(test), "D") + + return train, test + + +@pytest.fixture +def splited_piecewise_constant_ts( + first_constant_len=40, constant_1_1=7, constant_1_2=2, constant_2_1=50, constant_2_2=10, horizon=5 +) -> Tuple["TSDataset", "TSDataset"]: + + segment_1 = [constant_1_1] * first_constant_len + [constant_1_2] * horizon * 2 + segment_2 = [constant_2_1] * first_constant_len + [constant_2_2] * horizon * 2 + + quantile = norm.ppf(q=(1 + INTERVAL_WIDTH) / 2) + se_1 = scipy.stats.sem([0] * horizon * 2 + [constant_1_1 - constant_1_2] * horizon) + se_2 = scipy.stats.sem([0] * horizon * 2 + [constant_2_1 - constant_2_2] * horizon) + lower = [x - se_1 * quantile for x in segment_1] + [x - se_2 * quantile for x in segment_2] + upper = [x + se_1 * quantile for x in segment_1] + [x + se_2 * quantile for x in segment_2] + + ts_range = list(pd.date_range("2020-01-03", freq="1D", periods=len(segment_1))) + df = pd.DataFrame( + { + "timestamp": ts_range * 2, + "target": segment_1 + segment_2, + "target_lower": lower, + "target_upper": upper, + "segment": ["segment_1"] * len(segment_1) + ["segment_2"] * len(segment_2), + } + ) + ts_start = sorted(set(df.timestamp))[-horizon] + train, test = ( + df[lambda x: x.timestamp < ts_start], + 
df[lambda x: x.timestamp >= ts_start], + ) + train = TSDataset(TSDataset.to_dataset(train.drop(["target_lower", "target_upper"], axis=1)), "D") + test = TSDataset(TSDataset.to_dataset(test), "D") + return train, test + + +@pytest.fixture +def constant_ts(size=40) -> TSDataset: + segment_1 = [7] * size + segment_2 = [50] * size + ts_range = list(pd.date_range("2020-01-03", freq="1D", periods=size)) + df = pd.DataFrame( + { + "timestamp": ts_range * 2, + "target": segment_1 + segment_2, + "segment": ["segment_1"] * size + ["segment_2"] * size, + } + ) + ts = TSDataset(TSDataset.to_dataset(df), "D") + return ts + + +@pytest.fixture +def constant_noisy_ts(size=40, use_noise=True) -> TSDataset: + noise = RandomState(seed=42).normal(scale=3, size=size * 2) + segment_1 = [7] * size + segment_2 = [50] * size + ts_range = list(pd.date_range("2020-01-03", freq="1D", periods=size)) + df = pd.DataFrame( + { + "timestamp": ts_range * 2, + "target": segment_1 + segment_2, + "segment": ["segment_1"] * size + ["segment_2"] * size, + } + ) + if use_noise: + df.loc[:, "target"] += noise + ts = TSDataset(TSDataset.to_dataset(df), "D") + return ts diff --git a/tests/test_pipeline/test_pipeline.py b/tests/test_pipeline/test_pipeline.py index 877c36a11..e0e641bd7 100644 --- a/tests/test_pipeline/test_pipeline.py +++ b/tests/test_pipeline/test_pipeline.py @@ -13,6 +13,10 @@ from etna.metrics import Metric from etna.metrics import MetricAggregationMode from etna.models import LinearPerSegmentModel +from etna.models import MovingAverageModel +from etna.models import NaiveModel +from etna.models import ProphetModel +from etna.models import SARIMAXModel from etna.pipeline import Pipeline from etna.transforms import AddConstTransform from etna.transforms import DateFlagsTransform @@ -20,6 +24,43 @@ DEFAULT_METRICS = [MAE(mode=MetricAggregationMode.per_segment)] +@pytest.mark.parametrize("horizon,interval_width,confidence_interval_cv", ([(1, 0.5, 2)])) +def test_init_pass(horizon, 
interval_width, confidence_interval_cv): + """Check that Pipeline initialization works correctly in case of valid parameters.""" + pipeline = Pipeline( + model=LinearPerSegmentModel(), + transforms=[], + horizon=horizon, + interval_width=interval_width, + confidence_interval_cv=confidence_interval_cv, + ) + assert pipeline.horizon == horizon + assert pipeline.interval_width == interval_width + assert confidence_interval_cv == confidence_interval_cv + + +@pytest.mark.parametrize( + "horizon,interval_width,confidence_interval_cv,error_msg", + ( + [ + (-1, 0.5, 2, "At least one point in the future is expected."), + (2, 2, 2, "Interval width should be a number from."), + (2, 0.5, 1, "At least two folds for backtest are expected."), + ] + ), +) +def test_init_fail(horizon, interval_width, confidence_interval_cv, error_msg): + """Check that Pipeline initialization works correctly in case of invalid parameters.""" + with pytest.raises(ValueError, match=error_msg): + _ = Pipeline( + model=LinearPerSegmentModel(), + transforms=[], + horizon=horizon, + interval_width=interval_width, + confidence_interval_cv=confidence_interval_cv, + ) + + def test_fit(example_tsds): """Test that Pipeline correctly transforms dataset on fit stage.""" original_ts = deepcopy(example_tsds) @@ -49,6 +90,74 @@ def test_forecast(example_tsds): assert np.all(forecast_pipeline.df.values == forecast_manual.df.values) +@pytest.mark.parametrize("model", (ProphetModel(), SARIMAXModel())) +def test_forecast_confidence_interval_builtin(example_tsds, model): + """Test that forecast method uses built-in confidence intervals for the listed models.""" + np.random.seed(1234) + pipeline = Pipeline(model=model, transforms=[], horizon=5) + pipeline.fit(example_tsds) + forecast_pipeline = pipeline.forecast(confidence_interval=True) + + np.random.seed(1234) + model = model.fit(example_tsds) + future = example_tsds.make_future(5) + forecast_model = model.forecast(ts=future, confidence_interval=True) + + assert 
forecast_model.df.equals(forecast_pipeline.df) + + +@pytest.mark.parametrize("model", (MovingAverageModel(), LinearPerSegmentModel())) +def test_forecast_confidence_interval_interface(example_tsds, model): + """Test the forecast interface for the models without built-in confidence intervals.""" + pipeline = Pipeline(model=model, transforms=[DateFlagsTransform()], horizon=5) + pipeline.fit(example_tsds) + forecast = pipeline.forecast(confidence_interval=True) + for segment in forecast.segments: + segment_slice = forecast[:, segment, :][segment] + assert {"target_lower", "target_upper", "target"}.issubset(segment_slice.columns) + assert (segment_slice["target_upper"] - segment_slice["target_lower"] >= 0).all() + + +def test_forecast_confidence_interval(splited_piecewise_constant_ts): + """Test that the confidence interval for piecewise-constant dataset is correct.""" + train, test = splited_piecewise_constant_ts + pipeline = Pipeline(model=NaiveModel(lag=1), transforms=[], horizon=5) + pipeline.fit(train) + forecast = pipeline.forecast(confidence_interval=True) + assert (forecast.df.values == test.df.values).all() + + +@pytest.mark.parametrize("interval_width_lower,interval_width_upper", ([(0.6, 0.95)])) +def test_forecast_confidence_interval_size(example_tsds, interval_width_lower, interval_width_upper): + """Test that the higher value for interval_width parameter is passed, the wider confidence interval is forecasted.""" + pipeline = Pipeline(model=MovingAverageModel(), transforms=[], horizon=5, interval_width=interval_width_lower) + pipeline.fit(example_tsds) + forecast = pipeline.forecast(confidence_interval=True) + lower_interval_length = forecast[:, :, "target_upper"].values - forecast[:, :, "target_lower"].values + + pipeline = Pipeline(model=MovingAverageModel(), transforms=[], horizon=5, interval_width=interval_width_upper) + pipeline.fit(example_tsds) + forecast = pipeline.forecast(confidence_interval=True) + upper_interval_length = forecast[:, :, 
"target_upper"].values - forecast[:, :, "target_lower"].values + + assert (lower_interval_length <= upper_interval_length).all() + + +def test_forecast_confidence_interval_noise(constant_ts, constant_noisy_ts): + """Test that confidence interval for noisy dataset is wider than for the dataset without noise.""" + pipeline = Pipeline(model=MovingAverageModel(), transforms=[], horizon=5) + pipeline.fit(constant_ts) + forecast = pipeline.forecast(confidence_interval=True) + lower_interval_length = forecast[:, :, "target_upper"].values - forecast[:, :, "target_lower"].values + + pipeline = Pipeline(model=MovingAverageModel(), transforms=[], horizon=5) + pipeline.fit(constant_noisy_ts) + forecast = pipeline.forecast(confidence_interval=True) + upper_interval_length = forecast[:, :, "target_upper"].values - forecast[:, :, "target_lower"].values + + assert (lower_interval_length <= upper_interval_length).all() + + @pytest.mark.parametrize("n_folds", (0, -1)) def test_invalid_n_folds(catboost_pipeline: Pipeline, n_folds: int, example_tsdf: TSDataset): """Test Pipeline.backtest behavior in case of invalid n_folds."""