Skip to content

Commit

Permalink
PR #190 Fixed point of origin for exponential decay window functions …
Browse files Browse the repository at this point in the history
…to the last value in window — backport of ClickHouse#39593

Fixed point of origin for exponential decay window functions to the last value in window.
Fixed tests on Debug build type.
  • Loading branch information
quickhouse authored and Enmk committed Sep 17, 2022
1 parent a9c09c7 commit 04e7964
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 125 deletions.
224 changes: 119 additions & 105 deletions src/Processors/Transforms/WindowTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -965,9 +965,6 @@ void WindowTransform::updateAggregationState()
}
}
}

prev_frame_start = frame_start;
prev_frame_end = frame_end;
}

void WindowTransform::writeOutCurrentRow()
Expand Down Expand Up @@ -1209,6 +1206,9 @@ void WindowTransform::appendChunk(Chunk & chunk)
return;
}

prev_frame_start = frame_start;
prev_frame_end = frame_end;

// Move to the next row. The frame will have to be recalculated.
// The peer group start is updated at the beginning of the loop,
// because current_row might now be past-the-end.
Expand Down Expand Up @@ -1614,16 +1614,12 @@ struct StatefulWindowFunction : public WindowFunction

struct ExponentialTimeDecayedSumState
{
RowNumber previous_frame_start;
RowNumber previous_frame_end;
Float64 previous_time;
Float64 previous_sum;
};

struct ExponentialTimeDecayedAvgState
{
RowNumber previous_frame_start;
RowNumber previous_frame_end;
Float64 previous_time;
Float64 previous_sum;
Float64 previous_count;
Expand Down Expand Up @@ -1682,40 +1678,43 @@ struct WindowFunctionExponentialTimeDecayedSum final : public StatefulWindowFunc
auto & state = getState(workspace);

Float64 result = 0;
Float64 curr_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, transform->current_row);

if (state.previous_frame_start <= transform->frame_start
&& transform->frame_start < state.previous_frame_end
&& state.previous_frame_end <= transform->frame_end)
if (transform->frame_start < transform->frame_end)
{
for (RowNumber i = state.previous_frame_start; i < transform->frame_start; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result -= std::exp((prev_t - curr_t) / decay_length) * prev_val;
}
result += std::exp((state.previous_time - curr_t) / decay_length) * state.previous_sum;
for (RowNumber i = state.previous_frame_end; i < transform->frame_end; transform->advanceRowNumber(i))
RowNumber frame_back = transform->prevRowNumber(transform->frame_end);
Float64 back_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, frame_back);

if (transform->prev_frame_start <= transform->frame_start
&& transform->frame_start < transform->prev_frame_end
&& transform->prev_frame_end <= transform->frame_end)
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - curr_t) / decay_length) * prev_val;
for (RowNumber i = transform->prev_frame_start; i < transform->frame_start; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result -= std::exp((prev_t - back_t) / decay_length) * prev_val;
}
result += std::exp((state.previous_time - back_t) / decay_length) * state.previous_sum;
for (RowNumber i = transform->prev_frame_end; i < transform->frame_end; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - back_t) / decay_length) * prev_val;
}
}
}
else
{
for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
else
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - curr_t) / decay_length) * prev_val;
for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - back_t) / decay_length) * prev_val;
}
}
}

state.previous_sum = result;
state.previous_time = curr_t;
state.previous_frame_start = transform->frame_start;
state.previous_frame_end = transform->frame_end;
state.previous_sum = result;
state.previous_time = back_t;
}

WindowFunctionHelpers::setValueToOutputColumn<Float64>(transform, function_index, result);
}
Expand Down Expand Up @@ -1773,18 +1772,24 @@ struct WindowFunctionExponentialTimeDecayedMax final : public WindowFunction
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
{
Float64 result = std::numeric_limits<Float64>::lowest();
Float64 curr_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, transform->current_row);
Float64 result = std::numeric_limits<Float64>::quiet_NaN();

for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
if (transform->frame_start < transform->frame_end)
{
Float64 value = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result = std::numeric_limits<Float64>::lowest();
RowNumber frame_back = transform->prevRowNumber(transform->frame_end);
Float64 back_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, frame_back);

