
Add AutoRegressivePipeline #209

Merged · 13 commits · Oct 21, 2021
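
This PR adds AutoRegressivePipeline: a pipeline that forecasts the horizon in chunks of `step` timestamps, writing each predicted chunk back into the dataset so that lag-based features can use it on the next iteration. Below is a minimal usage sketch, not part of the diff, mirroring the class docstring example; step=2 and the lag range are chosen only for illustration:

from etna.datasets import TSDataset, generate_periodic_df
from etna.models import LinearPerSegmentModel
from etna.pipeline import AutoRegressivePipeline
from etna.transforms import LagTransform

# toy periodic dataset, as in the class docstring example
classic_df = generate_periodic_df(periods=100, start_time="2020-01-01", n_segments=4, period=7, sigma=3)
ts = TSDataset(TSDataset.to_dataset(df=classic_df), freq="D")

horizon = 7
# with step=2 each iteration predicts 2 timestamps at once;
# lags start at 2 so that every lag value is already known inside a 2-step chunk
pipeline = AutoRegressivePipeline(
    model=LinearPerSegmentModel(),
    horizon=horizon,
    transforms=[LagTransform(in_column="target", lags=list(range(2, horizon + 2)))],
    step=2,
)
pipeline.fit(ts=ts)
forecast = pipeline.forecast()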
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -38,6 +38,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add 'in_column' parameter to get_anomalies methods([#199](https://github.com/tinkoff-ai/etna-ts/pull/199))
- Clustering notebook ([#152](https://github.com/tinkoff-ai/etna-ts/pull/152))
- StackingEnsemble ([#195](https://github.com/tinkoff-ai/etna-ts/pull/195))
- Add AutoRegressivePipeline ([#209](https://github.com/tinkoff-ai/etna-ts/pull/209))

### Changed
- Delete offset from WindowStatisticsTransform ([#111](https://github.com/tinkoff-ai/etna-ts/pull/111))
1 change: 1 addition & 0 deletions docs/source/api.rst
@@ -13,6 +13,7 @@ API
metrics
transforms
ensembles
pipeline
analysis
model_selection
loggers
1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -12,6 +12,7 @@ Welcome to ETNA's documentation!
metrics
transforms
ensembles
pipeline
analysis
model_selection
loggers
20 changes: 20 additions & 0 deletions docs/source/pipeline.rst
@@ -0,0 +1,20 @@
Pipelines
==========

.. _pipeline:

.. currentmodule:: etna

Details and available algorithms
--------------------------------

See the API documentation for further details on using pipelines:

.. currentmodule:: etna

.. moduleautosummary::
:toctree: api/
:template: custom-module-template.rst
:recursive:

etna.pipeline
2 changes: 1 addition & 1 deletion docs/source/transforms.rst
@@ -8,7 +8,7 @@ Transforms
Details and available algorithms
--------------------------------

See the API documentation for further details on available clustering algorithms:
See the API documentation for further details on available feature extractions and transformations:

.. currentmodule:: etna

1 change: 1 addition & 0 deletions etna/pipeline/__init__.py
@@ -1 +1,2 @@
from etna.pipeline.autoregressive_pipeline import AutoRegressivePipeline
from etna.pipeline.pipeline import Pipeline
137 changes: 137 additions & 0 deletions etna/pipeline/autoregressive_pipeline.py
@@ -0,0 +1,137 @@
import warnings
from copy import deepcopy
from typing import Iterable

import pandas as pd

from etna.datasets import TSDataset
from etna.models.base import Model
from etna.pipeline.pipeline import Pipeline
from etna.transforms import Transform


class AutoRegressivePipeline(Pipeline):
"""Pipeline that make regressive models autoregressive.

Examples
--------
    >>> import pandas as pd
    >>> from etna.datasets import generate_periodic_df
>>> from etna.datasets import TSDataset
>>> from etna.models import LinearPerSegmentModel
>>> from etna.transforms import LagTransform
>>> classic_df = generate_periodic_df(
... periods=100,
... start_time="2020-01-01",
... n_segments=4,
... period=7,
... sigma=3
... )
>>> df = TSDataset.to_dataset(df=classic_df)
>>> ts = TSDataset(df, freq="D")
>>> horizon = 7
>>> transforms = [
... LagTransform(in_column="target", lags=list(range(1, horizon+1)))
... ]
>>> model = LinearPerSegmentModel()
>>> pipeline = AutoRegressivePipeline(model, horizon, transforms, step=1)
>>> _ = pipeline.fit(ts=ts)
>>> forecast = pipeline.forecast()
>>> pd.options.display.float_format = '{:,.2f}'.format
>>> forecast[:, :, "target"]
segment segment_0 segment_1 segment_2 segment_3
feature target target target target
timestamp
2020-04-10 9.00 9.00 4.00 6.00
2020-04-11 5.00 2.00 7.00 9.00
2020-04-12 0.00 4.00 7.00 9.00
2020-04-13 0.00 5.00 9.00 7.00
2020-04-14 1.00 2.00 1.00 6.00
2020-04-15 5.00 7.00 4.00 7.00
2020-04-16 8.00 6.00 2.00 0.00
"""

def __init__(self, model: Model, horizon: int, transforms: Iterable[Transform] = (), step: int = 1):
"""
Create instance of AutoRegressivePipeline with given parameters.

Parameters
----------
model:
Instance of the etna Model
horizon:
Number of timestamps in the future for forecasting
transforms:
Sequence of the transforms
        step:
            Number of timestamps forecasted at each iteration of the autoregressive loop
"""
        self.model = model
        self.horizon = horizon
        self.transforms = transforms
        self.step = step

def fit(self, ts: TSDataset) -> Pipeline:
"""Fit the Pipeline.
Fit and apply given transforms to the data, then fit the model on the transformed data.

Parameters
----------
ts:
Dataset with timeseries data

Returns
-------
Pipeline:
Fitted Pipeline instance
"""
        # keep an untransformed copy of the data: forecast() uses it to build the prediction template
        self.ts = deepcopy(ts)
        ts.fit_transform(self.transforms)
        self.model.fit(ts)
return self

def _create_predictions_template(self) -> pd.DataFrame:
"""Create dataframe to fill with forecasts."""
prediction_df = self.ts.to_pandas()
future_dates = pd.date_range(
start=prediction_df.index.max(), periods=self.horizon + 1, freq=self.ts.freq, closed="right"
)
prediction_df = prediction_df.reindex(prediction_df.index.append(future_dates))
prediction_df.index.name = "timestamp"
return prediction_df

def forecast(self) -> TSDataset:
"""Make predictions.

Returns
-------
TSDataset:
TSDataset with forecast
"""
prediction_df = self._create_predictions_template()

for idx_start in range(0, self.horizon, self.step):
current_step = min(self.step, self.horizon - idx_start)
current_idx_border = self.ts.index.shape[0] + idx_start
current_ts = TSDataset(prediction_df.iloc[:current_idx_border], freq=self.ts.freq)
# manually set transforms in current_ts, otherwise make_future won't know about them
current_ts.transforms = self.transforms
with warnings.catch_warnings():
warnings.filterwarnings(
message="TSDataset freq can't be inferred",
action="ignore",
)
warnings.filterwarnings(
message="You probably set wrong freq.",
action="ignore",
)
current_ts_forecast = current_ts.make_future(current_step)
current_ts_future = self.model.forecast(current_ts_forecast)
prediction_df = prediction_df.combine_first(current_ts_future.to_pandas()[prediction_df.columns])

prediction_ts = TSDataset(prediction_df.tail(self.horizon), freq=self.ts.freq)
# add all other features to forecast by making transform + inverse_transform
prediction_ts.transform(self.transforms)
prediction_ts.inverse_transform()
return prediction_ts
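
A standalone sketch, not part of the PR, of the chunking arithmetic used in forecast() above: the horizon is covered in chunks of at most step timestamps, with a shorter final chunk when step does not divide horizon. forecast_chunks is a hypothetical helper written only to illustrate this.

from typing import List

def forecast_chunks(horizon: int, step: int) -> List[int]:
    """Number of timestamps predicted at each iteration of the autoregressive loop."""
    return [min(step, horizon - idx_start) for idx_start in range(0, horizon, step)]

assert forecast_chunks(horizon=5, step=2) == [2, 2, 1]
assert forecast_chunks(horizon=5, step=5) == [5]
assert forecast_chunks(horizon=20, step=3) == [3, 3, 3, 3, 3, 3, 2]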
91 changes: 91 additions & 0 deletions tests/test_pipeline/test_autoregressive_pipeline.py
@@ -0,0 +1,91 @@
from copy import deepcopy

import numpy as np
import pandas as pd
import pytest

from etna.datasets import TSDataset
from etna.models import LinearPerSegmentModel
from etna.pipeline import AutoRegressivePipeline
from etna.transforms import DateFlagsTransform
from etna.transforms import LagTransform
from etna.transforms import LinearTrendTransform


def test_fit(example_tsds):
"""Test that AutoRegressivePipeline pipeline makes fit without failing."""
model = LinearPerSegmentModel()
transforms = [LagTransform(in_column="target", lags=[1]), DateFlagsTransform()]
pipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=5, step=1)
pipeline.fit(example_tsds)


def test_forecast_columns(example_tsds):
"""Test that AutoRegressivePipeline generates all the columns."""
original_ts = deepcopy(example_tsds)
horizon = 5

# make predictions in AutoRegressivePipeline
model = LinearPerSegmentModel()
transforms = [LagTransform(in_column="target", lags=[1]), DateFlagsTransform(is_weekend=True)]
pipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=horizon, step=1)
pipeline.fit(example_tsds)
forecast_pipeline = pipeline.forecast()

# generate all columns
original_ts.fit_transform(transforms)

assert set(forecast_pipeline.columns) == set(original_ts.columns)


def test_forecast_one_step(example_tsds):
"""Test that AutoRegressivePipeline gets predictions one by one if step is equal to 1."""
original_ts = deepcopy(example_tsds)
horizon = 5

# make predictions in AutoRegressivePipeline
model = LinearPerSegmentModel()
transforms = [LagTransform(in_column="target", lags=[1])]
pipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=horizon, step=1)
pipeline.fit(example_tsds)
forecast_pipeline = pipeline.forecast()

# make predictions manually
df = original_ts.to_pandas()
original_ts.fit_transform(transforms)
model = LinearPerSegmentModel()
model.fit(original_ts)
for i in range(horizon):
cur_ts = TSDataset(df, freq=original_ts.freq)
        # these transforms don't require fitting, so they can be applied at each step
cur_ts.transform(transforms)
cur_forecast_ts = cur_ts.make_future(1)
cur_future_ts = model.forecast(cur_forecast_ts)
to_add_df = cur_future_ts.to_pandas()
df = pd.concat([df, to_add_df[df.columns]])

forecast_manual = TSDataset(df.tail(horizon), freq=original_ts.freq)
assert np.all(forecast_pipeline[:, :, "target"] == forecast_manual[:, :, "target"])


@pytest.mark.parametrize("horizon, step", ((1, 1), (5, 1), (5, 2), (5, 3), (5, 4), (5, 5), (20, 1), (20, 2), (20, 3)))
def test_forecast_multi_step(example_tsds, horizon, step):
"""Test that AutoRegressivePipeline gets correct number of predictions if step is more than 1."""
model = LinearPerSegmentModel()
transforms = [LagTransform(in_column="target", lags=[step])]
pipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=horizon, step=step)
pipeline.fit(example_tsds)
forecast_pipeline = pipeline.forecast()

assert forecast_pipeline.df.shape[0] == horizon


def test_forecast_with_fit_transforms(example_tsds):
"""Test that AutoRegressivePipeline can work with transforms that need fitting."""
horizon = 5

model = LinearPerSegmentModel()
transforms = [LagTransform(in_column="target", lags=[1]), LinearTrendTransform(in_column="target")]
pipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=horizon, step=1)
pipeline.fit(example_tsds)
pipeline.forecast()
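
The new tests can be run in isolation with pytest; the command below is a sketch and assumes the example_tsds fixture is provided by the test suite's shared conftest:

pytest tests/test_pipeline/test_autoregressive_pipeline.py -q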