From 30b0b8978d82e4a6794e284b0c0125cb38da31f4 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sat, 12 Oct 2024 11:16:57 +0800 Subject: [PATCH] [fix](pipeline) Prevent re-scheduling a task at the same time --- be/src/pipeline/pipeline_task.cpp | 20 ++++++++++++++++---- be/src/pipeline/pipeline_task.h | 2 +- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 36f2066c9abed7..e06b8028c9c730 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -227,14 +227,20 @@ bool PipelineTask::_wait_to_start() { _blocked_dep = _execution_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { static_cast(_blocked_dep)->start_watcher(); - return !_wake_up_by_downstream; + if (_wake_up_by_downstream) { + _eos = true; + } + return true; } for (auto* op_dep : _filter_dependencies) { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return !_wake_up_by_downstream; + if (_wake_up_by_downstream) { + _eos = true; + } + return true; } } return false; @@ -249,7 +255,10 @@ bool PipelineTask::_is_blocked() { _blocked_dep = dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return !_wake_up_by_downstream; + if (_wake_up_by_downstream) { + _eos = true; + } + return true; } } // If all dependencies are ready for this operator, we can execute this task if no datum is needed from upstream operators. @@ -268,7 +277,10 @@ bool PipelineTask::_is_blocked() { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return !_wake_up_by_downstream; + if (_wake_up_by_downstream) { + _eos = true; + } + return true; } } return false; diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index a73393fb270885..e216ef103766fd 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -90,7 +90,7 @@ class PipelineTask { _blocked_dep = fin_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return !_wake_up_by_downstream; + return true; } } return false;