/// Avoiding extra calls to `exp` and multiplications.
if (value > result || t > curr_t || result < 0)
for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
{
result = std::max(std::exp((t - curr_t) / decay_length) * value, result);
Float64 value = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);

/// Avoiding extra calls to `exp` and multiplications.
if (value > result || t > back_t || result < 0)
{
result = std::max(std::exp((t - back_t) / decay_length) * value, result);
}
}
}

Expand Down Expand Up @@ -1839,37 +1844,40 @@ struct WindowFunctionExponentialTimeDecayedCount final : public StatefulWindowFu
auto & state = getState(workspace);

Float64 result = 0;
Float64 curr_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, transform->current_row);

if (state.previous_frame_start <= transform->frame_start
&& transform->frame_start < state.previous_frame_end
&& state.previous_frame_end <= transform->frame_end)
if (transform->frame_start < transform->frame_end)
{
for (RowNumber i = state.previous_frame_start; i < transform->frame_start; transform->advanceRowNumber(i))
{
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result -= std::exp((prev_t - curr_t) / decay_length);
}
result += std::exp((state.previous_time - curr_t) / decay_length) * state.previous_sum;
for (RowNumber i = state.previous_frame_end; i < transform->frame_end; transform->advanceRowNumber(i))
RowNumber frame_back = transform->prevRowNumber(transform->frame_end);
Float64 back_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, frame_back);

if (transform->prev_frame_start <= transform->frame_start
&& transform->frame_start < transform->prev_frame_end
&& transform->prev_frame_end <= transform->frame_end)
{
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - curr_t) / decay_length);
for (RowNumber i = transform->prev_frame_start; i < transform->frame_start; transform->advanceRowNumber(i))
{
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result -= std::exp((prev_t - back_t) / decay_length);
}
result += std::exp((state.previous_time - back_t) / decay_length) * state.previous_sum;
for (RowNumber i = transform->prev_frame_end; i < transform->frame_end; transform->advanceRowNumber(i))
{
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - back_t) / decay_length);
}
}
}
else
{
for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
else
{
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - curr_t) / decay_length);
for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
{
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - back_t) / decay_length);
}
}
}

state.previous_sum = result;
state.previous_time = curr_t;
state.previous_frame_start = transform->frame_start;
state.previous_frame_end = transform->frame_end;
state.previous_sum = result;
state.previous_time = back_t;
}

WindowFunctionHelpers::setValueToOutputColumn<Float64>(transform, function_index, result);
}
Expand Down Expand Up @@ -1932,55 +1940,61 @@ struct WindowFunctionExponentialTimeDecayedAvg final : public StatefulWindowFunc

Float64 count = 0;
Float64 sum = 0;
Float64 curr_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, transform->current_row);
Float64 result = std::numeric_limits<Float64>::quiet_NaN();

if (state.previous_frame_start <= transform->frame_start
&& transform->frame_start < state.previous_frame_end
&& state.previous_frame_end <= transform->frame_end)
if (transform->frame_start < transform->frame_end)
{
for (RowNumber i = state.previous_frame_start; i < transform->frame_start; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
Float64 decay = std::exp((prev_t - curr_t) / decay_length);
sum -= decay * prev_val;
count -= decay;
}
RowNumber frame_back = transform->prevRowNumber(transform->frame_end);
Float64 back_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, frame_back);

if (transform->prev_frame_start <= transform->frame_start
&& transform->frame_start < transform->prev_frame_end
&& transform->prev_frame_end <= transform->frame_end)
{
Float64 decay = std::exp((state.previous_time - curr_t) / decay_length);
sum += decay * state.previous_sum;
count += decay * state.previous_count;
}
for (RowNumber i = transform->prev_frame_start; i < transform->frame_start; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
Float64 decay = std::exp((prev_t - back_t) / decay_length);
sum -= decay * prev_val;
count -= decay;
}

for (RowNumber i = state.previous_frame_end; i < transform->frame_end; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
Float64 decay = std::exp((prev_t - curr_t) / decay_length);
sum += decay * prev_val;
count += decay;
{
Float64 decay = std::exp((state.previous_time - back_t) / decay_length);
sum += decay * state.previous_sum;
count += decay * state.previous_count;
}

for (RowNumber i = transform->prev_frame_end; i < transform->frame_end; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
Float64 decay = std::exp((prev_t - back_t) / decay_length);
sum += decay * prev_val;
count += decay;
}
}
}
else
{
for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
else
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
Float64 decay = std::exp((prev_t - curr_t) / decay_length);
sum += decay * prev_val;
count += decay;
for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
Float64 decay = std::exp((prev_t - back_t) / decay_length);
sum += decay * prev_val;
count += decay;
}
}
}

