From 5fe22e41afcb656c4cd5bd0f87eb83c5d12d162b Mon Sep 17 00:00:00 2001 From: Pxl Date: Mon, 14 Oct 2024 11:23:35 +0800 Subject: [PATCH] [Bug](runtime-filter) use wake_up_by_downstream to judge early close (#41751) ## Proposed changes Sometimes eos is true, the finish dependency is still not ready. Therefore, we need to use wake_up_by_downstream to determine whether it was closed early. Otherwise, it will enter the normal rf build process in this case and generate related errors. Follow-up : https://github.com/apache/doris/pull/41292 --- be/src/pipeline/exec/hashjoin_build_sink.cpp | 3 ++- be/src/pipeline/pipeline_task.h | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 32b1e33be65108c..5b946326fbc099e 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -22,6 +22,7 @@ #include "exprs/bloom_filter_func.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/operator.h" +#include "pipeline/pipeline_task.h" #include "vec/data_types/data_type_nullable.h" #include "vec/utils/template_helpers.hpp" @@ -139,7 +140,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu return Base::close(state, exec_status); } - if (!_eos) { + if (state->get_task()->wake_up_by_downstream()) { RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency)); RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters()); } else { diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index e216ef103766fdb..223420ea55aff1f 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -234,6 +234,8 @@ class PipelineTask { PipelineId pipeline_id() const { return _pipeline->id(); } + bool wake_up_by_downstream() const { return _wake_up_by_downstream; } + private: friend class RuntimeFilterDependency; bool _is_blocked();