From d24006762c5e3d7b1c78a31be946bbe759cb0f06 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 8 Sep 2022 11:29:56 +0800 Subject: [PATCH 1/2] update --- .../DataStreams/WindowBlockInputStream.cpp | 27 ++++++++++++------- dbms/src/DataStreams/WindowBlockInputStream.h | 1 + 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index 2cc61df8104..dcd8d84f426 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -76,15 +76,28 @@ void WindowBlockInputStream::initialWorkspaces() only_have_pure_window = onlyHaveRowNumberAndRank(); } +std::optional WindowBlockInputStream::returnIfCancelledOrKilled() +{ + if (isCancelledOrThrowIfKilled()) + { + if (!window_blocks.empty()) + window_blocks.erase(window_blocks.begin(), window_blocks.end()); + input_is_finished = true; + return {Block{}}; + } + return {}; +} + Block WindowBlockInputStream::readImpl() { const auto & stream = children.back(); while (!input_is_finished) { + if (auto res = returnIfCancelledOrKilled(); res.has_value()) + return res.value(); + if (Block output_block = tryGetOutputBlock()) - { return output_block; - } Block block = stream->read(); if (!block) @@ -94,6 +107,8 @@ Block WindowBlockInputStream::readImpl() tryCalculate(); } + if (auto res = returnIfCancelledOrKilled(); res.has_value()) + return res.value(); // return last partition block, if already return then return null return tryGetOutputBlock(); } @@ -361,14 +376,6 @@ void WindowBlockInputStream::writeOutCurrentRow() Block WindowBlockInputStream::tryGetOutputBlock() { - if (isCancelledOrThrowIfKilled()) - { - if (!window_blocks.empty()) - window_blocks.erase(window_blocks.begin(), window_blocks.end()); - input_is_finished = true; - return {}; - } - assert(first_not_ready_row.block >= first_block_number); // The first_not_ready_row might be past-the-end if we have already // calculated the window functions for all input rows. That's why the diff --git a/dbms/src/DataStreams/WindowBlockInputStream.h b/dbms/src/DataStreams/WindowBlockInputStream.h index 0ef23aa9f6f..b4e6289236f 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.h +++ b/dbms/src/DataStreams/WindowBlockInputStream.h @@ -171,6 +171,7 @@ class WindowBlockInputStream : public IProfilingBlockInputStream protected: Block readImpl() override; void appendInfo(FmtBuffer & buffer) const override; + std::optional returnIfCancelledOrKilled(); LoggerPtr log; From e9dcbd3463769aece8cb8d6d7cab7dce6198d3b3 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 8 Sep 2022 14:54:36 +0800 Subject: [PATCH 2/2] refine --- dbms/src/DataStreams/WindowBlockInputStream.cpp | 14 +++++++------- dbms/src/DataStreams/WindowBlockInputStream.h | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index dcd8d84f426..0094af08b2d 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -76,16 +76,16 @@ void WindowBlockInputStream::initialWorkspaces() only_have_pure_window = onlyHaveRowNumberAndRank(); } -std::optional WindowBlockInputStream::returnIfCancelledOrKilled() +bool WindowBlockInputStream::returnIfCancelledOrKilled() { if (isCancelledOrThrowIfKilled()) { if (!window_blocks.empty()) window_blocks.erase(window_blocks.begin(), window_blocks.end()); input_is_finished = true; - return {Block{}}; + return true; } - return {}; + return false; } Block WindowBlockInputStream::readImpl() @@ -93,8 +93,8 @@ Block WindowBlockInputStream::readImpl() const auto & stream = children.back(); while (!input_is_finished) { - if (auto res = returnIfCancelledOrKilled(); res.has_value()) - return res.value(); + if (returnIfCancelledOrKilled()) + return {}; if (Block output_block = tryGetOutputBlock()) return output_block; @@ -107,8 +107,8 @@ Block WindowBlockInputStream::readImpl() tryCalculate(); } - if (auto res = returnIfCancelledOrKilled(); res.has_value()) - return res.value(); + if (returnIfCancelledOrKilled()) + return {}; // return last partition block, if already return then return null return tryGetOutputBlock(); } diff --git a/dbms/src/DataStreams/WindowBlockInputStream.h b/dbms/src/DataStreams/WindowBlockInputStream.h index b4e6289236f..f2a487880bb 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.h +++ b/dbms/src/DataStreams/WindowBlockInputStream.h @@ -171,7 +171,7 @@ class WindowBlockInputStream : public IProfilingBlockInputStream protected: Block readImpl() override; void appendInfo(FmtBuffer & buffer) const override; - std::optional returnIfCancelledOrKilled(); + bool returnIfCancelledOrKilled(); LoggerPtr log;