-
Notifications
You must be signed in to change notification settings - Fork 80
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Co-authored-by: martins0n <[email protected]>
- Loading branch information
1 parent
e288524
commit 91bcbea
Showing
8 changed files
with
253 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ API | |
metrics | ||
transforms | ||
ensembles | ||
pipeline | ||
analysis | ||
model_selection | ||
loggers | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
Pipelines | ||
========== | ||
|
||
.. _pipeline: | ||
|
||
.. currentmodule:: etna | ||
|
||
Details and available algorithms | ||
-------------------------------- | ||
|
||
See the API documentation for further details of using pipelines: | ||
|
||
.. currentmodule:: etna | ||
|
||
.. moduleautosummary:: | ||
:toctree: api/ | ||
:template: custom-module-template.rst | ||
:recursive: | ||
|
||
etna.pipeline |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,2 @@ | ||
from etna.pipeline.autoregressive_pipeline import AutoRegressivePipeline | ||
from etna.pipeline.pipeline import Pipeline |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
import warnings | ||
from copy import deepcopy | ||
from typing import Iterable | ||
|
||
import pandas as pd | ||
|
||
from etna.datasets import TSDataset | ||
from etna.models.base import Model | ||
from etna.pipeline.pipeline import Pipeline | ||
from etna.transforms import Transform | ||
|
||
|
||
class AutoRegressivePipeline(Pipeline): | ||
"""Pipeline that make regressive models autoregressive. | ||
Examples | ||
-------- | ||
>>> from etna.datasets import generate_periodic_df | ||
>>> from etna.datasets import TSDataset | ||
>>> from etna.models import LinearPerSegmentModel | ||
>>> from etna.transforms import LagTransform | ||
>>> classic_df = generate_periodic_df( | ||
... periods=100, | ||
... start_time="2020-01-01", | ||
... n_segments=4, | ||
... period=7, | ||
... sigma=3 | ||
... ) | ||
>>> df = TSDataset.to_dataset(df=classic_df) | ||
>>> ts = TSDataset(df, freq="D") | ||
>>> horizon = 7 | ||
>>> transforms = [ | ||
... LagTransform(in_column="target", lags=list(range(1, horizon+1))) | ||
... ] | ||
>>> model = LinearPerSegmentModel() | ||
>>> pipeline = AutoRegressivePipeline(model, horizon, transforms, step=1) | ||
>>> _ = pipeline.fit(ts=ts) | ||
>>> forecast = pipeline.forecast() | ||
>>> pd.options.display.float_format = '{:,.2f}'.format | ||
>>> forecast[:, :, "target"] | ||
segment segment_0 segment_1 segment_2 segment_3 | ||
feature target target target target | ||
timestamp | ||
2020-04-10 9.00 9.00 4.00 6.00 | ||
2020-04-11 5.00 2.00 7.00 9.00 | ||
2020-04-12 0.00 4.00 7.00 9.00 | ||
2020-04-13 0.00 5.00 9.00 7.00 | ||
2020-04-14 1.00 2.00 1.00 6.00 | ||
2020-04-15 5.00 7.00 4.00 7.00 | ||
2020-04-16 8.00 6.00 2.00 0.00 | ||
""" | ||
|
||
def __init__(self, model: Model, horizon: int, transforms: Iterable[Transform] = (), step: int = 1): | ||
""" | ||
Create instance of AutoRegressivePipeline with given parameters. | ||
Parameters | ||
---------- | ||
model: | ||
Instance of the etna Model | ||
horizon: | ||
Number of timestamps in the future for forecasting | ||
transforms: | ||
Sequence of the transforms | ||
step: | ||
Size of prediction for one step of forecasting | ||
""" | ||
self.model = model | ||
self.horizon = horizon | ||
self.transforms = transforms | ||
self.step = step | ||
self.transforms = transforms | ||
self.model = model | ||
|
||
def fit(self, ts: TSDataset) -> Pipeline: | ||
"""Fit the Pipeline. | ||
Fit and apply given transforms to the data, then fit the model on the transformed data. | ||
Parameters | ||
---------- | ||
ts: | ||
Dataset with timeseries data | ||
Returns | ||
------- | ||
Pipeline: | ||
Fitted Pipeline instance | ||
""" | ||
self.ts = deepcopy(ts) | ||
ts.fit_transform(self.transforms) | ||
self.model.fit(ts) | ||
return self | ||
|
||
def _create_predictions_template(self) -> pd.DataFrame: | ||
"""Create dataframe to fill with forecasts.""" | ||
prediction_df = self.ts.to_pandas() | ||
future_dates = pd.date_range( | ||
start=prediction_df.index.max(), periods=self.horizon + 1, freq=self.ts.freq, closed="right" | ||
) | ||
prediction_df = prediction_df.reindex(prediction_df.index.append(future_dates)) | ||
prediction_df.index.name = "timestamp" | ||
return prediction_df | ||
|
||
def forecast(self) -> TSDataset: | ||
"""Make predictions. | ||
Returns | ||
------- | ||
TSDataset: | ||
TSDataset with forecast | ||
""" | ||
prediction_df = self._create_predictions_template() | ||
|
||
for idx_start in range(0, self.horizon, self.step): | ||
current_step = min(self.step, self.horizon - idx_start) | ||
current_idx_border = self.ts.index.shape[0] + idx_start | ||
current_ts = TSDataset(prediction_df.iloc[:current_idx_border], freq=self.ts.freq) | ||
# manually set transforms in current_ts, otherwise make_future won't know about them | ||
current_ts.transforms = self.transforms | ||
with warnings.catch_warnings(): | ||
warnings.filterwarnings( | ||
message="TSDataset freq can't be inferred", | ||
action="ignore", | ||
) | ||
warnings.filterwarnings( | ||
message="You probably set wrong freq.", | ||
action="ignore", | ||
) | ||
current_ts_forecast = current_ts.make_future(current_step) | ||
current_ts_future = self.model.forecast(current_ts_forecast) | ||
prediction_df = prediction_df.combine_first(current_ts_future.to_pandas()[prediction_df.columns]) | ||
|
||
prediction_ts = TSDataset(prediction_df.tail(self.horizon), freq=self.ts.freq) | ||
# add all other features to forecast by making transform + inverse_transform | ||
prediction_ts.transform(self.transforms) | ||
prediction_ts.inverse_transform() | ||
return prediction_ts |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
from copy import deepcopy | ||
|
||
import numpy as np | ||
import pandas as pd | ||
import pytest | ||
|
||
from etna.datasets import TSDataset | ||
from etna.models import LinearPerSegmentModel | ||
from etna.pipeline import AutoRegressivePipeline | ||
from etna.transforms import DateFlagsTransform | ||
from etna.transforms import LagTransform | ||
from etna.transforms import LinearTrendTransform | ||
|
||
|
||
def test_fit(example_tsds): | ||
"""Test that AutoRegressivePipeline pipeline makes fit without failing.""" | ||
model = LinearPerSegmentModel() | ||
transforms = [LagTransform(in_column="target", lags=[1]), DateFlagsTransform()] | ||
pipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=5, step=1) | ||
pipeline.fit(example_tsds) | ||
|
||
|
||
def test_forecast_columns(example_tsds): | ||
"""Test that AutoRegressivePipeline generates all the columns.""" | ||
original_ts = deepcopy(example_tsds) | ||
horizon = 5 | ||
|
||
# make predictions in AutoRegressivePipeline | ||
model = LinearPerSegmentModel() | ||
transforms = [LagTransform(in_column="target", lags=[1]), DateFlagsTransform(is_weekend=True)] | ||
pipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=horizon, step=1) | ||
pipeline.fit(example_tsds) | ||
forecast_pipeline = pipeline.forecast() | ||
|
||
# generate all columns | ||
original_ts.fit_transform(transforms) | ||
|
||
assert set(forecast_pipeline.columns) == set(original_ts.columns) | ||
|
||
|
||
def test_forecast_one_step(example_tsds): | ||
"""Test that AutoRegressivePipeline gets predictions one by one if step is equal to 1.""" | ||
original_ts = deepcopy(example_tsds) | ||
horizon = 5 | ||
|
||
# make predictions in AutoRegressivePipeline | ||
model = LinearPerSegmentModel() | ||
transforms = [LagTransform(in_column="target", lags=[1])] | ||
pipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=horizon, step=1) | ||
pipeline.fit(example_tsds) | ||
forecast_pipeline = pipeline.forecast() | ||
|
||
# make predictions manually | ||
df = original_ts.to_pandas() | ||
original_ts.fit_transform(transforms) | ||
model = LinearPerSegmentModel() | ||
model.fit(original_ts) | ||
for i in range(horizon): | ||
cur_ts = TSDataset(df, freq=original_ts.freq) | ||
# these transform don't fit and we can fit_transform them at each step | ||
cur_ts.transform(transforms) | ||
cur_forecast_ts = cur_ts.make_future(1) | ||
cur_future_ts = model.forecast(cur_forecast_ts) | ||
to_add_df = cur_future_ts.to_pandas() | ||
df = pd.concat([df, to_add_df[df.columns]]) | ||
|
||
forecast_manual = TSDataset(df.tail(horizon), freq=original_ts.freq) | ||
assert np.all(forecast_pipeline[:, :, "target"] == forecast_manual[:, :, "target"]) | ||
|
||
|
||
@pytest.mark.parametrize("horizon, step", ((1, 1), (5, 1), (5, 2), (5, 3), (5, 4), (5, 5), (20, 1), (20, 2), (20, 3))) | ||
def test_forecast_multi_step(example_tsds, horizon, step): | ||
"""Test that AutoRegressivePipeline gets correct number of predictions if step is more than 1.""" | ||
model = LinearPerSegmentModel() | ||
transforms = [LagTransform(in_column="target", lags=[step])] | ||
pipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=horizon, step=step) | ||
pipeline.fit(example_tsds) | ||
forecast_pipeline = pipeline.forecast() | ||
|
||
assert forecast_pipeline.df.shape[0] == horizon | ||
|
||
|
||
def test_forecast_with_fit_transforms(example_tsds): | ||
"""Test that AutoRegressivePipeline can work with transforms that need fitting.""" | ||
horizon = 5 | ||
|
||
model = LinearPerSegmentModel() | ||
transforms = [LagTransform(in_column="target", lags=[1]), LinearTrendTransform(in_column="target")] | ||
pipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=horizon, step=1) | ||
pipeline.fit(example_tsds) | ||
pipeline.forecast() |