state.previous_sum = sum;
state.previous_count = count;
state.previous_time = curr_t;
state.previous_frame_start = transform->frame_start;
state.previous_frame_end = transform->frame_end;
state.previous_sum = sum;
state.previous_count = count;
state.previous_time = back_t;

WindowFunctionHelpers::setValueToOutputColumn<Float64>(transform, function_index, sum/count);
result = sum/count;
}

WindowFunctionHelpers::setValueToOutputColumn<Float64>(transform, function_index, result);
}

private:
Expand Down
24 changes: 21 additions & 3 deletions src/Processors/Transforms/WindowTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,19 @@ class WindowTransform final : public IProcessor
++x.block;
}

RowNumber nextRowNumber(const RowNumber & x) const
{
RowNumber result = x;
advanceRowNumber(result);
return result;
}

void retreatRowNumber(RowNumber & x) const
{
#ifndef NDEBUG
auto original_x = x;
#endif

if (x.row > 0)
{
--x.row;
Expand All @@ -213,12 +224,19 @@ class WindowTransform final : public IProcessor
x.row = blockAt(x).rows - 1;

#ifndef NDEBUG
auto xx = x;
advanceRowNumber(xx);
assert(xx == x);
auto advanced_retreated_x = x;
advanceRowNumber(advanced_retreated_x);
assert(advanced_retreated_x == original_x);
#endif
}

RowNumber prevRowNumber(const RowNumber & x) const
{
RowNumber result = x;
retreatRowNumber(result);
return result;
}

auto moveRowNumber(const RowNumber & _x, int64_t offset) const;
auto moveRowNumberNoCheck(const RowNumber & _x, int64_t offset) const;

Expand Down
34 changes: 17 additions & 17 deletions tests/queries/0_stateless/02020_exponential_smoothing.reference
Original file line number Diff line number Diff line change
Expand Up @@ -654,23 +654,23 @@ exponentialTimeDecayedAvg
0 48 0.201 ████████████████████
0 49 0.196 ███████████████████▌
Check `exponentialTimeDecayed.*` supports sliding windows
2 1 3.010050167084 2 3.030251507111 0.993333444442
1 2 7.060905027605 4.080805360107 4.02030134086 1.756312382816
0 3 12.091654548833 5.101006700134 5.000500014167 2.418089094006
4 4 11.050650848754 5.050250835421 5.000500014167 2.209909172572
5 5 9.970249502081 5 5.000500014167 1.993850509716
1 6 20.07305726224 10.202013400268 5.000500014167 4.014210020072
0 7 15.991544871125 10.100501670842 3.98029867414 4.017674596889
2 1 2.950447180363 1.960397346614 2.970248507056 0.993333444442
1 2 6.921089740404 4 3.940694040604 1.756312382816
0 3 11.85222374685 5 4.901483479757 2.418089094006
4 4 10.831833301125 4.950249168746 4.901483479757 2.209909172572
5 5 9.772825334477 4.900993366534 4.901483479757 1.993850509716
1 6 19.675584097659 10 4.901483479757 4.014210020072
0 7 15.832426341049 10 3.940694040604 4.017674596889
10 8 10.980198673307 10 2.970248507056 3.696727276261
Check `exponentialTimeDecayedMax` works with negative values
2 1 -1.010050167084
1 2 -1
10 3 -0.990049833749
4 4 -0.980198673307
5 5 -1.010050167084
1 6 -1
10 7 -0.990049833749
10 8 -0.980198673307
10 9 -9.801986733068
9.81 10 -9.801986733068
2 1 -0.990049833749
1 2 -0.980198673307
10 3 -0.970445533549
4 4 -0.960789439152
5 5 -0.990049833749
1 6 -0.980198673307
10 7 -0.970445533549
10 8 -0.960789439152
10 9 -9.607894391523
9.81 10 -9.704455335485
9.9 11 -9.712388869079

0 comments on commit 04e7964

Please sign in to comment.