Skip to content

Confidence intervals -> Pipeline #221

Merged
merged 9 commits into from
Oct 27, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add AutoRegressivePipeline ([#209](https://github.com/tinkoff-ai/etna-ts/pull/209))
- Ensembles notebook ([#218](https://github.com/tinkoff-ai/etna-ts/pull/218))
- Function plot_backtest_interactive ([#225](https://github.com/tinkoff-ai/etna-ts/pull/225))
- Confidence intervals in Pipeline ([#221](https://github.com/tinkoff-ai/etna-ts/pull/221))

### Changed
- Delete offset from WindowStatisticsTransform ([#111](https://github.com/tinkoff-ai/etna-ts/pull/111))
Expand Down
4 changes: 2 additions & 2 deletions etna/analysis/outliers/confidence_interval_outliers.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ def get_anomalies_confidence_interval(
"""
outliers_per_segment = {}
time_points = np.array(ts.index.values)
model_instance = model(interval_width=interval_width, **model_params)
model_instance = model(**model_params)
model_instance.fit(ts)
confidence_interval = model_instance.forecast(deepcopy(ts), confidence_interval=True)
confidence_interval = model_instance.forecast(deepcopy(ts), confidence_interval=True, interval_width=interval_width)
for segment in ts.segments:
segment_slice = confidence_interval[:, segment, :][segment]
anomalies_mask = (segment_slice["target"] > segment_slice["target_upper"]) | (
Expand Down
8 changes: 0 additions & 8 deletions etna/ensembles/stacking_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,6 @@ def _get_horizon(pipelines: List[Pipeline]) -> int:
raise ValueError("All the pipelines should have the same horizon.")
return horizons.pop()

@staticmethod
def _validate_cv(cv: int) -> int:
    """Validate the number of folds for backtest.

    Parameters
    ----------
    cv:
        Number of folds to validate

    Returns
    -------
    int
        The given number of folds, unchanged, if it is greater than 1

    Raises
    ------
    ValueError:
        If the number of folds is not greater than 1
    """
    # Guard clause: anything that is not strictly greater than 1 is rejected.
    if not cv > 1:
        raise ValueError("At least two folds for backtest are expected.")
    return cv

def _filter_features_to_use(self, forecasts: List[TSDataset]) -> Union[None, Set[str]]:
"""Return all the features from `features_to_use` which can be obtained from base models' forecasts."""
features_df = pd.concat([forecast.df for forecast in forecasts], axis=1)
Expand Down
25 changes: 19 additions & 6 deletions etna/models/prophet.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def fit(self, df: pd.DataFrame) -> "_ProphetModel":
self.model.fit(prophet_df)
return self

def predict(self, df: pd.DataFrame, confidence_interval: bool = False):
def predict(self, df: pd.DataFrame, confidence_interval: bool, interval_width: float):
"""
Compute Prophet predictions.
Parameters
Expand All @@ -104,6 +104,8 @@ def predict(self, df: pd.DataFrame, confidence_interval: bool = False):
Features dataframe
confidence_interval:
If True returns confidence interval for forecast
interval_width:
The confidence level of the interval (e.g. 0.95 for a 95% confidence interval); this method defines no default for it
Returns
-------
y_pred: pd.DataFrame
Expand All @@ -120,11 +122,14 @@ def predict(self, df: pd.DataFrame, confidence_interval: bool = False):
else:
prophet_column_name = column_name
prophet_df[prophet_column_name] = df[column_name]
if confidence_interval:
self.model.interval_width = interval_width
forecast = self.model.predict(prophet_df)
if confidence_interval:
y_pred = forecast[["yhat_lower", "yhat", "yhat_upper"]]
else:
y_pred = pd.DataFrame(forecast["yhat"])
self.model.interval_width = self.interval_width
return y_pred


Expand Down Expand Up @@ -301,15 +306,21 @@ def __init__(
)

@staticmethod
def _forecast_segment(
model, segment: Union[str, List[str]], ts: TSDataset, confidence_interval: bool = False
def _forecast_one_segment(
model,
segment: Union[str, List[str]],
ts: TSDataset,
confidence_interval: bool,
interval_width: float,
) -> pd.DataFrame:
segment_features = ts[:, segment, :]
segment_features = segment_features.droplevel("segment", axis=1)
segment_features = segment_features.reset_index()
dates = segment_features["timestamp"]
dates.reset_index(drop=True, inplace=True)
segment_predict = model.predict(df=segment_features, confidence_interval=confidence_interval)
segment_predict = model.predict(
df=segment_features, confidence_interval=confidence_interval, interval_width=interval_width
)
segment_predict = segment_predict.rename(
{"yhat": "target", "yhat_lower": "target_lower", "yhat_upper": "target_upper"}, axis=1
)
Expand All @@ -318,7 +329,7 @@ def _forecast_segment(
return segment_predict

@log_decorator
def forecast(self, ts: TSDataset, confidence_interval: bool = False) -> TSDataset:
def forecast(self, ts: TSDataset, confidence_interval: bool = False, interval_width: float = 0.95) -> TSDataset:
"""Make predictions.

Parameters
Expand All @@ -327,6 +338,8 @@ def forecast(self, ts: TSDataset, confidence_interval: bool = False) -> TSDatase
Dataframe with features
confidence_interval:
If True returns confidence interval for forecast
interval_width:
The confidence level of the interval. By default a 95% confidence interval is taken
Returns
-------
TSDataset
Expand All @@ -342,7 +355,7 @@ def forecast(self, ts: TSDataset, confidence_interval: bool = False) -> TSDatase
for segment in self._segments:
model = self._models[segment]

segment_predict = self._forecast_segment(model, segment, ts, confidence_interval)
segment_predict = self._forecast_one_segment(model, segment, ts, confidence_interval, interval_width)
result_list.append(segment_predict)

# need real case to test
Expand Down
33 changes: 17 additions & 16 deletions etna/models/sarimax.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def __init__(
freq: Optional[str] = None,
missing: str = "none",
validate_specification: bool = True,
interval_width: float = 0.8,
**kwargs,
):
"""
Expand Down Expand Up @@ -144,8 +143,6 @@ def __init__(
If 'raise', an error is raised. Default is 'none'.
validate_specification:
If True, validation of hyperparameters is performed.
interval_width:
Float, width of the uncertainty intervals provided for the forecast.
"""
self.order = order
self.seasonal_order = seasonal_order
Expand All @@ -164,7 +161,6 @@ def __init__(
self.freq = freq
self.missing = missing
self.validate_specification = validate_specification
self.interval_width = interval_width
self.kwargs = kwargs
self._model: Optional[SARIMAX] = None
self._result: Optional[SARIMAX] = None
Expand Down Expand Up @@ -234,7 +230,7 @@ def fit(self, df: pd.DataFrame) -> "_SARIMAXModel":
self._result = self._model.fit(start_params=start_params, disp=False)
return self

def predict(self, df: pd.DataFrame, confidence_interval: bool = False) -> pd.DataFrame:
def predict(self, df: pd.DataFrame, confidence_interval: bool, interval_width: float) -> pd.DataFrame:
"""
Compute predictions from a SARIMAX model.

Expand All @@ -244,6 +240,8 @@ def predict(self, df: pd.DataFrame, confidence_interval: bool = False) -> pd.Dat
Features dataframe
confidence_interval:
If True returns confidence interval for forecast
interval_width:
The confidence level of the interval (e.g. 0.95 for a 95% confidence interval); the summary frame is requested with alpha = 1 - interval_width
Returns
-------
y_pred: pd.DataFrame
Expand All @@ -266,7 +264,7 @@ def predict(self, df: pd.DataFrame, confidence_interval: bool = False) -> pd.Dat
forecast = self._result.get_prediction(
start=df["timestamp"].min(), end=df["timestamp"].max(), dynamic=False, exog=exog_future
)
y_pred = forecast.summary_frame(alpha=1 - self.interval_width)[["mean_ci_lower", "mean", "mean_ci_upper"]]
y_pred = forecast.summary_frame(alpha=1 - interval_width)[["mean_ci_lower", "mean", "mean_ci_upper"]]
else:
forecast = self._result.get_prediction(
start=df["timestamp"].min(), end=df["timestamp"].max(), dynamic=True, exog=exog_future
Expand Down Expand Up @@ -337,7 +335,6 @@ def __init__(
freq: Optional[str] = None,
missing: str = "none",
validate_specification: bool = True,
interval_width: float = 0.8,
**kwargs,
):
"""
Expand Down Expand Up @@ -426,8 +423,6 @@ def __init__(
If 'raise', an error is raised. Default is 'none'.
validate_specification:
If True, validation of hyperparameters is performed.
interval_width:
Float, width of the uncertainty intervals provided for the forecast.
"""
self.order = order
self.seasonal_order = seasonal_order
Expand All @@ -446,7 +441,6 @@ def __init__(
self.freq = freq
self.missing = missing
self.validate_specification = validate_specification
self.interval_width = interval_width
self.kwargs = kwargs
super(SARIMAXModel, self).__init__(
base_model=_SARIMAXModel(
Expand All @@ -467,21 +461,26 @@ def __init__(
freq=self.freq,
missing=self.missing,
validate_specification=self.validate_specification,
interval_width=self.interval_width,
**self.kwargs,
)
)

@staticmethod
def _forecast_segment(
model, segment: Union[str, List[str]], ts: TSDataset, confidence_interval: bool = False
def _forecast_one_segment(
model,
segment: Union[str, List[str]],
ts: TSDataset,
confidence_interval: bool,
interval_width: float,
) -> pd.DataFrame:
segment_features = ts[:, segment, :]
segment_features = segment_features.droplevel("segment", axis=1)
segment_features = segment_features.reset_index()
dates = segment_features["timestamp"]
dates.reset_index(drop=True, inplace=True)
segment_predict = model.predict(df=segment_features, confidence_interval=confidence_interval)
segment_predict = model.predict(
df=segment_features, confidence_interval=confidence_interval, interval_width=interval_width
)
segment_predict = segment_predict.rename(
{"mean": "target", "mean_ci_lower": "target_lower", "mean_ci_upper": "target_upper"}, axis=1
)
Expand All @@ -490,14 +489,16 @@ def _forecast_segment(
return segment_predict

@log_decorator
def forecast(self, ts: TSDataset, confidence_interval: bool = False) -> TSDataset:
def forecast(self, ts: TSDataset, confidence_interval: bool = False, interval_width: float = 0.95) -> TSDataset:
"""Make predictions.
Parameters
----------
ts:
Dataframe with features
confidence_interval:
If True returns confidence interval for forecast
interval_width:
The confidence level of the interval. By default a 95% confidence interval is taken
Returns
-------
pd.DataFrame
Expand All @@ -513,7 +514,7 @@ def forecast(self, ts: TSDataset, confidence_interval: bool = False) -> TSDatase
for segment in self._segments:
model = self._models[segment]

segment_predict = self._forecast_segment(model, segment, ts, confidence_interval)
segment_predict = self._forecast_one_segment(model, segment, ts, confidence_interval, interval_width)
result_list.append(segment_predict)

# need real case to test
Expand Down
88 changes: 84 additions & 4 deletions etna/pipeline/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import inspect
from copy import deepcopy
from enum import Enum
from typing import Any
Expand All @@ -8,12 +9,15 @@
from typing import Tuple

import pandas as pd
import scipy
from joblib import Parallel
from joblib import delayed
from scipy.stats import norm

from etna.core import BaseMixin
from etna.datasets import TSDataset
from etna.loggers import tslogger
from etna.metrics import MAE
from etna.metrics import Metric
from etna.metrics import MetricAggregationMode
from etna.models.base import Model
Expand All @@ -30,7 +34,14 @@ class CrossValidationMode(Enum):
class Pipeline(BaseMixin):
"""Pipeline of transforms with a final estimator."""

def __init__(self, model: Model, transforms: Iterable[Transform] = (), horizon: int = 1):
def __init__(
self,
model: Model,
transforms: Iterable[Transform] = (),
horizon: int = 1,
interval_width: float = 0.95,
confidence_interval_cv: int = 3,
):
"""
Create instance of Pipeline with given parameters.

Expand All @@ -42,12 +53,47 @@ def __init__(self, model: Model, transforms: Iterable[Transform] = (), horizon:
Sequence of the transforms
horizon:
Number of timestamps in the future for forecasting
interval_width:
The confidence level of the interval, a number from (0,1). By default a 95% confidence interval is taken
confidence_interval_cv:
Number of folds to use in the backtest for confidence interval estimation

Raises
------
ValueError:
If the horizon is less than 1, interval_width is out of (0,1) or confidence_interval_cv is less than 2.
"""
self.model = model
self.transforms = transforms
self.horizon = horizon
self.horizon = self._validate_horizon(horizon)
self.interval_width = self._validate_interval_width(interval_width)
self.confidence_interval_cv = self._validate_cv(confidence_interval_cv)
self.ts = None

@staticmethod
def _validate_horizon(horizon: int) -> int:
    """Check that the given forecasting horizon is at least 1.

    Parameters
    ----------
    horizon:
        Number of timestamps in the future for forecasting

    Returns
    -------
    int
        The given horizon, unchanged, if it is greater than 0

    Raises
    ------
    ValueError:
        If the horizon is not greater than 0
    """
    # Guard clause: anything that is not strictly positive is rejected.
    if not horizon > 0:
        raise ValueError("At least one point in the future is expected.")
    return horizon

@staticmethod
def _validate_interval_width(interval_width: float) -> float:
    """Check that the given interval width lies strictly between 0 and 1.

    Parameters
    ----------
    interval_width:
        Width of the confidence interval to validate

    Returns
    -------
    float
        The given interval width, unchanged, if it is inside (0,1)

    Raises
    ------
    ValueError:
        If the interval width is outside of the open interval (0,1)
    """
    # Both borders are excluded; values that fail the chained comparison are rejected.
    if not 0 < interval_width < 1:
        raise ValueError("Interval width should be a number from (0,1).")
    return interval_width

@staticmethod
def _validate_cv(cv: int) -> int:
    """Validate the number of folds for backtest.

    Parameters
    ----------
    cv:
        Number of folds to validate

    Returns
    -------
    int
        The given number of folds, unchanged, if it is greater than 1

    Raises
    ------
    ValueError:
        If the number of folds is not greater than 1
    """
    # Guard clause: anything that is not strictly greater than 1 is rejected.
    if not cv > 1:
        raise ValueError("At least two folds for backtest are expected.")
    return cv

def fit(self, ts: TSDataset) -> "Pipeline":
"""Fit the Pipeline.
Fit and apply given transforms to the data, then fit the model on the transformed data.
Expand All @@ -66,16 +112,50 @@ def fit(self, ts: TSDataset) -> "Pipeline":
self.model.fit(self.ts)
return self

def forecast(self) -> TSDataset:
def _forecast_confidence_interval(self, future: TSDataset) -> TSDataset:
    """Forecast the future and add confidence interval borders estimated from backtest residuals.

    The pipeline is backtested on ``self.ts`` with ``self.confidence_interval_cv`` folds.
    The standard error of the "target" residuals of that backtest is multiplied by the normal
    quantile for ``self.interval_width`` and is subtracted from / added to the model forecast,
    producing the "target_lower" / "target_upper" columns.

    Parameters
    ----------
    future:
        Dataset that is passed to the model to make the forecast

    Returns
    -------
    TSDataset
        Forecast of the model with "target_lower" and "target_upper" columns added
    """
    # Only the forecasts returned by the backtest are used; the other two results are discarded.
    _, backtest_df, _ = self.backtest(self.ts, metrics=[MAE()], n_folds=self.confidence_interval_cv)
    backtest_ts = TSDataset(df=backtest_df, freq=self.ts.freq)
    predicted_target = backtest_ts.loc[:, pd.IndexSlice[:, "target"]]
    first_timestamp = backtest_ts.index.min()
    last_timestamp = backtest_ts.index.max()
    actual_target = self.ts[first_timestamp:last_timestamp, :, "target"]
    residuals = predicted_target - actual_target

    predictions = self.model.forecast(ts=future)
    # Two-sided interval: the quantile is taken at (1 + interval_width) / 2.
    margin = scipy.stats.sem(residuals) * norm.ppf(q=(1 + self.interval_width) / 2)
    lower_border = (predictions[:, :, "target"] - margin).rename({"target": "target_lower"}, axis=1)
    upper_border = (predictions[:, :, "target"] + margin).rename({"target": "target_upper"}, axis=1)
    combined = pd.concat([predictions.df, lower_border, upper_border], axis=1)
    predictions.df = combined.sort_index(axis=1, level=(0, 1))
    return predictions

def forecast(self, confidence_interval: bool = False) -> TSDataset:
    """Make predictions.

    The model is called exactly once per forecast: either for a plain forecast, or for a forecast
    with its own confidence interval. If the model's ``forecast`` has no ``confidence_interval``
    parameter, the interval is estimated by ``_forecast_confidence_interval`` instead.

    Parameters
    ----------
    confidence_interval:
        If True returns confidence interval for forecast

    Returns
    -------
    TSDataset
        TSDataset with forecast
    """
    future = self.ts.make_future(self.horizon)
    if not confidence_interval:
        return self.model.forecast(ts=future)
    # Use the model's own intervals when its forecast method declares the `confidence_interval` parameter.
    if "confidence_interval" in inspect.signature(self.model.forecast).parameters:
        return self.model.forecast(
            ts=future, confidence_interval=confidence_interval, interval_width=self.interval_width
        )
    # Otherwise fall back to the backtest-based estimation implemented in the pipeline.
    return self._forecast_confidence_interval(future=future)

def _init_backtest(self):
Expand Down
Loading