[fix](followup) Stop tasks if waken by downstream tasks (#41628)
Follow-up: #41292
Gabriel39 authored Oct 11, 2024
1 parent 334b473 commit 1e72755
Showing 13 changed files with 40 additions and 36 deletions.
4 changes: 0 additions & 4 deletions be/src/exprs/runtime_filter.cpp
@@ -1312,10 +1312,6 @@ std::string IRuntimeFilter::formatted_state() const {
_wrapper->_context->ignored);
}

BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
return _wrapper->get_bloomfilter();
}

Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id, bool build_bf_exactly) {
// if node_id == -1 , it shouldn't be a consumer
7 changes: 0 additions & 7 deletions be/src/exprs/runtime_filter.h
@@ -198,7 +198,6 @@ class IRuntimeFilter {
_is_broadcast_join(true),
_has_remote_target(false),
_has_local_target(false),
_rf_state(RuntimeFilterState::NOT_READY),
_rf_state_atomic(RuntimeFilterState::NOT_READY),
_role(RuntimeFilterRole::PRODUCER),
_expr_order(-1),
@@ -264,8 +263,6 @@
Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id = -1, bool build_bf_exactly = false);

BloomFilterFuncBase* get_bloomfilter() const;

// serialize _wrapper to protobuf
Status serialize(PMergeFilterRequest* request, void** data, int* len);
Status serialize(PPublishFilterRequest* request, void** data = nullptr, int* len = nullptr);
@@ -366,9 +363,6 @@ class IRuntimeFilter {
void to_protobuf(PInFilter* filter);
void to_protobuf(PMinMaxFilter* filter);

template <class T>
Status _update_filter(const T* param);

template <class T>
Status serialize_impl(T* request, void** data, int* len);

@@ -398,7 +392,6 @@ class IRuntimeFilter {
// will apply to local node
bool _has_local_target;
// filter is ready for consumer
RuntimeFilterState _rf_state;
std::atomic<RuntimeFilterState> _rf_state_atomic;
// role consumer or producer
RuntimeFilterRole _role;
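
The header now keeps a single std::atomic<RuntimeFilterState> (_rf_state_atomic) and drops the plain _rf_state duplicate along with the unused get_bloomfilter() and _update_filter declarations. A minimal sketch of the single-atomic-state pattern this leaves behind (illustrative names, not the actual Doris classes):

    #include <atomic>

    enum class RuntimeFilterState { NOT_READY, READY, TIMED_OUT };

    class FilterStateHolder {
    public:
        // The producer publishes the filter exactly once.
        bool publish() {
            auto expected = RuntimeFilterState::NOT_READY;
            return _state.compare_exchange_strong(expected, RuntimeFilterState::READY);
        }

        // Consumers always read one coherent value; there is no second,
        // non-atomic copy that could go stale on another thread.
        bool ready() const {
            return _state.load(std::memory_order_acquire) == RuntimeFilterState::READY;
        }

    private:
        std::atomic<RuntimeFilterState> _state{RuntimeFilterState::NOT_READY};
    };
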
1 change: 0 additions & 1 deletion be/src/pipeline/common/runtime_filter_consumer.cpp
@@ -76,7 +76,6 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
auto runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
id, node_id, name, runtime_filter.get());
_runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
runtime_filter_timers[i] = std::make_shared<pipeline::RuntimeFilterTimer>(
runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
runtime_filter_dependencies[i]);
1 change: 0 additions & 1 deletion be/src/pipeline/common/runtime_filter_consumer.h
@@ -61,7 +61,6 @@ class RuntimeFilterConsumer {
// set to true if this runtime filter is already applied to vconjunct_ctx_ptr
bool apply_mark = false;
std::shared_ptr<IRuntimeFilter> runtime_filter;
pipeline::RuntimeFilterDependency* runtime_filter_dependency = nullptr;
};

std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -595,6 +595,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state.process_build_block(state, (*local_state._shared_state->build_block)));
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
_shared_hash_table_context->complete_build_stage = true;
// arena will be shared with other instances.
_shared_hash_table_context->arena = local_state._shared_state->arena;
_shared_hash_table_context->hash_table_variants =
@@ -606,7 +607,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._shared_state->build_indexes_null;
local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
}
} else if (!local_state._should_build_hash_table) {
} else if (!local_state._should_build_hash_table &&
_shared_hash_table_context->complete_build_stage) {
DCHECK(_shared_hashtable_controller != nullptr);
DCHECK(_shared_hash_table_context != nullptr);
// The instance that does not build the hash table should wait for the signal that the hash table build has finished.
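
The builder instance now flips complete_build_stage on the shared context before publishing the table, and a non-builder instance only takes the wait-for-shared-table branch when that flag is set; if the builder was stopped before reaching the build stage (for example, woken early by a downstream task), the other instances no longer wait for a table that will never arrive. A rough sketch of the hand-off, with simplified stand-in types rather than the real Doris classes:

    #include <atomic>
    #include <memory>

    struct SharedBuildContext {
        std::atomic<bool> complete_build_stage{false}; // builder actually reached the build stage
        std::atomic<bool> signaled{false};             // table published, waiters may proceed
        std::shared_ptr<int> hash_table;               // stand-in for the shared hash table
    };

    // Builder instance: mark the stage complete, publish, then signal.
    void builder_publish(SharedBuildContext& ctx, std::shared_ptr<int> table) {
        ctx.complete_build_stage = true;
        ctx.hash_table = std::move(table);
        ctx.signaled = true;
    }

    // Non-builder instance: only wait on the shared table if the builder
    // really entered the build stage; otherwise fall through and finish.
    bool non_builder_should_wait(const SharedBuildContext& ctx) {
        return ctx.complete_build_stage && !ctx.signaled;
    }
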
15 changes: 10 additions & 5 deletions be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -42,15 +42,15 @@ class MultiCastDataStreamSinkOperatorX final
using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;

public:
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources,
const int cast_sender_count, ObjectPool* pool,
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources, ObjectPool* pool,
const TMultiCastDataStreamSink& sink,
const RowDescriptor& row_desc)
: Base(sink_id, -1, sources),
_pool(pool),
_row_desc(row_desc),
_cast_sender_count(cast_sender_count),
_sink(sink) {}
_cast_sender_count(sources.size()),
_sink(sink),
_num_dests(sources.size()) {}
~MultiCastDataStreamSinkOperatorX() override = default;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
@@ -60,14 +60,19 @@ class MultiCastDataStreamSinkOperatorX final
std::shared_ptr<BasicSharedState> create_shared_state() const override;

const TMultiCastDataStreamSink& sink_node() { return _sink; }
bool count_down_destination() override {
DCHECK_GT(_num_dests, 0);
return _num_dests.fetch_sub(1) == 1;
}

private:
friend class MultiCastDataStreamSinkLocalState;
ObjectPool* _pool;
RowDescriptor _row_desc;
const int _cast_sender_count;
const size_t _cast_sender_count;
const TMultiCastDataStreamSink& _sink;
friend class MultiCastDataStreamSinkLocalState;
std::atomic<size_t> _num_dests;
};

} // namespace doris::pipeline
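
count_down_destination() relies on the usual atomic count-down idiom: fetch_sub returns the value held before the subtraction, so comparing it with 1 identifies exactly one caller, the last destination, as the one that drained the counter. A small standalone sketch of that idiom:

    #include <atomic>
    #include <cstddef>

    class DestinationLatch {
    public:
        explicit DestinationLatch(std::size_t num_dests) : _num_dests(num_dests) {}

        // Returns true only for the caller that brings the count to zero.
        bool count_down() {
            // fetch_sub returns the previous value, so the last destination sees 1.
            return _num_dests.fetch_sub(1) == 1;
        }

    private:
        std::atomic<std::size_t> _num_dests;
    };
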
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/operator.h
@@ -512,6 +512,8 @@ class DataSinkOperatorXBase : public OperatorBase {

virtual bool should_dry_run(RuntimeState* state) { return false; }

[[nodiscard]] virtual bool count_down_destination() { return true; }

protected:
template <typename Writer, typename Parent>
requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
8 changes: 5 additions & 3 deletions be/src/pipeline/pipeline.cpp
@@ -67,9 +67,11 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
}

void Pipeline::make_all_runnable() {
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state(true);
if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state(true);
}
}
}
}
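
make_all_runnable() is now gated on the sink: tasks are only woken when count_down_destination() reports that the last destination has finished, so a multi-cast sink with other destinations still running keeps its tasks blocked. Ordinary single-destination sinks keep the old behaviour because the base-class default added in operator.h above simply returns true. A condensed sketch of that control flow, using simplified stand-ins for the real pipeline and task types:

    #include <vector>

    struct TaskStub {
        void clear_blocking_state(bool wake_up_by_downstream) {
            (void)wake_up_by_downstream; // unblock the task
        }
    };

    struct SinkStub {
        // Single-destination sinks: the first (and only) count-down wins.
        virtual bool count_down_destination() { return true; }
        virtual ~SinkStub() = default;
    };

    struct PipelineStub {
        SinkStub* sink = nullptr;
        std::vector<TaskStub*> tasks;

        void make_all_runnable() {
            // Only the call made on behalf of the last downstream destination
            // unblocks the tasks; earlier destinations must not wake them early.
            if (sink->count_down_destination()) {
                for (auto* task : tasks) {
                    if (task) {
                        task->clear_blocking_state(/*wake_up_by_downstream=*/true);
                    }
                }
            }
        }
    };
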
3 changes: 1 addition & 2 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -1142,8 +1142,7 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
}

_sink.reset(new MultiCastDataStreamSinkOperatorX(
sink_id, sources, cast_set<int>(thrift_sink.multi_cast_stream_sink.sinks.size()),
pool, thrift_sink.multi_cast_stream_sink, row_desc));
sink_id, sources, pool, thrift_sink.multi_cast_stream_sink, row_desc));
for (int i = 0; i < sender_size; ++i) {
auto new_pipeline = add_pipeline();
RowDescriptor* _row_desc = nullptr;
23 changes: 17 additions & 6 deletions be/src/pipeline/pipeline_task.cpp
@@ -227,14 +227,14 @@ bool PipelineTask::_wait_to_start() {
_blocked_dep = _execution_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
static_cast<Dependency*>(_blocked_dep)->start_watcher();
return true;
return !_wake_up_by_downstream;
}

for (auto* op_dep : _filter_dependencies) {
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
return false;
@@ -249,7 +249,7 @@ bool PipelineTask::_is_blocked() {
_blocked_dep = dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
// If all dependencies are ready for this operator, we can execute this task if no datum is needed from upstream operators.
@@ -268,7 +268,7 @@
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
return false;
@@ -306,6 +306,11 @@ Status PipelineTask::execute(bool* eos) {
if (_wait_to_start()) {
return Status::OK();
}
if (_wake_up_by_downstream) {
_eos = true;
*eos = true;
return Status::OK();
}
// The status must be runnable
if (!_opened && !_fragment_context->is_canceled()) {
RETURN_IF_ERROR(_open());
@@ -315,6 +320,11 @@
if (_is_blocked()) {
return Status::OK();
}
if (_wake_up_by_downstream) {
_eos = true;
*eos = true;
return Status::OK();
}

/// When a task is cancelled,
/// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready).
@@ -481,9 +491,10 @@ std::string PipelineTask::debug_string() {
auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
fmt::format_to(debug_string_buffer,
"PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = "
"{}, elapse time "
"= {}s], block dependency = {}, is running = {}\noperators: ",
"{}, elapse time = {}s, _wake_up_by_downstream = {}], block dependency = {}, is "
"running = {}\noperators: ",
(void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed,
_wake_up_by_downstream.load(),
cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
is_running());
for (size_t i = 0; i < _operators.size(); i++) {
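
The blocking helpers now return !_wake_up_by_downstream, so a task that a downstream consumer has already woken is reported as runnable even while a dependency is still nominally blocking it, and execute() then short-circuits the task straight to EOS instead of letting it produce data nobody will consume. A stripped-down sketch of that early-out (member names follow the diff; the surrounding class and the Status/eos plumbing are simplified):

    #include <atomic>

    class PipelineTaskSketch {
    public:
        // Called by the downstream side when it needs no more data from this task.
        void wake_up_by_downstream() { _wake_up_by_downstream = true; }

        bool is_blocked() {
            if (has_blocking_dependency()) {
                // A task woken by downstream must not stay parked on its dependencies.
                return !_wake_up_by_downstream;
            }
            return false;
        }

        // Returns true once the task has nothing left to do.
        bool execute() {
            if (is_blocked()) {
                return false; // genuinely blocked: yield and retry later
            }
            if (_wake_up_by_downstream) {
                _eos = true;  // downstream already has all it needs: finish early
                return true;
            }
            // ... normal per-block execution would run here ...
            return _eos;
        }

    private:
        bool has_blocking_dependency() const { return false; } // placeholder for real checks
        std::atomic<bool> _wake_up_by_downstream{false};
        bool _eos = false;
    };
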
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_task.h
@@ -90,7 +90,7 @@ class PipelineTask {
_blocked_dep = fin_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
return false;
1 change: 1 addition & 0 deletions be/src/vec/runtime/shared_hash_table_controller.h
@@ -66,6 +66,7 @@ struct SharedHashTableContext {
std::map<int, RuntimeFilterContextSPtr> runtime_filters;
std::atomic<bool> signaled = false;
bool short_circuit_for_null_in_probe_side = false;
std::atomic<bool> complete_build_stage = false;
};

using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;
5 changes: 0 additions & 5 deletions be/test/exprs/runtime_filter_test.cpp
@@ -105,11 +105,6 @@ std::shared_ptr<IRuntimeFilter> create_runtime_filter(TRuntimeFilterType::type t

EXPECT_TRUE(status.ok()) << status.to_string();

if (auto bf = runtime_filter->get_bloomfilter()) {
status = bf->init_with_fixed_length();
EXPECT_TRUE(status.ok()) << status.to_string();
}

return status.ok() ? runtime_filter : nullptr;
}

