[fix](followup) Stop tasks if waken by downstream tasks #41628

Merged (1 commit, Oct 11, 2024)
4 changes: 0 additions & 4 deletions be/src/exprs/runtime_filter.cpp
@@ -1307,10 +1307,6 @@ std::string IRuntimeFilter::formatted_state() const {
_wrapper->_context->ignored);
}

BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
return _wrapper->get_bloomfilter();
}

Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id, bool build_bf_exactly) {
// if node_id == -1 , it shouldn't be a consumer
7 changes: 0 additions & 7 deletions be/src/exprs/runtime_filter.h
@@ -198,7 +198,6 @@ class IRuntimeFilter {
_is_broadcast_join(true),
_has_remote_target(false),
_has_local_target(false),
_rf_state(RuntimeFilterState::NOT_READY),
_rf_state_atomic(RuntimeFilterState::NOT_READY),
_role(RuntimeFilterRole::PRODUCER),
_expr_order(-1),
@@ -264,8 +263,6 @@ class IRuntimeFilter {
Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id = -1, bool build_bf_exactly = false);

BloomFilterFuncBase* get_bloomfilter() const;

// serialize _wrapper to protobuf
Status serialize(PMergeFilterRequest* request, void** data, int* len);
Status serialize(PPublishFilterRequest* request, void** data = nullptr, int* len = nullptr);
@@ -366,9 +363,6 @@ class IRuntimeFilter {
void to_protobuf(PInFilter* filter);
void to_protobuf(PMinMaxFilter* filter);

template <class T>
Status _update_filter(const T* param);

template <class T>
Status serialize_impl(T* request, void** data, int* len);

@@ -398,7 +392,6 @@ class IRuntimeFilter {
// will apply to local node
bool _has_local_target;
// filter is ready for consumer
RuntimeFilterState _rf_state;
std::atomic<RuntimeFilterState> _rf_state_atomic;
// role consumer or producer
RuntimeFilterRole _role;
1 change: 0 additions & 1 deletion be/src/pipeline/common/runtime_filter_consumer.cpp
@@ -76,7 +76,6 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
auto runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
id, node_id, name, runtime_filter.get());
_runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
runtime_filter_timers[i] = std::make_shared<pipeline::RuntimeFilterTimer>(
runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
runtime_filter_dependencies[i]);
1 change: 0 additions & 1 deletion be/src/pipeline/common/runtime_filter_consumer.h
@@ -61,7 +61,6 @@ class RuntimeFilterConsumer {
// set to true if this runtime filter is already applied to vconjunct_ctx_ptr
bool apply_mark = false;
std::shared_ptr<IRuntimeFilter> runtime_filter;
pipeline::RuntimeFilterDependency* runtime_filter_dependency = nullptr;
};

std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -590,6 +590,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state.process_build_block(state, (*local_state._shared_state->build_block)));
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
_shared_hash_table_context->complete_build_stage = true;
// arena will be shared with other instances.
_shared_hash_table_context->arena = local_state._shared_state->arena;
_shared_hash_table_context->hash_table_variants =
@@ -601,7 +602,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._shared_state->build_indexes_null;
local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
}
} else if (!local_state._should_build_hash_table) {
} else if (!local_state._should_build_hash_table &&
_shared_hash_table_context->complete_build_stage) {
DCHECK(_shared_hashtable_controller != nullptr);
DCHECK(_shared_hash_table_context != nullptr);
// the instance which is not build hash table, it's should wait the signal of hash table build finished.
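My reading of this hunk: the instance that builds the shared hash table now marks complete_build_stage before it publishes the shared state, and instances that skip building only enter the wait branch when that flag is set, so a task woken early by a downstream task no longer waits (or hits the DCHECKs) for a build that never finished. A minimal sketch of that coordination, using hypothetical stand-in types rather than the real Doris classes:

#include <atomic>

// Hypothetical stand-in for SharedHashTableContext, reduced to the two flags.
struct SharedBuildContext {
    std::atomic<bool> signaled {false};
    std::atomic<bool> complete_build_stage {false};
};

// Building instance: flag the completed build stage before signaling.
void on_builder_finished(SharedBuildContext& ctx) {
    ctx.complete_build_stage = true;  // shared arena / hash table are now valid
    // ... copy arena, hash table variants and runtime filters into ctx ...
    ctx.signaled = true;
}

// Non-building instance: only wait for the signal if the builder actually
// reached the build-complete stage; otherwise fall through and finish.
bool should_wait_for_shared_table(bool should_build_hash_table,
                                  const SharedBuildContext& ctx) {
    return !should_build_hash_table && ctx.complete_build_stage;
}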
15 changes: 10 additions & 5 deletions be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -42,15 +42,15 @@ class MultiCastDataStreamSinkOperatorX final
using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;

public:
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources,
const int cast_sender_count, ObjectPool* pool,
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources, ObjectPool* pool,
const TMultiCastDataStreamSink& sink,
const RowDescriptor& row_desc)
: Base(sink_id, -1, sources),
_pool(pool),
_row_desc(row_desc),
_cast_sender_count(cast_sender_count),
_sink(sink) {}
_cast_sender_count(sources.size()),
_sink(sink),
_num_dests(sources.size()) {}
~MultiCastDataStreamSinkOperatorX() override = default;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
@@ -60,14 +60,19 @@
std::shared_ptr<BasicSharedState> create_shared_state() const override;

const TMultiCastDataStreamSink& sink_node() { return _sink; }
bool count_down_destination() override {
DCHECK_GT(_num_dests, 0);
return _num_dests.fetch_sub(1) == 1;
}

private:
friend class MultiCastDataStreamSinkLocalState;
ObjectPool* _pool;
RowDescriptor _row_desc;
const int _cast_sender_count;
const size_t _cast_sender_count;
const TMultiCastDataStreamSink& _sink;
friend class MultiCastDataStreamSinkLocalState;
std::atomic<size_t> _num_dests;
};

} // namespace doris::pipeline
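The sender count is now derived from sources.size() instead of being passed in separately, and the new _num_dests member turns the sink into a per-destination countdown: count_down_destination() returns true only for the call that takes the counter from 1 to 0, i.e. for the last destination. A rough, self-contained sketch of that pattern (hypothetical class name, not the Doris sink):

#include <atomic>
#include <cstddef>
#include <vector>

// Hypothetical fan-out sink holding one countdown slot per destination pipeline.
class FanOutSink {
public:
    explicit FanOutSink(const std::vector<int>& sources) : _num_dests(sources.size()) {}

    // fetch_sub returns the previous value, so only the destination that
    // decrements the counter from 1 to 0 sees 1 and gets true back.
    bool count_down_destination() { return _num_dests.fetch_sub(1) == 1; }

private:
    std::atomic<std::size_t> _num_dests;
};

Pipeline::make_all_runnable() (further down in this diff) uses the return value so that only the last destination to finish actually wakes the sink's tasks.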
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/operator.h
@@ -515,6 +515,8 @@ class DataSinkOperatorXBase : public OperatorBase {

virtual bool should_dry_run(RuntimeState* state) { return false; }

[[nodiscard]] virtual bool count_down_destination() { return true; }

protected:
template <typename Writer, typename Parent>
requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
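The base-class default keeps every other sink on the old behavior: with nothing to count down, the first call already reports "last destination reached", so callers that gate on it wake their tasks unconditionally, as before. A one-line sketch of that default (hypothetical base type):

// Hypothetical base sink: no per-destination bookkeeping, so counting down
// always succeeds immediately and the old wake-up behavior is preserved.
struct SinkBase {
    virtual ~SinkBase() = default;
    [[nodiscard]] virtual bool count_down_destination() { return true; }
};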
8 changes: 5 additions & 3 deletions be/src/pipeline/pipeline.cpp
@@ -67,9 +67,11 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
}

void Pipeline::make_all_runnable() {
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state(true);
if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state(true);
}
}
}
}
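Read together with count_down_destination(), make_all_runnable() now clears the blocking state of the pipeline's tasks only once every destination of the sink has reached this point; earlier calls return without touching the tasks. A condensed sketch under the same hypothetical names as the FanOutSink above:

#include <vector>

// Minimal stand-ins, not the Doris classes.
struct PipelineTaskStub {
    bool wake_up_by_downstream = false;
    void clear_blocking_state(bool woken_by_downstream) {
        wake_up_by_downstream = woken_by_downstream;
        // ... release blocked dependencies so the scheduler re-queues the task ...
    }
};

struct PipelineStub {
    FanOutSink* sink = nullptr;
    std::vector<PipelineTaskStub*> tasks;

    void make_all_runnable() {
        // Only the last destination to count down wakes the tasks; earlier
        // calls are no-ops, so one finished destination can no longer wake
        // the whole source pipeline prematurely.
        if (sink->count_down_destination()) {
            for (auto* task : tasks) {
                if (task) {
                    task->clear_blocking_state(/*woken_by_downstream=*/true);
                }
            }
        }
    }
};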
3 changes: 1 addition & 2 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -1142,8 +1142,7 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
}

_sink.reset(new MultiCastDataStreamSinkOperatorX(
sink_id, sources, cast_set<int>(thrift_sink.multi_cast_stream_sink.sinks.size()),
pool, thrift_sink.multi_cast_stream_sink, row_desc));
sink_id, sources, pool, thrift_sink.multi_cast_stream_sink, row_desc));
for (int i = 0; i < sender_size; ++i) {
auto new_pipeline = add_pipeline();
RowDescriptor* _row_desc = nullptr;
23 changes: 17 additions & 6 deletions be/src/pipeline/pipeline_task.cpp
@@ -227,14 +227,14 @@ bool PipelineTask::_wait_to_start() {
_blocked_dep = _execution_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
static_cast<Dependency*>(_blocked_dep)->start_watcher();
return true;
return !_wake_up_by_downstream;
}

for (auto* op_dep : _filter_dependencies) {
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
return false;
@@ -249,7 +249,7 @@ bool PipelineTask::_is_blocked() {
_blocked_dep = dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
// If all dependencies are ready for this operator, we can execute this task if no datum is needed from upstream operators.
@@ -268,7 +268,7 @@
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
return false;
@@ -306,6 +306,11 @@ Status PipelineTask::execute(bool* eos) {
if (_wait_to_start()) {
return Status::OK();
}
if (_wake_up_by_downstream) {
_eos = true;
*eos = true;
return Status::OK();
}
// The status must be runnable
if (!_opened && !_fragment_context->is_canceled()) {
RETURN_IF_ERROR(_open());
@@ -315,6 +320,11 @@
if (_is_blocked()) {
return Status::OK();
}
if (_wake_up_by_downstream) {
_eos = true;
*eos = true;
return Status::OK();
}

/// When a task is cancelled,
/// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready).
@@ -481,9 +491,10 @@ std::string PipelineTask::debug_string() {
auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
fmt::format_to(debug_string_buffer,
"PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = "
"{}, elapse time "
"= {}s], block dependency = {}, is running = {}\noperators: ",
"{}, elapse time = {}s, _wake_up_by_downstream = {}], block dependency = {}, is "
"running = {}\noperators: ",
(void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed,
_wake_up_by_downstream.load(),
cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
is_running());
for (size_t i = 0; i < _operators.size(); i++) {
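The repeated return !_wake_up_by_downstream; edits make every blocked-dependency check report "not blocked" once the task has been woken by a downstream task, and execute() then terminates the task immediately by reporting EOS instead of opening and running its operators (the debug string also prints the flag). A compressed sketch of that control flow, with hypothetical members; the real class carries far more state:

#include <atomic>

// Hypothetical, heavily trimmed task showing only the wake-up-by-downstream path.
class TaskSketch {
public:
    void wake_up_by_downstream() { _wake_up_by_downstream = true; }

    // Dependency check: a blocked dependency normally parks the task, but a
    // task woken by downstream reports "not blocked" so it can reach execute().
    bool is_blocked(bool some_dependency_blocked) const {
        return some_dependency_blocked && !_wake_up_by_downstream;
    }

    // execute(): if downstream no longer needs data, mark EOS right away
    // instead of running the normal open / pull / push loop.
    bool execute() {
        if (_wake_up_by_downstream) {
            _eos = true;
            return true;  // caller sees eos and finalizes the task
        }
        // ... normal execution ...
        return _eos;
    }

private:
    std::atomic<bool> _wake_up_by_downstream {false};
    bool _eos = false;
};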
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_task.h
@@ -90,7 +90,7 @@ class PipelineTask {
_blocked_dep = fin_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
return false;
1 change: 1 addition & 0 deletions be/src/vec/runtime/shared_hash_table_controller.h
@@ -66,6 +66,7 @@ struct SharedHashTableContext {
std::map<int, RuntimeFilterContextSPtr> runtime_filters;
std::atomic<bool> signaled = false;
bool short_circuit_for_null_in_probe_side = false;
std::atomic<bool> complete_build_stage = false;
};

using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;
5 changes: 0 additions & 5 deletions be/test/exprs/runtime_filter_test.cpp
@@ -105,11 +105,6 @@ std::shared_ptr<IRuntimeFilter> create_runtime_filter(TRuntimeFilterType::type t

EXPECT_TRUE(status.ok()) << status.to_string();

if (auto bf = runtime_filter->get_bloomfilter()) {
status = bf->init_with_fixed_length();
EXPECT_TRUE(status.ok()) << status.to_string();
}

return status.ok() ? runtime_filter : nullptr;
}
