Skip to content

Commit

Permalink
Resample beliefs about instantaneous sensors (#118)
Browse files Browse the repository at this point in the history
* Support downsampling instantaneous sensors

* More robust implementation of resampling instantaneous sensor data, taking into account DST transitions

* Add tests

* Add NotImplementedError case

* Simplify util function

* Expand docstring

* isort

* Fix test

* Add two regular (non-DST-transition) resampling cases

* Move index reset dance to within util function

* Remove upsampling example in a test for downsampling (upsampling should use a different method, such as pad or interpolate)

* Remove redundante call to drop_duplicates

* typo

* Clarify inline comment

* Add inline explanation about handling DST transitions

* Revert "Remove upsampling example in a test for downsampling (upsampling should use a different method, such as pad or interpolate)"

This reverts commit 707ea63.

* Add new property: event_frequency

* Rename function to resample instantaneous events, rewrite its logic by resampling per unique offset, and support multiple resampling methods, with some of them updating the event resolution of the resulting BeliefsDataFrame

* Add a lot more test cases for resampling instantaneous sensor data

* Test resolution and frequency for resample instantaneous BeliefsDataFrame

* Expand test for resampling instantaneous BeliefsDataFrame with 'first' method.

* clarifications

* Do not cast floats to ints (loss of information)

* Restrict resampling to BeliefsDataFrames with 1 row per event

* isort and flake8

* Rename test

* Clarify docstring of function parameters

* Expand resample_events docstring
  • Loading branch information
Flix6x authored Nov 22, 2022
1 parent 009a602 commit 47a28a0
Show file tree
Hide file tree
Showing 6 changed files with 509 additions and 21 deletions.
80 changes: 73 additions & 7 deletions timely_beliefs/beliefs/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class TimedBelief(object):
- a cumulative probability (the likelihood of the value being equal or lower than stated)*
* The default assumption is that the mean value is given (cp=0.5), but if no beliefs about possible other outcomes
are given, then this will be treated as a deterministic belief (cp=1). As an alternative to specifying an cumulative
are given, then this will be treated as a deterministic belief (cp=1). As an alternative to specifying a cumulative
probability explicitly, you can specify an integer number of standard deviations which is translated
into a cumulative probability assuming a normal distribution (e.g. sigma=-1 becomes cp=0.1587).
"""
Expand Down Expand Up @@ -697,6 +697,15 @@ def __repr__(self):
"""Add the sensor and event resolution to the string representation of the BeliefsSeries."""
return super().__repr__() + "\n" + meta_repr(self)

@property
def event_frequency(self) -> Optional[timedelta]:
"""Duration between observations of events.
:returns: a timedelta for regularly spaced observations
None for irregularly spaced observations
"""
return pd.Timedelta(pd.infer_freq(self.index.unique("event_start")))


class BeliefsDataFrame(pd.DataFrame):
"""Beliefs about a sensor.
Expand Down Expand Up @@ -1053,6 +1062,15 @@ def set_event_value_from_source(
)
)

@property
def event_frequency(self) -> Optional[timedelta]:
"""Duration between observations of events.
:returns: a timedelta for regularly spaced observations
None for irregularly spaced observations
"""
return pd.Timedelta(pd.infer_freq(self.index.unique("event_start")))

