Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed point of origin for exponential decay window functions to the last value in window — backport of ClickHouse/ClickHouse#39593 #190

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 119 additions & 105 deletions src/Processors/Transforms/WindowTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -965,9 +965,6 @@ void WindowTransform::updateAggregationState()
}
}
}

prev_frame_start = frame_start;
prev_frame_end = frame_end;
}

void WindowTransform::writeOutCurrentRow()
Expand Down Expand Up @@ -1209,6 +1206,9 @@ void WindowTransform::appendChunk(Chunk & chunk)
return;
}

prev_frame_start = frame_start;
prev_frame_end = frame_end;

// Move to the next row. The frame will have to be recalculated.
// The peer group start is updated at the beginning of the loop,
// because current_row might now be past-the-end.
Expand Down Expand Up @@ -1614,16 +1614,12 @@ struct StatefulWindowFunction : public WindowFunction

struct ExponentialTimeDecayedSumState
{
RowNumber previous_frame_start;
RowNumber previous_frame_end;
Float64 previous_time;
Float64 previous_sum;
};

struct ExponentialTimeDecayedAvgState
{
RowNumber previous_frame_start;
RowNumber previous_frame_end;
Float64 previous_time;
Float64 previous_sum;
Float64 previous_count;
Expand Down Expand Up @@ -1682,40 +1678,43 @@ struct WindowFunctionExponentialTimeDecayedSum final : public StatefulWindowFunc
auto & state = getState(workspace);

Float64 result = 0;
Float64 curr_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, transform->current_row);

if (state.previous_frame_start <= transform->frame_start
&& transform->frame_start < state.previous_frame_end
&& state.previous_frame_end <= transform->frame_end)
if (transform->frame_start < transform->frame_end)
{
for (RowNumber i = state.previous_frame_start; i < transform->frame_start; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result -= std::exp((prev_t - curr_t) / decay_length) * prev_val;
}
result += std::exp((state.previous_time - curr_t) / decay_length) * state.previous_sum;
for (RowNumber i = state.previous_frame_end; i < transform->frame_end; transform->advanceRowNumber(i))
RowNumber frame_back = transform->prevRowNumber(transform->frame_end);
Float64 back_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, frame_back);

if (transform->prev_frame_start <= transform->frame_start
&& transform->frame_start < transform->prev_frame_end
&& transform->prev_frame_end <= transform->frame_end)
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - curr_t) / decay_length) * prev_val;
for (RowNumber i = transform->prev_frame_start; i < transform->frame_start; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result -= std::exp((prev_t - back_t) / decay_length) * prev_val;
}
result += std::exp((state.previous_time - back_t) / decay_length) * state.previous_sum;
for (RowNumber i = transform->prev_frame_end; i < transform->frame_end; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - back_t) / decay_length) * prev_val;
}
}
}
else
{
for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
else
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - curr_t) / decay_length) * prev_val;
for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - back_t) / decay_length) * prev_val;
}
}
}

state.previous_sum = result;
state.previous_time = curr_t;
state.previous_frame_start = transform->frame_start;
state.previous_frame_end = transform->frame_end;
state.previous_sum = result;
state.previous_time = back_t;
}

WindowFunctionHelpers::setValueToOutputColumn<Float64>(transform, function_index, result);
}
Expand Down Expand Up @@ -1773,18 +1772,24 @@ struct WindowFunctionExponentialTimeDecayedMax final : public WindowFunction
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
{
Float64 result = std::numeric_limits<Float64>::lowest();
Float64 curr_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, transform->current_row);
Float64 result = std::numeric_limits<Float64>::quiet_NaN();

for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
if (transform->frame_start < transform->frame_end)
{
Float64 value = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result = std::numeric_limits<Float64>::lowest();
RowNumber frame_back = transform->prevRowNumber(transform->frame_end);
Float64 back_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, frame_back);

/// Avoiding extra calls to `exp` and multiplications.
if (value > result || t > curr_t || result < 0)
for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
{
result = std::max(std::exp((t - curr_t) / decay_length) * value, result);
Float64 value = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);

/// Avoiding extra calls to `exp` and multiplications.
if (value > result || t > back_t || result < 0)
{
result = std::max(std::exp((t - back_t) / decay_length) * value, result);
}
}
}

