From e91b5d0a4eb45c243193cb66936291ea40ae7cf9 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 8 Sep 2022 11:29:56 +0800 Subject: [PATCH 1/2] update --- .../DataStreams/WindowBlockInputStream.cpp | 27 ++++++++++++------- dbms/src/DataStreams/WindowBlockInputStream.h | 1 + 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index 0058f3853d5..92cce815cbf 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -73,15 +73,28 @@ void WindowBlockInputStream::initialWorkspaces() only_have_pure_window = onlyHaveRowNumberAndRank(); } +std::optional WindowBlockInputStream::returnIfCancelledOrKilled() +{ + if (isCancelledOrThrowIfKilled()) + { + if (!window_blocks.empty()) + window_blocks.erase(window_blocks.begin(), window_blocks.end()); + input_is_finished = true; + return {Block{}}; + } + return {}; +} + Block WindowBlockInputStream::readImpl() { const auto & stream = children.back(); while (!input_is_finished) { + if (auto res = returnIfCancelledOrKilled(); res.has_value()) + return res.value(); + if (Block output_block = tryGetOutputBlock()) - { return output_block; - } Block block = stream->read(); if (!block) @@ -91,6 +104,8 @@ Block WindowBlockInputStream::readImpl() tryCalculate(); } + if (auto res = returnIfCancelledOrKilled(); res.has_value()) + return res.value(); // return last partition block, if already return then return null return tryGetOutputBlock(); } @@ -374,14 +389,6 @@ void WindowBlockInputStream::writeOutCurrentRow() Block WindowBlockInputStream::tryGetOutputBlock() { - if (isCancelledOrThrowIfKilled()) - { - if (!window_blocks.empty()) - window_blocks.erase(window_blocks.begin(), window_blocks.end()); - input_is_finished = true; - return {}; - } - assert(first_not_ready_row.block >= first_block_number); // The first_not_ready_row might be past-the-end if we have already // calculated the window functions for all input rows. That's why the diff --git a/dbms/src/DataStreams/WindowBlockInputStream.h b/dbms/src/DataStreams/WindowBlockInputStream.h index e99f5ab5fe2..8f129318fc5 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.h +++ b/dbms/src/DataStreams/WindowBlockInputStream.h @@ -166,6 +166,7 @@ class WindowBlockInputStream : public IProfilingBlockInputStream protected: Block readImpl() override; void appendInfo(FmtBuffer & buffer) const override; + std::optional returnIfCancelledOrKilled(); LoggerPtr log; From bddb2ce0696018cc3a4cb4d8a4098bbae089d9fb Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 8 Sep 2022 14:54:36 +0800 Subject: [PATCH 2/2] refine --- dbms/src/DataStreams/WindowBlockInputStream.cpp | 14 +++++++------- dbms/src/DataStreams/WindowBlockInputStream.h | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index 92cce815cbf..fdbc6a67440 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -73,16 +73,16 @@ void WindowBlockInputStream::initialWorkspaces() only_have_pure_window = onlyHaveRowNumberAndRank(); } -std::optional WindowBlockInputStream::returnIfCancelledOrKilled() +bool WindowBlockInputStream::returnIfCancelledOrKilled() { if (isCancelledOrThrowIfKilled()) { if (!window_blocks.empty()) window_blocks.erase(window_blocks.begin(), window_blocks.end()); input_is_finished = true; - return {Block{}}; + return true; } - return {}; + return false; } Block WindowBlockInputStream::readImpl() @@ -90,8 +90,8 @@ Block WindowBlockInputStream::readImpl() const auto & stream = children.back(); while (!input_is_finished) { - if (auto res = returnIfCancelledOrKilled(); res.has_value()) - return res.value(); + if (returnIfCancelledOrKilled()) + return {}; if (Block output_block = tryGetOutputBlock()) return output_block; @@ -104,8 +104,8 @@ Block WindowBlockInputStream::readImpl() tryCalculate(); } - if (auto res = returnIfCancelledOrKilled(); res.has_value()) - return res.value(); + if (returnIfCancelledOrKilled()) + return {}; // return last partition block, if already return then return null return tryGetOutputBlock(); } diff --git a/dbms/src/DataStreams/WindowBlockInputStream.h b/dbms/src/DataStreams/WindowBlockInputStream.h index 8f129318fc5..2a1d7e5e85b 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.h +++ b/dbms/src/DataStreams/WindowBlockInputStream.h @@ -166,7 +166,7 @@ class WindowBlockInputStream : public IProfilingBlockInputStream protected: Block readImpl() override; void appendInfo(FmtBuffer & buffer) const override; - std::optional returnIfCancelledOrKilled(); + bool returnIfCancelledOrKilled(); LoggerPtr log;