From c076916c6a98e63189e500332097719960901dc8 Mon Sep 17 00:00:00 2001 From: Vladimir Chebotaryov Date: Tue, 26 Jul 2022 09:05:31 +0300 Subject: [PATCH 1/2] Fixed point of origin for exponential decay window functions to the last value in window. --- src/Processors/Transforms/WindowTransform.cpp | 224 ++++++++++-------- src/Processors/Transforms/WindowTransform.h | 14 ++ .../02020_exponential_smoothing.reference | 34 +-- 3 files changed, 150 insertions(+), 122 deletions(-) diff --git a/src/Processors/Transforms/WindowTransform.cpp b/src/Processors/Transforms/WindowTransform.cpp index 2a2fed1cc078..b7e02c27adf0 100644 --- a/src/Processors/Transforms/WindowTransform.cpp +++ b/src/Processors/Transforms/WindowTransform.cpp @@ -965,9 +965,6 @@ void WindowTransform::updateAggregationState() } } } - - prev_frame_start = frame_start; - prev_frame_end = frame_end; } void WindowTransform::writeOutCurrentRow() @@ -1209,6 +1206,9 @@ void WindowTransform::appendChunk(Chunk & chunk) return; } + prev_frame_start = frame_start; + prev_frame_end = frame_end; + // Move to the next row. The frame will have to be recalculated. // The peer group start is updated at the beginning of the loop, // because current_row might now be past-the-end. @@ -1614,16 +1614,12 @@ struct StatefulWindowFunction : public WindowFunction struct ExponentialTimeDecayedSumState { - RowNumber previous_frame_start; - RowNumber previous_frame_end; Float64 previous_time; Float64 previous_sum; }; struct ExponentialTimeDecayedAvgState { - RowNumber previous_frame_start; - RowNumber previous_frame_end; Float64 previous_time; Float64 previous_sum; Float64 previous_count; @@ -1682,40 +1678,43 @@ struct WindowFunctionExponentialTimeDecayedSum final : public StatefulWindowFunc auto & state = getState(workspace); Float64 result = 0; - Float64 curr_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, transform->current_row); - if (state.previous_frame_start <= transform->frame_start - && transform->frame_start < state.previous_frame_end - && state.previous_frame_end <= transform->frame_end) + if (transform->frame_start < transform->frame_end) { - for (RowNumber i = state.previous_frame_start; i < transform->frame_start; transform->advanceRowNumber(i)) - { - Float64 prev_val = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_VALUE, i); - Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); - result -= std::exp((prev_t - curr_t) / decay_length) * prev_val; - } - result += std::exp((state.previous_time - curr_t) / decay_length) * state.previous_sum; - for (RowNumber i = state.previous_frame_end; i < transform->frame_end; transform->advanceRowNumber(i)) + RowNumber frame_back = transform->prevRowNumber(transform->frame_end); + Float64 back_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, frame_back); + + if (transform->prev_frame_start <= transform->frame_start + && transform->frame_start < transform->prev_frame_end + && transform->prev_frame_end <= transform->frame_end) { - Float64 prev_val = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_VALUE, i); - Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); - result += std::exp((prev_t - curr_t) / decay_length) * prev_val; + for (RowNumber i = transform->prev_frame_start; i < transform->frame_start; transform->advanceRowNumber(i)) + { + Float64 prev_val = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_VALUE, i); + Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); + result -= std::exp((prev_t - back_t) / decay_length) * prev_val; + } + result += std::exp((state.previous_time - back_t) / decay_length) * state.previous_sum; + for (RowNumber i = transform->prev_frame_end; i < transform->frame_end; transform->advanceRowNumber(i)) + { + Float64 prev_val = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_VALUE, i); + Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); + result += std::exp((prev_t - back_t) / decay_length) * prev_val; + } } - } - else - { - for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i)) + else { - Float64 prev_val = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_VALUE, i); - Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); - result += std::exp((prev_t - curr_t) / decay_length) * prev_val; + for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i)) + { + Float64 prev_val = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_VALUE, i); + Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); + result += std::exp((prev_t - back_t) / decay_length) * prev_val; + } } - } - state.previous_sum = result; - state.previous_time = curr_t; - state.previous_frame_start = transform->frame_start; - state.previous_frame_end = transform->frame_end; + state.previous_sum = result; + state.previous_time = back_t; + } WindowFunctionHelpers::setValueToOutputColumn(transform, function_index, result); } @@ -1773,18 +1772,24 @@ struct WindowFunctionExponentialTimeDecayedMax final : public WindowFunction void windowInsertResultInto(const WindowTransform * transform, size_t function_index) override { - Float64 result = std::numeric_limits::lowest(); - Float64 curr_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, transform->current_row); + Float64 result = std::numeric_limits::quiet_NaN(); - for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i)) + if (transform->frame_start < transform->frame_end) { - Float64 value = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_VALUE, i); - Float64 t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); + result = std::numeric_limits::lowest(); + RowNumber frame_back = transform->prevRowNumber(transform->frame_end); + Float64 back_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, frame_back); - /// Avoiding extra calls to `exp` and multiplications. - if (value > result || t > curr_t || result < 0) + for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i)) { - result = std::max(std::exp((t - curr_t) / decay_length) * value, result); + Float64 value = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_VALUE, i); + Float64 t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); + + /// Avoiding extra calls to `exp` and multiplications. + if (value > result || t > back_t || result < 0) + { + result = std::max(std::exp((t - back_t) / decay_length) * value, result); + } } } @@ -1839,37 +1844,40 @@ struct WindowFunctionExponentialTimeDecayedCount final : public StatefulWindowFu auto & state = getState(workspace); Float64 result = 0; - Float64 curr_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, transform->current_row); - if (state.previous_frame_start <= transform->frame_start - && transform->frame_start < state.previous_frame_end - && state.previous_frame_end <= transform->frame_end) + if (transform->frame_start < transform->frame_end) { - for (RowNumber i = state.previous_frame_start; i < transform->frame_start; transform->advanceRowNumber(i)) - { - Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); - result -= std::exp((prev_t - curr_t) / decay_length); - } - result += std::exp((state.previous_time - curr_t) / decay_length) * state.previous_sum; - for (RowNumber i = state.previous_frame_end; i < transform->frame_end; transform->advanceRowNumber(i)) + RowNumber frame_back = transform->prevRowNumber(transform->frame_end); + Float64 back_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, frame_back); + + if (transform->prev_frame_start <= transform->frame_start + && transform->frame_start < transform->prev_frame_end + && transform->prev_frame_end <= transform->frame_end) { - Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); - result += std::exp((prev_t - curr_t) / decay_length); + for (RowNumber i = transform->prev_frame_start; i < transform->frame_start; transform->advanceRowNumber(i)) + { + Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); + result -= std::exp((prev_t - back_t) / decay_length); + } + result += std::exp((state.previous_time - back_t) / decay_length) * state.previous_sum; + for (RowNumber i = transform->prev_frame_end; i < transform->frame_end; transform->advanceRowNumber(i)) + { + Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); + result += std::exp((prev_t - back_t) / decay_length); + } } - } - else - { - for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i)) + else { - Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); - result += std::exp((prev_t - curr_t) / decay_length); + for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i)) + { + Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); + result += std::exp((prev_t - back_t) / decay_length); + } } - } - state.previous_sum = result; - state.previous_time = curr_t; - state.previous_frame_start = transform->frame_start; - state.previous_frame_end = transform->frame_end; + state.previous_sum = result; + state.previous_time = back_t; + } WindowFunctionHelpers::setValueToOutputColumn(transform, function_index, result); } @@ -1932,55 +1940,61 @@ struct WindowFunctionExponentialTimeDecayedAvg final : public StatefulWindowFunc Float64 count = 0; Float64 sum = 0; - Float64 curr_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, transform->current_row); + Float64 result = std::numeric_limits::quiet_NaN(); - if (state.previous_frame_start <= transform->frame_start - && transform->frame_start < state.previous_frame_end - && state.previous_frame_end <= transform->frame_end) + if (transform->frame_start < transform->frame_end) { - for (RowNumber i = state.previous_frame_start; i < transform->frame_start; transform->advanceRowNumber(i)) - { - Float64 prev_val = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_VALUE, i); - Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); - Float64 decay = std::exp((prev_t - curr_t) / decay_length); - sum -= decay * prev_val; - count -= decay; - } + RowNumber frame_back = transform->prevRowNumber(transform->frame_end); + Float64 back_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, frame_back); + if (transform->prev_frame_start <= transform->frame_start + && transform->frame_start < transform->prev_frame_end + && transform->prev_frame_end <= transform->frame_end) { - Float64 decay = std::exp((state.previous_time - curr_t) / decay_length); - sum += decay * state.previous_sum; - count += decay * state.previous_count; - } + for (RowNumber i = transform->prev_frame_start; i < transform->frame_start; transform->advanceRowNumber(i)) + { + Float64 prev_val = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_VALUE, i); + Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); + Float64 decay = std::exp((prev_t - back_t) / decay_length); + sum -= decay * prev_val; + count -= decay; + } - for (RowNumber i = state.previous_frame_end; i < transform->frame_end; transform->advanceRowNumber(i)) - { - Float64 prev_val = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_VALUE, i); - Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); - Float64 decay = std::exp((prev_t - curr_t) / decay_length); - sum += decay * prev_val; - count += decay; + { + Float64 decay = std::exp((state.previous_time - back_t) / decay_length); + sum += decay * state.previous_sum; + count += decay * state.previous_count; + } + + for (RowNumber i = transform->prev_frame_end; i < transform->frame_end; transform->advanceRowNumber(i)) + { + Float64 prev_val = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_VALUE, i); + Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); + Float64 decay = std::exp((prev_t - back_t) / decay_length); + sum += decay * prev_val; + count += decay; + } } - } - else - { - for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i)) + else { - Float64 prev_val = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_VALUE, i); - Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); - Float64 decay = std::exp((prev_t - curr_t) / decay_length); - sum += decay * prev_val; - count += decay; + for (RowNumber i = transform->frame_start; i < transform->frame_end; transform->advanceRowNumber(i)) + { + Float64 prev_val = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_VALUE, i); + Float64 prev_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, i); + Float64 decay = std::exp((prev_t - back_t) / decay_length); + sum += decay * prev_val; + count += decay; + } } - } - state.previous_sum = sum; - state.previous_count = count; - state.previous_time = curr_t; - state.previous_frame_start = transform->frame_start; - state.previous_frame_end = transform->frame_end; + state.previous_sum = sum; + state.previous_count = count; + state.previous_time = back_t; - WindowFunctionHelpers::setValueToOutputColumn(transform, function_index, sum/count); + result = sum/count; + } + + WindowFunctionHelpers::setValueToOutputColumn(transform, function_index, result); } private: diff --git a/src/Processors/Transforms/WindowTransform.h b/src/Processors/Transforms/WindowTransform.h index d536c8780d21..5bedfa1fb29a 100644 --- a/src/Processors/Transforms/WindowTransform.h +++ b/src/Processors/Transforms/WindowTransform.h @@ -198,6 +198,13 @@ class WindowTransform final : public IProcessor ++x.block; } + RowNumber nextRowNumber(const RowNumber & x) const + { + RowNumber result = x; + advanceRowNumber(result); + return result; + } + void retreatRowNumber(RowNumber & x) const { if (x.row > 0) @@ -219,6 +226,13 @@ class WindowTransform final : public IProcessor #endif } + RowNumber prevRowNumber(const RowNumber & x) const + { + RowNumber result = x; + retreatRowNumber(result); + return result; + } + auto moveRowNumber(const RowNumber & _x, int64_t offset) const; auto moveRowNumberNoCheck(const RowNumber & _x, int64_t offset) const; diff --git a/tests/queries/0_stateless/02020_exponential_smoothing.reference b/tests/queries/0_stateless/02020_exponential_smoothing.reference index 334d32e1c163..5481bfe80f8d 100644 --- a/tests/queries/0_stateless/02020_exponential_smoothing.reference +++ b/tests/queries/0_stateless/02020_exponential_smoothing.reference @@ -654,23 +654,23 @@ exponentialTimeDecayedAvg 0 48 0.201 ████████████████████ 0 49 0.196 ███████████████████▌ Check `exponentialTimeDecayed.*` supports sliding windows -2 1 3.010050167084 2 3.030251507111 0.993333444442 -1 2 7.060905027605 4.080805360107 4.02030134086 1.756312382816 -0 3 12.091654548833 5.101006700134 5.000500014167 2.418089094006 -4 4 11.050650848754 5.050250835421 5.000500014167 2.209909172572 -5 5 9.970249502081 5 5.000500014167 1.993850509716 -1 6 20.07305726224 10.202013400268 5.000500014167 4.014210020072 -0 7 15.991544871125 10.100501670842 3.98029867414 4.017674596889 +2 1 2.950447180363 1.960397346614 2.970248507056 0.993333444442 +1 2 6.921089740404 4 3.940694040604 1.756312382816 +0 3 11.85222374685 5 4.901483479757 2.418089094006 +4 4 10.831833301125 4.950249168746 4.901483479757 2.209909172572 +5 5 9.772825334477 4.900993366534 4.901483479757 1.993850509716 +1 6 19.675584097659 10 4.901483479757 4.014210020072 +0 7 15.832426341049 10 3.940694040604 4.017674596889 10 8 10.980198673307 10 2.970248507056 3.696727276261 Check `exponentialTimeDecayedMax` works with negative values -2 1 -1.010050167084 -1 2 -1 -10 3 -0.990049833749 -4 4 -0.980198673307 -5 5 -1.010050167084 -1 6 -1 -10 7 -0.990049833749 -10 8 -0.980198673307 -10 9 -9.801986733068 -9.81 10 -9.801986733068 +2 1 -0.990049833749 +1 2 -0.980198673307 +10 3 -0.970445533549 +4 4 -0.960789439152 +5 5 -0.990049833749 +1 6 -0.980198673307 +10 7 -0.970445533549 +10 8 -0.960789439152 +10 9 -9.607894391523 +9.81 10 -9.704455335485 9.9 11 -9.712388869079 From 99ff344cee324f800d17309d58ea9218575e2026 Mon Sep 17 00:00:00 2001 From: Vladimir Chebotaryov Date: Thu, 4 Aug 2022 18:08:32 +0300 Subject: [PATCH 2/2] Fixed tests on Debug build type. --- src/Processors/Transforms/WindowTransform.h | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/Processors/Transforms/WindowTransform.h b/src/Processors/Transforms/WindowTransform.h index 5bedfa1fb29a..dedc8c9941cc 100644 --- a/src/Processors/Transforms/WindowTransform.h +++ b/src/Processors/Transforms/WindowTransform.h @@ -207,6 +207,10 @@ class WindowTransform final : public IProcessor void retreatRowNumber(RowNumber & x) const { +#ifndef NDEBUG + auto original_x = x; +#endif + if (x.row > 0) { --x.row; @@ -220,9 +224,9 @@ class WindowTransform final : public IProcessor x.row = blockAt(x).rows - 1; #ifndef NDEBUG - auto xx = x; - advanceRowNumber(xx); - assert(xx == x); + auto advanced_retreated_x = x; + advanceRowNumber(advanced_retreated_x); + assert(advanced_retreated_x == original_x); #endif }