Expand Down Expand Up @@ -1839,37 +1844,40 @@ struct WindowFunctionExponentialTimeDecayedCount final : public StatefulWindowFu
auto & state = getState(workspace);

Float64 result = 0;
Float64 curr_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, transform->current_row);

if (state.previous_frame_start <= transform->frame_start
&& transform->frame_start < state.previous_frame_end
&& state.previous_frame_end <= transform->frame_end)
if (transform->frame_start < transform->frame_end)
{
for (RowNumber i = state.previous_frame_start; i < transform->frame_start; transform->advanceRowNumber(i))
{
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result -= std::exp((prev_t - curr_t) / decay_length);
}
result += std::exp((state.previous_time - curr_t) / decay_length) * state.previous_sum;
for (RowNumber i = state.previous_frame_end; i < transform->frame_end; transform->advanceRowNumber(i))
RowNumber frame_back = transform->prevRowNumber(transform->frame_end);
Float64 back_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, frame_back);

if (transform->prev_frame_start <= transform->frame_start
&& transform->frame_start < transform->prev_frame_end
&& transform->prev_frame_end <= transform->frame_end)
{
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - curr_t) / decay_length);
for (RowNumber i = transform->prev_frame_start; i < transform->frame_start; transform->advanceRowNumber(i))
{
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result -= std::exp((prev_t - back_t) / decay_length);
}
result += std::exp((state.previous_time - back_t) / decay_length) * state.previous_sum;
for (RowNumber i = transform->prev_frame_end; i < transform->frame_end; transform->advanceRowNumber(i))
{
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - back_t) / decay_length);
}
}
}
else
{
for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
else
{
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - curr_t) / decay_length);
for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
{
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
result += std::exp((prev_t - back_t) / decay_length);
}
}
}

state.previous_sum = result;
state.previous_time = curr_t;
state.previous_frame_start = transform->frame_start;
state.previous_frame_end = transform->frame_end;
state.previous_sum = result;
state.previous_time = back_t;
}

WindowFunctionHelpers::setValueToOutputColumn<Float64>(transform, function_index, result);
}
Expand Down Expand Up @@ -1932,55 +1940,61 @@ struct WindowFunctionExponentialTimeDecayedAvg final : public StatefulWindowFunc

Float64 count = 0;
Float64 sum = 0;
Float64 curr_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, transform->current_row);
Float64 result = std::numeric_limits<Float64>::quiet_NaN();

if (state.previous_frame_start <= transform->frame_start
&& transform->frame_start < state.previous_frame_end
&& state.previous_frame_end <= transform->frame_end)
if (transform->frame_start < transform->frame_end)
{
for (RowNumber i = state.previous_frame_start; i < transform->frame_start; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
Float64 decay = std::exp((prev_t - curr_t) / decay_length);
sum -= decay * prev_val;
count -= decay;
}
RowNumber frame_back = transform->prevRowNumber(transform->frame_end);
Float64 back_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, frame_back);

if (transform->prev_frame_start <= transform->frame_start
&& transform->frame_start < transform->prev_frame_end
&& transform->prev_frame_end <= transform->frame_end)
{
Float64 decay = std::exp((state.previous_time - curr_t) / decay_length);
sum += decay * state.previous_sum;
count += decay * state.previous_count;
}
for (RowNumber i = transform->prev_frame_start; i < transform->frame_start; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
Float64 decay = std::exp((prev_t - back_t) / decay_length);
sum -= decay * prev_val;
count -= decay;
}

for (RowNumber i = state.previous_frame_end; i < transform->frame_end; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
Float64 decay = std::exp((prev_t - curr_t) / decay_length);
sum += decay * prev_val;
count += decay;
{
Float64 decay = std::exp((state.previous_time - back_t) / decay_length);
sum += decay * state.previous_sum;
count += decay * state.previous_count;
}

for (RowNumber i = transform->prev_frame_end; i < transform->frame_end; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
Float64 decay = std::exp((prev_t - back_t) / decay_length);
sum += decay * prev_val;
count += decay;
}
}
}
else
{
for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
else
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
Float64 decay = std::exp((prev_t - curr_t) / decay_length);
sum += decay * prev_val;
count += decay;
for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i))
{
Float64 prev_val = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_VALUE, i);
Float64 prev_t = WindowFunctionHelpers::getValue<Float64>(transform, function_index, ARGUMENT_TIME, i);
Float64 decay = std::exp((prev_t - back_t) / decay_length);
sum += decay * prev_val;
count += decay;
}
}
}

state.previous_sum = sum;
state.previous_count = count;
state.previous_time = curr_t;
state.previous_frame_start = transform->frame_start;
state.previous_frame_end = transform->frame_end;
state.previous_sum = sum;
state.previous_count = count;
state.previous_time = back_t;

WindowFunctionHelpers::setValueToOutputColumn<Float64>(transform, function_index, sum/count);
result = sum/count;
}

WindowFunctionHelpers::setValueToOutputColumn<Float64>(transform, function_index, result);
}

private:
Expand Down
24 changes: 21 additions & 3 deletions src/Processors/Transforms/WindowTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,19 @@ class WindowTransform final : public IProcessor
++x.block;
}

/// Non-mutating counterpart of advanceRowNumber(): works on a copy of `x`,
/// advances that copy by one step and returns it. `x` itself is untouched.
RowNumber nextRowNumber(const RowNumber & x) const
{
    RowNumber following = x;
    advanceRowNumber(following);
    return following;
}

void retreatRowNumber(RowNumber & x) const
{
#ifndef NDEBUG
auto original_x = x;
#endif

if (x.row > 0)
{
--x.row;
Expand All @@ -213,12 +224,19 @@ class WindowTransform final : public IProcessor
x.row = blockAt(x).rows - 1;

#ifndef NDEBUG
auto xx = x;
advanceRowNumber(xx);
assert(xx == x);
auto advanced_retreated_x = x;
advanceRowNumber(advanced_retreated_x);
assert(advanced_retreated_x == original_x);
#endif
}

/// Non-mutating counterpart of retreatRowNumber(): works on a copy of `x`,
/// moves that copy one step back and returns it. `x` itself is untouched.
RowNumber prevRowNumber(const RowNumber & x) const
{
    RowNumber preceding = x;
    retreatRowNumber(preceding);
    return preceding;
}

auto moveRowNumber(const RowNumber & _x, int64_t offset) const;
auto moveRowNumberNoCheck(const RowNumber & _x, int64_t offset) const;

Expand Down
34 changes: 17 additions & 17 deletions tests/queries/0_stateless/02020_exponential_smoothing.reference
Original file line number Diff line number Diff line change
Expand Up @@ -654,23 +654,23 @@ exponentialTimeDecayedAvg
0 48 0.201 ████████████████████
0 49 0.196 ███████████████████▌
Check `exponentialTimeDecayed.*` supports sliding windows
2 1 3.010050167084 2 3.030251507111 0.993333444442
1 2 7.060905027605 4.080805360107 4.02030134086 1.756312382816
0 3 12.091654548833 5.101006700134 5.000500014167 2.418089094006
4 4 11.050650848754 5.050250835421 5.000500014167 2.209909172572
5 5 9.970249502081 5 5.000500014167 1.993850509716
1 6 20.07305726224 10.202013400268 5.000500014167 4.014210020072
0 7 15.991544871125 10.100501670842 3.98029867414 4.017674596889
2 1 2.950447180363 1.960397346614 2.970248507056 0.993333444442
1 2 6.921089740404 4 3.940694040604 1.756312382816
0 3 11.85222374685 5 4.901483479757 2.418089094006
4 4 10.831833301125 4.950249168746 4.901483479757 2.209909172572
5 5 9.772825334477 4.900993366534 4.901483479757 1.993850509716
1 6 19.675584097659 10 4.901483479757 4.014210020072
0 7 15.832426341049 10 3.940694040604 4.017674596889
10 8 10.980198673307 10 2.970248507056 3.696727276261
Check `exponentialTimeDecayedMax` works with negative values
2 1 -1.010050167084
1 2 -1
10 3 -0.990049833749
4 4 -0.980198673307
5 5 -1.010050167084
1 6 -1
10 7 -0.990049833749
10 8 -0.980198673307
10 9 -9.801986733068
9.81 10 -9.801986733068
2 1 -0.990049833749
1 2 -0.980198673307
10 3 -0.970445533549
4 4 -0.960789439152
5 5 -0.990049833749
1 6 -0.980198673307
10 7 -0.970445533549
10 8 -0.960789439152
10 9 -9.607894391523
9.81 10 -9.704455335485
9.9 11 -9.712388869079