@property
def knowledge_times(self) -> pd.DatetimeIndex:
return pd.DatetimeIndex(
Expand Down Expand Up @@ -1401,12 +1419,53 @@ def resample_events(
) -> "BeliefsDataFrame":
"""Aggregate over multiple events (downsample) or split events into multiple sub-events (upsample).
Drops NaN values by default.
NB If you need to only keep the most recent belief,
set keep_only_most_recent_belief=True for a significant speed boost.
:param keep_nan_values: if True, place back resampled NaN values.
Resampling events in a BeliefsDataFrames can be quite a slow operation, depending on the complexity of the data.
In general, resampling events may need to deal with:
- the distinction between event resolution (the duration of events) and event frequency (the duration between event starts)
todo: this distinction was introduced in timely-beliefs==1.15.0 and still needs to be incorporated in code
- upsampling or downsampling
note: this function supports both
- different resampling methods (e.g. 'mean', 'interpolate' or 'first')
note: this function defaults to 'mean' for downsampling and 'pad' for upsampling
todo: allow to set this explicitly, and derive a default from a sensor attribute
- different event resolutions (e.g. instantaneous recordings vs. hourly averages)
note: this function only supports few less complex cases of resampling instantaneous sensors
- daylight savings time (DST) transitions
note: this function resamples such that events coincide with midnight in both DST and non-DST
note: only tested for instantaneous sensors
todo: streamline how DST transitions are handled for instantaneous and non-instantaneous sensors
- combining beliefs with different belief times
note: for BeliefsDataFrames with multiple belief times per event, consider keep_only_most_recent_belief=True for a significant speed boost
- combining beliefs from different sources
note: resampling is currently done separately for each source
- joining marginal probability distributions
Each of the above aspects needs a carefully thought out and tested implementation.
Quite a few cases have been implemented in detail already, such as:
- a quite general (but slow) implementation for sensors recording average flows.
- a much faster implementation for some less complex cases
- a separate implementation for less complex BeliefsDataFrames with instantaneous recordings,
which is robust against DST transitions.
If you encounter a case that is not supported yet, we invite you to open a GitHub ticket and describe your case.
Finally, a note on why we named this function 'resample_events'.
BeliefsDataFrames record the timing of events, the timing of beliefs, sources and probabilities.
It is conceivable to resample any of these, for example:
- resample belief times to show how beliefs about an event change every day
- resample sources to show how model versions improved accuracy
- resample probabilities given some distribution to show how that affects extreme outcomes and risk
Although the term, when applied to time series, usually is about resampling events,
we wanted the function name to be explicit about what we resample.
:param event_resolution: duration of events after resampling (except for instantaneous sensors, in which case
it is the duration between events after resampling: the event frequency).
:param distribution: Type of probability distribution to assume when taking the mean over probabilistic values.
Supported distributions are 'discrete', 'normal' and 'uniform'.
:param keep_only_most_recent_belief: If True, assign the most recent belief time to each event after resampling.
Only applies in case of multiple beliefs per event.
:param keep_nan_values: If True, place back resampled NaN values. Drops NaN values by default.
"""

if self.empty:
Expand All @@ -1416,6 +1475,13 @@ def resample_events(
return self
df = self

# Resample instantaneous sensors
# The event resolution stays zero, but the event frequency is updated
if df.event_resolution == timedelta(0):
if df.lineage.number_of_events != len(df):
raise NotImplementedError("Please file a GitHub ticket.")
return belief_utils.resample_instantaneous_events(df, event_resolution)

belief_timing_col = (
"belief_time" if "belief_time" in df.index.names else "belief_horizon"
)
Expand Down
112 changes: 111 additions & 1 deletion timely_beliefs/beliefs/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from __future__ import annotations

import warnings
from datetime import datetime, timedelta
from typing import List, Optional, Union

import numpy as np
import pandas as pd
import pytz
from packaging import version
from pandas.core.groupby import DataFrameGroupBy

Expand Down Expand Up @@ -283,7 +286,9 @@ def join_beliefs(
if output_resolution > input_resolution:

# Create new BeliefsDataFrame with downsampled event_start
if output_resolution % input_resolution != timedelta(0):
if input_resolution == timedelta(
0
) or output_resolution % input_resolution != timedelta(0):
raise NotImplementedError(
"Cannot downsample from resolution %s to %s."
% (input_resolution, output_resolution)
Expand Down Expand Up @@ -778,6 +783,111 @@ def extreme_timedeltas_not_equal(
return td_a != td_b


def resample_instantaneous_events(
df: pd.DataFrame | "classes.BeliefsDataFrame",
resolution: timedelta,
method: str | None = None,
dropna: bool = True,
) -> pd.DataFrame | "classes.BeliefsDataFrame":
"""Resample data representing instantaneous events.
Updates the event frequency of the resulting data frame, and possibly also its event resolution.
The event resolution is only updated if the resampling method computes a characteristic of a period of events,
like 'mean' or 'first'.
Note that, for resolutions over 1 hour, the data frequency may not turn out to be constant per se.
This is due to DST transitions:
- The duration between events is typically longer for the fall DST transition.
- The duration between events is typically shorter for the spring DST transition.
This is done to keep the data frequency in step with midnight in the sensor's timezone.
"""

# Default resampling method for instantaneous sensors
if method is None:
method = "asfreq"

# Use event_start as the only index level
index_names = df.index.names
df = df.reset_index().set_index("event_start")

# Resample the data in each unique fixed timezone offset that belongs to the given IANA timezone, then recombine
unique_offsets = df.index.map(lambda x: x.utcoffset()).unique()
resampled_df_offsets = []
for offset in unique_offsets:
df_offset = df.copy()
# Convert all the data to given timezone offset
df_offset.index = df.index.tz_convert(
pytz.FixedOffset(offset.seconds // 60)
) # offset is max 1439 minutes, so we don't need to check offset.days
# Resample all the data in the given timezone offset, using the given method
resampled_df_offset = getattr(df_offset.resample(resolution), method)()
# Convert back to the original timezone
if isinstance(df.index, pd.DatetimeIndex) and df.index.tz is not None:
resampled_df_timezone = resampled_df_offset.tz_convert(df.index.tz)
elif isinstance(df, classes.BeliefsDataFrame):
# As a backup, use the original timezone from the BeliefsDataFrame's sensor
resampled_df_timezone = resampled_df_offset.tz_convert(df.sensor.timezone)
else:
ValueError("Missing original timezone.")
# See which resampled rows still fall in the given offset, in this timezone
resampled_df_timezone = resampled_df_timezone[
resampled_df_timezone.index.map(lambda x: x.utcoffset()) == offset
]
resampled_df_offsets.append(resampled_df_timezone)
resampled_df = pd.concat(resampled_df_offsets).sort_index()

# If possible, infer missing frequency
if resampled_df.index.freq is None and len(resampled_df) > 2:
resampled_df.index.freq = pd.infer_freq(resampled_df.index)

# Restore the original index levels
resampled_df = resampled_df.reset_index().set_index(index_names)

if method in (
"mean",
"max",
"min",
"median",
"count",
"nunique",
"first",
"last",
"ohlc",
"prod",
"size",
"sem",
"std",
"sum",
"var",
"quantile",
):
# These methods derive properties of a period of events.
# Therefore, the event resolution is updated.
# The methods are typically used for downsampling.
resampled_df.event_resolution = resolution
elif method in (
"asfreq",
"interpolate",
"ffill",
"bfill",
"pad",
"backfill",
"nearest",
):
# These methods derive intermediate events.
# Therefore, the event resolution is unaffected.
# The methods are typically used for upsampling.
pass
else:
raise NotImplementedError(
f"Please file a GitHub ticket for timely-beliefs to support the '{method}' method."
)

if dropna:
return resampled_df.dropna()
return resampled_df


def meta_repr(
tb_structure: Union["classes.BeliefsDataFrame", "classes.BeliefsSeries"]
) -> str:
Expand Down
4 changes: 3 additions & 1 deletion timely_beliefs/sensors/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@ class SensorDBMixin(Sensor):
Mixin class for a table with sensors.
"""

# db field that is not a Sensor attribute
id = Column(Integer, primary_key=True)
# overwriting name as db field

# db fields that overwrite Sensor attributes
name = Column(String(120), nullable=False, default="")
unit = Column(String(80), nullable=False, default="")
timezone = Column(String(80), nullable=False, default="UTC")
Expand Down
Loading

0 comments on commit 47a28a0

Please sign in to comment.