Skip to content

Masked backtest #613

Merged
merged 11 commits into from
Mar 24, 2022
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased
### Added
-
-
- Masked backtest ([#613](https://github.com/tinkoff-ai/etna/pull/613))
-
-
-
- Add plot_periodogram ([#606](https://github.com/tinkoff-ai/etna/pull/606))
-
-
- Add prediction_actual_scatter_plot ([#610](https://github.com/tinkoff-ai/etna/pull/610))
-


### Changed
-
-
Expand Down
1 change: 1 addition & 0 deletions etna/pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from etna.pipeline.autoregressive_pipeline import AutoRegressivePipeline
from etna.pipeline.base import FoldMask
from etna.pipeline.pipeline import Pipeline
197 changes: 153 additions & 44 deletions etna/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union

import numpy as np
import pandas as pd
Expand All @@ -24,6 +25,89 @@
from etna.metrics import Metric
from etna.metrics import MetricAggregationMode

# A point in time given either as a string or as a ``pd.Timestamp``.
Timestamp = Union[str, pd.Timestamp]


class CrossValidationMode(Enum):
    """Enum for different cross-validation modes.

    ``expand`` and ``constant`` are the two supported train generation policies.
    """

    expand = "expand"
    constant = "constant"

    @classmethod
    def _missing_(cls, value):
        """Raise a descriptive error for a value that matches no mode.

        The exception type stays ``ValueError`` (what ``Enum`` raises by default),
        the message additionally lists the allowed values.
        """
        allowed = ", ".join(repr(member.value) for member in cls)
        raise ValueError(f"{value!r} is not a valid {cls.__name__}. Only {allowed} modes allowed")


class FoldMask(BaseMixin):
    """Container to hold the description of the fold mask.

    Fold masks are expected to be used for backtest strategy customization.
    """

    def __init__(
        self,
        first_train_timestamp: Optional[Timestamp],
        last_train_timestamp: Timestamp,
        target_timestamps: List[Timestamp],
    ):
        """Init FoldMask.

        Parameters
        ----------
        first_train_timestamp:
            First train timestamp, the first timestamp in the dataset if None is passed
        last_train_timestamp:
            Last train timestamp
        target_timestamps:
            List of target timestamps

        Raises
        ------
        ValueError:
            If last train timestamp is sooner than first train timestamp,
            if target timestamps are empty or if any of them is not strictly later
            than last train timestamp
        """
        self.first_train_timestamp = pd.to_datetime(first_train_timestamp) if first_train_timestamp else None
        self.last_train_timestamp = pd.to_datetime(last_train_timestamp)
        # Sorted so that the first/last elements are the earliest/latest targets.
        self.target_timestamps = sorted(pd.to_datetime(timestamp) for timestamp in target_timestamps)

        self._validate_last_train_timestamp()
        self._validate_target_timestamps()

    def _validate_last_train_timestamp(self):
        """Check that last train timestamp is not sooner than first train timestamp."""
        if self.first_train_timestamp and self.last_train_timestamp < self.first_train_timestamp:
            raise ValueError("Last train timestamp should be not sooner than first train timestamp!")

    def _validate_target_timestamps(self):
        """Check that target timestamps are not empty and are all later than last train timestamp."""
        if not self.target_timestamps:
            # Without this guard the lookup below fails with a bare IndexError.
            raise ValueError("Target timestamps shouldn't be empty!")
        # The list is sorted, so checking the earliest target covers all of them.
        first_target_timestamp = self.target_timestamps[0]
        if first_target_timestamp <= self.last_train_timestamp:
            raise ValueError("Target timestamps should be strictly later then last train timestamp!")

    def validate_on_dataset(self, ts: TSDataset, horizon: int):
        """Validate fold mask on the dataset with specified horizon.

        Parameters
        ----------
        ts:
            Dataset to validate on
        horizon:
            Forecasting horizon

        Raises
        ------
        ValueError:
            If the train borders lie outside of the dataset, if last train timestamp is not
            present in the dataset index, if the dataset has fewer than ``horizon`` timestamps
            after last train timestamp or if target timestamps lie outside of the ``horizon``
            timestamps that follow last train timestamp
        """
        dataset_timestamps = list(ts.index)
        dataset_description = ts.describe()

        min_first_timestamp = dataset_description["start_timestamp"].min()
        if self.first_train_timestamp and self.first_train_timestamp < min_first_timestamp:
            raise ValueError(f"First train timestamp should be later than {min_first_timestamp}!")

        last_timestamp = dataset_description["end_timestamp"].min()
        if self.last_train_timestamp > last_timestamp:
            raise ValueError(f"Last train timestamp should be not later than {last_timestamp}!")

        # Locate the train border once; report a readable error instead of "x is not in list".
        try:
            last_train_idx = dataset_timestamps.index(self.last_train_timestamp)
        except ValueError:
            raise ValueError(
                f"Last train timestamp {self.last_train_timestamp} is not present in the dataset index!"
            ) from None

        # Both lookups below must stay inside the index, otherwise they fail with IndexError.
        if last_train_idx + max(horizon, 1) >= len(dataset_timestamps):
            raise ValueError(
                f"Dataset should contain at least {horizon} timestamps after last train timestamp "
                f"{self.last_train_timestamp}!"
            )

        dataset_first_target_timestamp = dataset_timestamps[last_train_idx + 1]
        mask_first_target_timestamp = self.target_timestamps[0]
        if mask_first_target_timestamp < dataset_first_target_timestamp:
            raise ValueError(f"First target timestamp should be not sooner than {dataset_first_target_timestamp}!")

        dataset_last_target_timestamp = dataset_timestamps[last_train_idx + horizon]
        mask_last_target_timestamp = self.target_timestamps[-1]
        if dataset_last_target_timestamp < mask_last_target_timestamp:
            raise ValueError(f"Last target timestamp should be not later than {dataset_last_target_timestamp}!")


class AbstractPipeline(ABC):
"""Interface for pipeline."""
Expand Down Expand Up @@ -71,7 +155,7 @@ def backtest(
self,
ts: TSDataset,
metrics: List[Metric],
n_folds: int = 5,
n_folds: Union[int, List[FoldMask]] = 5,
mode: str = "expand",
aggregate_metrics: bool = False,
n_jobs: int = 1,
Expand All @@ -86,7 +170,7 @@ def backtest(
metrics:
List of metrics to compute for each fold
n_folds:
Number of folds
Number of folds or the list of fold masks
mode:
One of 'expand', 'constant' -- train generation policy
aggregate_metrics:
Expand All @@ -103,13 +187,6 @@ def backtest(
"""


class CrossValidationMode(Enum):
    """Enum for different cross-validation modes.

    ``expand`` and ``constant`` are the two supported train generation policies.
    """

    expand = "expand"
    constant = "constant"

    @classmethod
    def _missing_(cls, value):
        """Raise a descriptive error for a value that matches no mode.

        The exception type stays ``ValueError`` (what ``Enum`` raises by default),
        the message additionally lists the allowed values.
        """
        allowed = ", ".join(repr(member.value) for member in cls)
        raise ValueError(f"{value!r} is not a valid {cls.__name__}. Only {allowed} modes allowed")


class BasePipeline(AbstractPipeline, BaseMixin):
"""Base class for pipeline."""

Expand Down Expand Up @@ -219,6 +296,40 @@ def _validate_backtest_dataset(ts: TSDataset, n_folds: int, horizon: int):
f"series {segment} does not."
)

@staticmethod
def _generate_masks_from_n_folds(ts: TSDataset, n_folds: int, horizon: int, mode: str) -> List[FoldMask]:
    """Generate fold masks from n_folds.

    Parameters
    ----------
    ts:
        Dataset to generate the masks for
    n_folds:
        Number of folds
    horizon:
        Forecasting horizon, the number of target timestamps in each fold
    mode:
        One of 'expand', 'constant' -- train generation policy

    Returns
    -------
    :
        List of fold masks ordered from the earliest fold to the latest one

    Raises
    ------
    ValueError:
        If the dataset index is too short to hold ``n_folds`` folds of ``horizon`` timestamps
        plus at least one train timestamp
    """
    mode_enum = CrossValidationMode(mode.lower())
    if mode_enum == CrossValidationMode.expand:
        constant_history_length = 0
    elif mode_enum == CrossValidationMode.constant:
        constant_history_length = 1
    else:
        raise NotImplementedError(
            f"Only '{CrossValidationMode.expand.value}' and '{CrossValidationMode.constant.value}' modes allowed"
        )

    dataset_timestamps = list(ts.index)
    min_timestamp_idx, max_timestamp_idx = 0, len(dataset_timestamps)

    # A too short index would make max_train_idx negative and silently wrap around to the end of the list.
    if n_folds > 0 and n_folds * horizon >= max_timestamp_idx:
        raise ValueError(
            f"Dataset should contain at least {n_folds * horizon + 1} timestamps "
            f"to generate {n_folds} folds with horizon {horizon}, but it contains {max_timestamp_idx}."
        )

    masks = []
    for offset in range(n_folds, 0, -1):
        # With expand mode the left train border always stays at the first timestamp;
        # with constant mode it moves one horizon forward on each fold.
        min_train_idx = min_timestamp_idx + (n_folds - offset) * horizon * constant_history_length
        max_train_idx = max_timestamp_idx - horizon * offset - 1
        min_test_idx = max_train_idx + 1
        max_test_idx = max_train_idx + horizon

        min_train, max_train = dataset_timestamps[min_train_idx], dataset_timestamps[max_train_idx]
        min_test, max_test = dataset_timestamps[min_test_idx], dataset_timestamps[max_test_idx]

        mask = FoldMask(
            first_train_timestamp=min_train,
            last_train_timestamp=max_train,
            target_timestamps=list(pd.date_range(start=min_test, end=max_test, freq=ts.freq)),
        )
        masks.append(mask)

    return masks

@staticmethod
def _validate_backtest_metrics(metrics: List[Metric]):
"""Check that given metrics are valid for backtest."""
Expand All @@ -233,27 +344,13 @@ def _validate_backtest_metrics(metrics: List[Metric]):

@staticmethod
def _generate_folds_datasets(
ts: TSDataset, n_folds: int, horizon: int, mode: str = "expand"
ts: TSDataset, masks: List[FoldMask], horizon: int
) -> Generator[Tuple[TSDataset, TSDataset], None, None]:
"""Generate a sequence of train-test pairs according to timestamp."""
mode_enum = CrossValidationMode[mode.lower()]
if mode_enum == CrossValidationMode.expand:
constant_history_length = 0
elif mode_enum == CrossValidationMode.constant:
constant_history_length = 1
else:
raise NotImplementedError(
f"Only '{CrossValidationMode.expand}' and '{CrossValidationMode.constant}' modes allowed"
)

timestamps = ts.index
min_timestamp_idx, max_timestamp_idx = 0, len(timestamps)
for offset in range(n_folds, 0, -1):
# if not self._constant_history_length, left border of train df is always equal to minimal timestamp value;
# it means that all the given data is used.
# if self._constant_history_length, left border of train df moves to one horizon steps on each split
min_train_idx = min_timestamp_idx + (n_folds - offset) * horizon * constant_history_length
max_train_idx = max_timestamp_idx - horizon * offset - 1
"""Generate folds."""
timestamps = list(ts.index)
for mask in masks:
min_train_idx = timestamps.index(mask.first_train_timestamp)
max_train_idx = timestamps.index(mask.last_train_timestamp)
min_test_idx = max_train_idx + 1
max_test_idx = max_train_idx + horizon

Expand All @@ -263,7 +360,6 @@ def _generate_folds_datasets(
train, test = ts.train_test_split(
train_start=min_train, train_end=max_train, test_start=min_test, test_end=max_test
)

yield train, test

@staticmethod
Expand All @@ -275,24 +371,23 @@ def _compute_metrics(metrics: List[Metric], y_true: TSDataset, y_pred: TSDataset
return metrics_values

def _run_fold(
self,
train: TSDataset,
test: TSDataset,
fold_number: int,
metrics: List[Metric],
self, train: TSDataset, test: TSDataset, fold_number: int, mask: FoldMask, metrics: List[Metric]
) -> Dict[str, Any]:
"""Run fit-forecast pipeline of model for one fold."""
tslogger.start_experiment(job_type="crossval", group=str(fold_number))

pipeline = deepcopy(self)
pipeline.fit(ts=train)
forecast = pipeline.forecast()

fold: Dict[str, Any] = {}
for stage_name, stage_df in zip(("train", "test"), (train, test)):
fold[f"{stage_name}_timerange"] = {}
fold[f"{stage_name}_timerange"]["start"] = stage_df.index.min()
fold[f"{stage_name}_timerange"]["end"] = stage_df.index.max()

forecast.df = forecast.df.loc[mask.target_timestamps]
test.df = test.df.loc[mask.target_timestamps]

fold["forecast"] = forecast
fold["metrics"] = deepcopy(self._compute_metrics(metrics=metrics, y_true=test, y_pred=forecast))

Expand Down Expand Up @@ -352,11 +447,24 @@ def _get_backtest_forecasts(self) -> pd.DataFrame:
forecasts = pd.concat(forecasts_list)
return forecasts

def _prepare_fold_masks(self, ts: TSDataset, masks: Union[int, List[FoldMask]], mode: str) -> List[FoldMask]:
    """Prepare and validate fold masks.

    Parameters
    ----------
    ts:
        Dataset the masks are going to be applied to
    masks:
        Number of folds or the list of fold masks
    mode:
        One of 'expand', 'constant' -- train generation policy, used only if ``masks`` is a number of folds

    Returns
    -------
    :
        List of fold masks with filled ``first_train_timestamp``, validated on ``ts``;
        masks passed by the caller are copied and left untouched
    """
    from copy import deepcopy

    if isinstance(masks, int):
        self._validate_backtest_n_folds(n_folds=masks)
        self._validate_backtest_dataset(ts=ts, n_folds=masks, horizon=self.horizon)
        prepared_masks = self._generate_masks_from_n_folds(ts=ts, n_folds=masks, horizon=self.horizon, mode=mode)
    else:
        # Work on copies: filling the default start below must not leak into the caller's masks,
        # otherwise reusing them on another dataset would keep the start of this one.
        prepared_masks = [deepcopy(mask) for mask in masks]

    for mask in prepared_masks:
        if mask.first_train_timestamp is None:
            mask.first_train_timestamp = ts.index[0]
        mask.validate_on_dataset(ts=ts, horizon=self.horizon)
    return prepared_masks

def backtest(
self,
ts: TSDataset,
metrics: List[Metric],
n_folds: int = 5,
n_folds: Union[int, List[FoldMask]] = 5,
mode: str = "expand",
aggregate_metrics: bool = False,
n_jobs: int = 1,
Expand All @@ -371,9 +479,9 @@ def backtest(
metrics:
List of metrics to compute for each fold
n_folds:
Number of folds
Number of folds or the list of fold masks
mode:
One of 'expand', 'constant' -- train generation policy
One of 'expand', 'constant' -- train generation policy, ignored if n_folds is a list of masks
aggregate_metrics:
If True aggregate metrics above folds, return raw metrics otherwise
n_jobs:
Expand All @@ -387,16 +495,17 @@ def backtest(
Metrics dataframe, forecast dataframe and dataframe with information about folds
"""
self._init_backtest()
self._validate_backtest_n_folds(n_folds=n_folds)
self._validate_backtest_dataset(ts=ts, n_folds=n_folds, horizon=self.horizon)
self._validate_backtest_metrics(metrics=metrics)
masks = self._prepare_fold_masks(ts=ts, masks=n_folds, mode=mode)

folds = Parallel(n_jobs=n_jobs, **joblib_params)(
delayed(self._run_fold)(train=train, test=test, fold_number=fold_number, metrics=metrics)
delayed(self._run_fold)(
train=train, test=test, fold_number=fold_number, mask=masks[fold_number], metrics=metrics
)
for fold_number, (train, test) in enumerate(
self._generate_folds_datasets(ts=ts, n_folds=n_folds, horizon=self.horizon, mode=mode)
self._generate_folds_datasets(ts=ts, masks=masks, horizon=self.horizon)
)
)

self._folds = {i: fold for i, fold in enumerate(folds)}

metrics_df = self._get_backtest_metrics(aggregate_metrics=aggregate_metrics)
Expand Down
33 changes: 33 additions & 0 deletions tests/test_pipeline/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,36 @@ def step_ts() -> Tuple[TSDataset, pd.DataFrame, pd.DataFrame]:
)

return ts, metrics_df, forecast_df


@pytest.fixture
def simple_ts() -> TSDataset:
    """Build a daily dataset with two segments of 10 points starting at 2020-01-01."""
    n_points = 10
    timestamps = pd.date_range(start="2020-01-01", periods=n_points).to_list()
    long_df = pd.DataFrame(
        {
            "timestamp": timestamps * 2,
            "segment": ["segment_0"] * n_points + ["segment_1"] * n_points,
            # segment_0 counts 1..10, segment_1 counts 0..9
            "target": list(range(1, 11)) + list(range(10)),
        }
    )
    wide_df = TSDataset.to_dataset(long_df)
    return TSDataset(wide_df, freq="D")


@pytest.fixture
def masked_ts() -> TSDataset:
    """Build a daily dataset with two segments of 11 binary-valued points starting at 2020-01-01."""
    n_points = 11
    timestamps = pd.date_range(start="2020-01-01", periods=n_points).to_list()
    segment_0_target = [0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 1]
    segment_1_target = [0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0]
    long_df = pd.DataFrame(
        {
            "timestamp": timestamps * 2,
            "segment": ["segment_0"] * n_points + ["segment_1"] * n_points,
            "target": segment_0_target + segment_1_target,
        }
    )
    wide_df = TSDataset.to_dataset(long_df)
    return TSDataset(wide_df, freq="D")


@pytest.fixture
def ts_run_fold() -> TSDataset:
    """Build a daily dataset with two segments of 11 points starting at 2020-01-01."""
    n_points = 11
    timestamps = pd.date_range(start="2020-01-01", periods=n_points).to_list()
    segment_0_target = [1, 2, 3, 4, 100, 6, 7, 100, 100, 100, 100]
    segment_1_target = [1, 2, 3, 4, 5, 6, 7, 8, 9, -6, 11]
    long_df = pd.DataFrame(
        {
            "timestamp": timestamps * 2,
            "segment": ["segment_0"] * n_points + ["segment_1"] * n_points,
            "target": segment_0_target + segment_1_target,
        }
    )
    wide_df = TSDataset.to_dataset(long_df)
    return TSDataset(wide_df, freq="D")
Loading