Add StackingEnsemble (#195)
alex-hse-repository authored Oct 20, 2021
1 parent 506cde5 commit e288524
Showing 7 changed files with 613 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -37,6 +37,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- ConfidenceIntervalOutliersTransform ([#196](https://github.com/tinkoff-ai/etna-ts/pull/196))
- Add 'in_column' parameter to get_anomalies methods([#199](https://github.com/tinkoff-ai/etna-ts/pull/199))
- Clustering notebook ([#152](https://github.com/tinkoff-ai/etna-ts/pull/152))
- StackingEnsemble ([#195](https://github.com/tinkoff-ai/etna-ts/pull/195))

### Changed
- Delete offset from WindowStatisticsTransform ([#111](https://github.com/tinkoff-ai/etna-ts/pull/111))
1 change: 1 addition & 0 deletions etna/ensembles/__init__.py
@@ -1 +1,2 @@
from etna.ensembles.stacking_ensemble import StackingEnsemble
from etna.ensembles.voting_ensemble import VotingEnsemble
258 changes: 258 additions & 0 deletions etna/ensembles/stacking_ensemble.py
@@ -0,0 +1,258 @@
import warnings
from copy import deepcopy
from typing import List
from typing import Set
from typing import Tuple
from typing import Union

import numpy as np
import pandas as pd
from joblib import Parallel
from joblib import delayed
from sklearn.base import RegressorMixin
from sklearn.linear_model import LinearRegression
from typing_extensions import Literal

from etna.datasets import TSDataset
from etna.loggers import tslogger
from etna.metrics import MAE
from etna.pipeline import Pipeline


class StackingEnsemble(Pipeline):
"""StackingEnsemble is a pipeline that forecast future using the metamodel to combine the forecasts of the base models.
Examples
--------
>>> from etna.datasets import generate_ar_df
>>> from etna.datasets import TSDataset
>>> from etna.ensembles import StackingEnsemble
>>> from etna.models import NaiveModel
>>> from etna.models import MovingAverageModel
>>> from etna.pipeline import Pipeline
>>> import pandas as pd
>>> pd.options.display.float_format = '{:,.2f}'.format
>>> df = generate_ar_df(periods=100, start_time="2021-06-01", ar_coef=[0.8], n_segments=3)
>>> df_ts_format = TSDataset.to_dataset(df)
>>> ts = TSDataset(df_ts_format, "D")
>>> ma_pipeline = Pipeline(model=MovingAverageModel(window=5), transforms=[], horizon=7)
>>> naive_pipeline = Pipeline(model=NaiveModel(lag=10), transforms=[], horizon=7)
>>> ensemble = StackingEnsemble(pipelines=[ma_pipeline, naive_pipeline])
>>> _ = ensemble.fit(ts=ts)
>>> forecast = ensemble.forecast()
>>> forecast[:,:,"target"]
segment segment_0 segment_1 segment_2
feature target target target
timestamp
2021-09-09 0.70 1.47 0.20
2021-09-10 0.62 1.53 0.26
2021-09-11 0.50 1.78 0.36
2021-09-12 0.37 1.88 0.21
2021-09-13 0.46 1.87 0.25
2021-09-14 0.44 1.49 0.21
2021-09-15 0.36 1.56 0.30
"""

def __init__(
self,
pipelines: List[Pipeline],
final_model: RegressorMixin = LinearRegression(),
cv: int = 3,
features_to_use: Union[None, Literal["all"], List[str]] = None,
n_jobs: int = 1,
):
"""Init StackingEnsemble.
Parameters
----------
pipelines:
List of pipelines that should be used in ensemble.
final_model:
Regression model with fit/predict interface which will be used to combine the base estimators.
cv:
Number of folds to use in the backtest. The backtest here is used to generate out-of-sample forecasts for fitting the `final_model`, not for model evaluation.
features_to_use:
Additional features (besides the base models' forecasts) to pass to the `final_model`: None for no extra features, "all" for all available features, or a list of feature names.
n_jobs:
Number of jobs to run in parallel.
Raises
------
ValueError:
If the number of pipelines is less than 2 or the pipelines have different horizons.
"""
self._validate_pipeline_number(pipelines=pipelines)
self.pipelines = pipelines
self.horizon = self._get_horizon(pipelines=pipelines)
self.final_model = final_model
self.cv = self._validate_cv(cv)
self.features_to_use = features_to_use
self.filtered_features_for_final_model: Union[None, Set[str]] = None
self.n_jobs = n_jobs

@staticmethod
def _validate_pipeline_number(pipelines: List[Pipeline]):
"""Check that given valid number of pipelines."""
if len(pipelines) < 2:
raise ValueError("At least two pipelines are expected.")

@staticmethod
def _get_horizon(pipelines: List[Pipeline]) -> int:
"""Get ensemble's horizon."""
horizons = set([pipeline.horizon for pipeline in pipelines])
if len(horizons) > 1:
raise ValueError("All the pipelines should have the same horizon.")
return horizons.pop()

@staticmethod
def _validate_cv(cv: int) -> int:
"""Check that given number of folds is grater than 1."""
if cv > 1:
return cv
else:
raise ValueError("At least two folds for backtest are expected.")

def _filter_features_to_use(self, forecasts: List[TSDataset]) -> Union[None, Set[str]]:
"""Return all the features from `features_to_use` which can be obtained from base models' forecasts."""
features_df = pd.concat([forecast.df for forecast in forecasts], axis=1)
available_features = set(features_df.columns.get_level_values("feature")) - {"fold_number"}
features_to_use = self.features_to_use
if features_to_use is None:
return None
elif features_to_use == "all":
return available_features - {"target"}
elif isinstance(features_to_use, list):
features_to_use = set(features_to_use)
if len(features_to_use) == 0:
return None
elif features_to_use.issubset(available_features):
return features_to_use
else:
unavailable_features = features_to_use - available_features
warnings.warn(f"Features {unavailable_features} are not found and will be dropped!")
return features_to_use.intersection(available_features)
else:
warnings.warn(
"Feature list is passed in the wrong format."
"Only the base models' forecasts will be used for the final forecast."
)
return None
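
    # Illustrative note (editor's addition, not part of this commit): assuming the base
    # pipelines' backtest forecasts expose the columns {"target", "regressor_day", "fold_number"},
    # the filtering above behaves as follows:
    #   features_to_use=None                     -> None (only base forecasts feed the final model)
    #   features_to_use="all"                    -> {"regressor_day"} ("target" and "fold_number" are excluded)
    #   features_to_use=["regressor_day", "foo"] -> {"regressor_day"}, with a warning about "foo"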

@staticmethod
def _fit_pipeline(pipeline: Pipeline, ts: TSDataset) -> Pipeline:
"""Fit given pipeline with ts."""
tslogger.log(msg=f"Start fitting {pipeline}.")
pipeline.fit(ts=ts)
tslogger.log(msg=f"Pipeline {pipeline} is fitted.")
return pipeline

def _backtest_pipeline(self, pipeline: Pipeline, ts: TSDataset) -> TSDataset:
"""Get forecasts from backtest for given pipeline."""
_, forecasts, _ = pipeline.backtest(ts, metrics=[MAE()], n_folds=self.cv)
forecasts = TSDataset(df=forecasts, freq=ts.freq)
return forecasts

def fit(self, ts: TSDataset) -> "StackingEnsemble":
"""Fit the ensemble.
Parameters
----------
ts:
TSDataset to fit ensemble.
Returns
-------
StackingEnsemble:
Fitted ensemble.
"""
self.ts = ts

# Get forecasts from base models on backtest to fit the final model on
forecasts = Parallel(n_jobs=self.n_jobs, backend="multiprocessing", verbose=11)(
delayed(self._backtest_pipeline)(pipeline=pipeline, ts=deepcopy(ts)) for pipeline in self.pipelines
)

# Fit the final model
self.filtered_features_for_final_model = self._filter_features_to_use(forecasts)
x, y = self._make_features(forecasts=forecasts, train=True)
self.final_model.fit(x, y)

# Fit the base models
self.pipelines = Parallel(n_jobs=self.n_jobs, backend="multiprocessing", verbose=11)(
delayed(self._fit_pipeline)(pipeline=pipeline, ts=deepcopy(ts)) for pipeline in self.pipelines
)
return self

def _make_features(
self, forecasts: List[TSDataset], train: bool = False
) -> Union[Tuple[pd.DataFrame, pd.Series], pd.DataFrame]:
"""Prepare features for the `final_model`."""
# Stack targets from the forecasts
targets = [
forecast[:, :, "target"].rename({"target": f"regressor_target_{i}"}, axis=1)
for i, forecast in enumerate(forecasts)
]
targets = pd.concat(targets, axis=1)

# Get features from filtered_features_for_final_model
features = pd.DataFrame()
if self.filtered_features_for_final_model is not None:
features_in_forecasts = [
set(forecast.columns.get_level_values("feature")).intersection(self.filtered_features_for_final_model)
for forecast in forecasts
]
features = pd.concat(
[forecast[:, :, features_in_forecasts[i]] for i, forecast in enumerate(forecasts)], axis=1
)
features = features.loc[:, ~features.columns.duplicated()]
features_df = pd.concat([features, targets], axis=1)

# Flatten the features to fit the sklearn interface
x = pd.concat([features_df.loc[:, segment] for segment in self.ts.segments], axis=0)
if train:
y = pd.concat(
[
self.ts[forecasts[0].index.min() : forecasts[0].index.max(), segment, "target"]
for segment in self.ts.segments
],
axis=0,
)
return x, y
else:
return x
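
    # Shape sketch (editor's addition, not part of this commit): with 3 segments, horizon=7
    # and cv=3, the train-time call stacks the per-segment blocks along axis 0, so x has
    # 3 * (3 * 7) = 63 rows and one "regressor_target_{i}" column per base pipeline (plus any
    # filtered features), while y holds the matching true target values from self.ts;
    # at forecast time there are only horizon rows per segment and x alone is returned.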

@staticmethod
def _forecast_pipeline(pipeline: Pipeline) -> TSDataset:
"""Make forecast with given pipeline."""
tslogger.log(msg=f"Start forecasting with {pipeline}.")
forecast = pipeline.forecast()
tslogger.log(msg=f"Forecast is done with {pipeline}.")
return forecast

def forecast(self) -> TSDataset:
"""Forecast with ensemble: compute the combination of pipelines' forecasts using `final_model`.
Returns
-------
TSDataset:
Dataset with forecasts.
"""
# Get forecast
forecasts = Parallel(n_jobs=self.n_jobs, backend="multiprocessing", verbose=11)(
delayed(self._forecast_pipeline)(pipeline=pipeline) for pipeline in self.pipelines
)
x = self._make_features(forecasts=forecasts, train=False)
y = self.final_model.predict(x).reshape(-1, self.horizon).T

# Format the forecast into TSDataset
segment_col = [segment for segment in self.ts.segments for _ in range(self.horizon)]
x.loc[:, "segment"] = segment_col
x.loc[:, "timestamp"] = x.index.values
df_exog = TSDataset.to_dataset(x)

df = forecasts[0][:, :, "target"].copy()
df.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = np.NAN

forecast = TSDataset(df=df, freq=self.ts.freq, df_exog=df_exog)
forecast.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = y
return forecast
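
A minimal usage sketch of the new ensemble (editor's illustration, not taken from the commit): the tree-based final_model and the cv value below are assumptions; everything else mirrors the class docstring above.

from sklearn.ensemble import RandomForestRegressor

from etna.datasets import TSDataset, generate_ar_df
from etna.ensembles import StackingEnsemble
from etna.models import MovingAverageModel, NaiveModel
from etna.pipeline import Pipeline

# Toy dataset, as in the class docstring.
df = generate_ar_df(periods=100, start_time="2021-06-01", ar_coef=[0.8], n_segments=3)
ts = TSDataset(TSDataset.to_dataset(df), freq="D")

# Base pipelines must share one horizon; otherwise __init__ raises ValueError.
ma_pipeline = Pipeline(model=MovingAverageModel(window=5), transforms=[], horizon=7)
naive_pipeline = Pipeline(model=NaiveModel(lag=10), transforms=[], horizon=7)

# Assumed non-default settings: a tree-based metamodel and a 4-fold backtest
# for building the final model's training set.
ensemble = StackingEnsemble(
    pipelines=[ma_pipeline, naive_pipeline],
    final_model=RandomForestRegressor(n_estimators=100),
    cv=4,
)
ensemble.fit(ts=ts)
forecast = ensemble.forecast()
print(forecast[:, :, "target"])

With features_to_use left at its default of None, only the base pipelines' forecasts are passed to the metamodel.
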
28 changes: 12 additions & 16 deletions etna/ensembles/voting_ensemble.py
@@ -31,16 +31,7 @@ class VotingEnsemble(Pipeline):
... pipelines=[prophet_pipeline, naive_pipeline],
... weights=[0.7, 0.3]
... )
>>> ensemble.fit(ts=ts)
VotingEnsemble(pipelines =
[Pipeline(model = ProphetModel(growth = 'linear', changepoints = None, n_changepoints = 25,
changepoint_range = 0.8, yearly_seasonality = 'auto', weekly_seasonality = 'auto',
daily_seasonality = 'auto', holidays = None, seasonality_mode = 'additive',
seasonality_prior_scale = 10.0, holidays_prior_scale = 10.0, mcmc_samples = 0,
interval_width = 0.8, uncertainty_samples = 1000, stan_backend = None,
additional_seasonality_params = (), ), transforms = [], horizon = 7, ),
Pipeline(model = NaiveModel(lag = 10, ), transforms = [], horizon = 7, )],
weights = [0.7, 0.3], n_jobs = 1, )
>>> _ = ensemble.fit(ts=ts)
>>> forecast = ensemble.forecast()
>>> forecast
segment segment_0 segment_1 segment_2
@@ -66,6 +57,11 @@ def __init__(self, pipelines: List[Pipeline], weights: Optional[List[float]] = N
list of pipelines' weights; weights will be normalized automatically.
n_jobs:
number of jobs to run in parallel
Raises
------
ValueError:
If the number of pipelines is less than 2 or the pipelines have different horizons.
"""
self._validate_pipeline_number(pipelines=pipelines)
self.horizon = self._get_horizon(pipelines=pipelines)
@@ -82,10 +78,10 @@ def _validate_pipeline_number(pipelines: List[Pipeline]):
@staticmethod
def _get_horizon(pipelines: List[Pipeline]) -> int:
"""Get ensemble's horizon."""
horizons = list(set([pipeline.horizon for pipeline in pipelines]))
horizons = set([pipeline.horizon for pipeline in pipelines])
if len(horizons) > 1:
raise ValueError("All the pipelines should have the same horizon.")
return horizons[0]
return horizons.pop()

@staticmethod
def _process_weights(weights: Optional[Iterable[float]], pipelines_number: int) -> List[float]:
@@ -101,9 +97,9 @@ def _process_weights(weights: Optional[Iterable[float]], pipelines_number: int)
@staticmethod
def _fit_pipeline(pipeline: Pipeline, ts: TSDataset) -> Pipeline:
"""Fit given pipeline with ts."""
tslogger.log(msg=f"Start fitting {pipeline.__repr__()}.")
tslogger.log(msg=f"Start fitting {pipeline}.")
pipeline.fit(ts=ts)
tslogger.log(msg=f"Pipeline {pipeline.__repr__()} is fitted.")
tslogger.log(msg=f"Pipeline {pipeline} is fitted.")
return pipeline

def fit(self, ts: TSDataset) -> "VotingEnsemble":
@@ -127,9 +123,9 @@ def fit(self, ts: TSDataset) -> "VotingEnsemble":
@staticmethod
def _forecast_pipeline(pipeline: Pipeline) -> TSDataset:
"""Make forecast with given pipeline."""
tslogger.log(msg=f"Start forecasting with {pipeline.__repr__()}.")
tslogger.log(msg=f"Start forecasting with {pipeline}.")
forecast = pipeline.forecast()
tslogger.log(msg=f"Forecast is done with {pipeline.__repr__()}.")
tslogger.log(msg=f"Forecast is done with {pipeline}.")
return forecast

def _vote(self, forecasts: List[TSDataset]) -> TSDataset: