Skip to content

Commit

Permalink
Merge pull request #145 from Hella/custom_moving_average
Browse files Browse the repository at this point in the history
moving custom function applied to acoustics
  • Loading branch information
deatinor authored Apr 14, 2020
2 parents 57a8443 + 4e710d1 commit 29ec056
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 0 deletions.
12 changes: 12 additions & 0 deletions gtime/custom/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""
The :mod:`gtime.custom` module implements custom methods for time
series.
"""

from .crest_factor_detrending import CrestFactorDetrending
from .sorted_density import SortedDensity

__all__ = [
"CrestFactorDetrending",
"SortedDensity",
]
82 changes: 82 additions & 0 deletions gtime/custom/crest_factor_detrending.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import pandas as pd
from sklearn.utils.validation import check_is_fitted

from ..base import add_class_name
from gtime.feature_extraction import MovingCustomFunction

class CrestFactorDetrending(MovingCustomFunction):
"""Crest factor detrending model.
This class removes the trend from the data by using the crest factor definition.
Each sample is normalize by its weighted surrounding.
Generalized detrending is defined in (eq. 1) of: H. P. Tukuljac, V. Pulkki,
H. Gamper, K. Godin, I. J. Tashev and N. Raghuvanshi, "A Sparsity Measure for Echo
Density Growth in General Environments," ICASSP 2019 - 2019 IEEE International
Conference on Acoustics, Speech and Signal Processing (ICASSP), Brighton, United
Kingdom, 2019, pp. 1-5.
Parameters
----------
window_size : int, optional, default: ``1``
The number of previous points on which to compute the crest factor detrending.
is_causal : bool, optional, default: ``True``
Whether the current sample is computed based only on the past or also on the future.
Examples
>>> import pandas as pd
>>> from CrestFactorDetrending import CrestFactorDetrending
>>> ts = pd.DataFrame([0, 1, 2, 3, 4, 5])
>>> gnrl_dtr = CrestFactorDetrending(window_size=2)
>>> gnrl_dtr.fit_transform(ts)
0__CrestFactorDetrending
0 NaN
1 1.000000
2 0.800000
3 0.692308
4 0.640000
5 0.609756
--------
"""

def __init__(self, window_size: int = 1, is_causal: bool = True):
def detrend(signal):
import numpy as np
N = 2
signal = np.array(signal)
large_signal_segment = signal**N
large_segment_mean = np.sum(large_signal_segment)
if (self.is_causal):
ref_index = -1
else:
ref_index = int(len(signal)/2)
small_signal_segment = signal[ref_index]**N
return small_signal_segment/large_segment_mean # (eq. 1)
super().__init__(detrend)
self.window_size = window_size
self.is_causal = is_causal

@add_class_name
def transform(self, time_series: pd.DataFrame) -> pd.DataFrame:
"""For every row of ``time_series``, compute the moving crest factor detrending function of the
previous ``window_size`` elements.
Parameters
----------
time_series : pd.DataFrame, shape (n_samples, 1), required
The DataFrame on which to compute the rolling moving custom function.
Returns
-------
time_series_t : pd.DataFrame, shape (n_samples, 1)
A DataFrame, with the same length as ``time_series``, containing the rolling
moving custom function for each element.
"""
check_is_fitted(self)

if(self.is_causal):
time_series_mvg_dtr = time_series.rolling(self.window_size).apply(
self.custom_feature_function, raw=self.raw
)
else:
time_series_mvg_dtr = time_series.rolling(self.window_size, min_periods = int(self.window_size/2)).apply(
self.custom_feature_function, raw=self.raw
)
time_series_mvg_dtr = time_series_mvg_dtr.dropna()

time_series_t = time_series_mvg_dtr
return time_series_t
78 changes: 78 additions & 0 deletions gtime/custom/sorted_density.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import pandas as pd
from sklearn.utils.validation import check_is_fitted

from ..base import add_class_name
from gtime.feature_extraction import MovingCustomFunction

class SortedDensity(MovingCustomFunction):
"""For each row in ``time_series``, compute the sorted density function of the
previous ``window_size`` rows. If there are not enough rows, the value is ``Nan``.
Sorted density measured is defined in (eq. 1) of: H. P. Tukuljac, V. Pulkki,
H. Gamper, K. Godin, I. J. Tashev and N. Raghuvanshi, "A Sparsity Measure for Echo
Density Growth in General Environments," ICASSP 2019 - 2019 IEEE International
Conference on Acoustics, Speech and Signal Processing (ICASSP), Brighton, United
Kingdom, 2019, pp. 1-5.
Parameters
----------
window_size : int, optional, default: ``1``
The number of previous points on which to compute the sorted density.
is_causal : bool, optional, default: ``True``
Whether the current sample is computed based only on the past or also on the future.
Examples
--------
>>> import pandas as pd
>>> from gtime.feature_extraction import SortedDensity
>>> ts = pd.DataFrame([0, 1, 2, 3, 4, 5])
>>> mv_avg = SortedDensity(window_size=2)
>>> mv_avg.fit_transform(ts)
0__SortedDensity
0 NaN
1 0.500000
2 0.666667
3 0.700000
4 0.714286
5 0.722222
--------
"""
def __init__(self, window_size: int = 1, is_causal: bool = True):
def sorted_density(signal):
import numpy as np
t = (np.array(range(len(signal))) + 1)
signal = signal[signal.argsort()[::-1]]
t = np.reshape(t, signal.shape)
SD = np.sum(np.multiply(t, signal))/np.sum(signal) # (eq. 2)
SD = SD/(len(signal))
return SD
super().__init__(sorted_density)
self.window_size = window_size
self.is_causal = is_causal

@add_class_name
def transform(self, time_series: pd.DataFrame) -> pd.DataFrame:
"""For every row of ``time_series``, compute the moving sorted density function of the
previous ``window_size`` elements.
Parameters
----------
time_series : pd.DataFrame, shape (n_samples, 1), required
The DataFrame on which to compute the rolling moving custom function.
Returns
-------
time_series_t : pd.DataFrame, shape (n_samples, 1)
A DataFrame, with the same length as ``time_series``, containing the rolling
moving custom function for each element.
"""
check_is_fitted(self)


if(self.is_causal):
time_series_mvg_sd = time_series.rolling(self.window_size).apply(
self.custom_feature_function, raw=self.raw
)
else:
time_series_mvg_sd = time_series.rolling(self.window_size, min_periods = int(self.window_size/2)).apply(
self.custom_feature_function, raw=self.raw
)
time_series_mvg_sd = time_series_mvg_sd.dropna()

time_series_t = time_series_mvg_sd
return time_series_t
Empty file added gtime/custom/tests/__init__.py
Empty file.
63 changes: 63 additions & 0 deletions gtime/custom/tests/test_crest_factor_detrending.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import numpy as np
import pandas as pd
import pandas.util.testing as testing
import pytest

from gtime.custom.crest_factor_detrending import CrestFactorDetrending

def get_input_data():
input_data = pd.DataFrame.from_dict({"x_1": [0, 7, 2], "x_2": [2, 10, 4]})
input_data.index = [
pd.Timestamp(2000, 1, 1),
pd.Timestamp(2000, 2, 1),
pd.Timestamp(2000, 3, 1),
]
return input_data

def get_output_causal():
custom_feature = CrestFactorDetrending(window_size=2, is_causal = True)
feature_name = custom_feature.__class__.__name__
output_causal = pd.DataFrame.from_dict(
{
f"x_1__{feature_name}": [np.nan, 1.0, 0.07547169811320754],
f"x_2__{feature_name}": [np.nan, 0.9615384615384616, 0.13793103448275862],
}
)
output_causal.index = [
pd.Timestamp(2000, 1, 1),
pd.Timestamp(2000, 2, 1),
pd.Timestamp(2000, 3, 1),
]
return output_causal

def get_output_anticausal():
custom_feature = CrestFactorDetrending(window_size=2, is_causal = False)
feature_name = custom_feature.__class__.__name__
output_anticausal = pd.DataFrame.from_dict(
{
f"x_1__{feature_name}": [1.0, 0.07547169811320754],
f"x_2__{feature_name}": [0.9615384615384616, 0.13793103448275862],
}
)
output_anticausal.index = [
pd.Timestamp(2000, 2, 1),
pd.Timestamp(2000, 3, 1),
]
return output_anticausal

input_data = get_input_data()
output_causal = get_output_causal()
output_anticausal = get_output_anticausal()

class TestCrestFactorDetrending:
@pytest.mark.parametrize("test_input, expected", [(input_data, output_causal)])
def test_crest_factor_detrending_causal(self, test_input, expected):
feature = CrestFactorDetrending(window_size=2, is_causal = True)
output = feature.fit_transform(test_input)
testing.assert_frame_equal(output, expected)

@pytest.mark.parametrize("test_input, expected", [(input_data, output_anticausal)])
def test_crest_factor_detrending_anticausal(self, test_input, expected):
feature = CrestFactorDetrending(window_size=2, is_causal = False)
output = feature.fit_transform(test_input)
testing.assert_frame_equal(output, expected)
63 changes: 63 additions & 0 deletions gtime/custom/tests/test_sorted_density.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import numpy as np
import pandas as pd
import pandas.util.testing as testing
import pytest

from gtime.custom.sorted_density import SortedDensity

def get_input_data():
input_data = pd.DataFrame.from_dict({"x_1": [0, 7, 2], "x_2": [2, 10, 4]})
input_data.index = [
pd.Timestamp(2000, 1, 1),
pd.Timestamp(2000, 2, 1),
pd.Timestamp(2000, 3, 1),
]
return input_data

def get_output_causal():
custom_feature = SortedDensity(window_size=2, is_causal = True)
feature_name = custom_feature.__class__.__name__
output_causal = pd.DataFrame.from_dict(
{
f"x_1__{feature_name}": [np.nan, 0.5, 0.6111111111111112],
f"x_2__{feature_name}": [np.nan, 0.5833333333333334, 0.6428571428571429],
}
)
output_causal.index = [
pd.Timestamp(2000, 1, 1),
pd.Timestamp(2000, 2, 1),
pd.Timestamp(2000, 3, 1),
]
return output_causal

def get_output_anticausal():
custom_feature = SortedDensity(window_size=2, is_causal = False)
feature_name = custom_feature.__class__.__name__
output_anticausal = pd.DataFrame.from_dict(
{
f"x_1__{feature_name}": [0.5, 0.6111111111111112],
f"x_2__{feature_name}": [0.5833333333333334, 0.6428571428571429],
}
)
output_anticausal.index = [
pd.Timestamp(2000, 2, 1),
pd.Timestamp(2000, 3, 1),
]
return output_anticausal

input_data = get_input_data()
output_causal = get_output_causal()
output_anticausal = get_output_anticausal()

class TestSortedDensity:
@pytest.mark.parametrize("test_input, expected", [(input_data, output_causal)])
def test_crest_factor_detrending_causal(self, test_input, expected):
feature = SortedDensity(window_size=2, is_causal = True)
output = feature.fit_transform(test_input)
testing.assert_frame_equal(output, expected)

@pytest.mark.parametrize("test_input, expected", [(input_data, output_anticausal)])
def test_crest_factor_detrending_anticausal(self, test_input, expected):
feature = SortedDensity(window_size=2, is_causal = False)
output = feature.fit_transform(test_input)
testing.assert_frame_equal(output, expected)

0 comments on commit 29ec056

Please sign in to comment.