Skip to content

Commit

Permalink
[fix](followup) Stop tasks if waken by downstream tasks (apache#41628)
Browse files Browse the repository at this point in the history
Follow-up : apache#41292
  • Loading branch information
Gabriel39 committed Oct 16, 2024
1 parent 00baec5 commit dd9d7ee
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 33 deletions.
4 changes: 0 additions & 4 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1371,10 +1371,6 @@ std::string IRuntimeFilter::formatted_state() const {
_wrapper->_context->ignored);
}

BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
return _wrapper->get_bloomfilter();
}

Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id, bool build_bf_exactly) {
// if node_id == -1 , it shouldn't be a consumer
Expand Down
7 changes: 0 additions & 7 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ class IRuntimeFilter {
_is_broadcast_join(true),
_has_remote_target(false),
_has_local_target(false),
_rf_state(RuntimeFilterState::NOT_READY),
_rf_state_atomic(RuntimeFilterState::NOT_READY),
_role(RuntimeFilterRole::PRODUCER),
_expr_order(-1),
Expand Down Expand Up @@ -278,8 +277,6 @@ class IRuntimeFilter {
Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id = -1, bool build_bf_exactly = false);

BloomFilterFuncBase* get_bloomfilter() const;

// serialize _wrapper to protobuf
Status serialize(PMergeFilterRequest* request, void** data, int* len);
Status serialize(PPublishFilterRequest* request, void** data = nullptr, int* len = nullptr);
Expand Down Expand Up @@ -380,9 +377,6 @@ class IRuntimeFilter {
void to_protobuf(PInFilter* filter);
void to_protobuf(PMinMaxFilter* filter);

template <class T>
Status _update_filter(const T* param);

template <class T>
Status serialize_impl(T* request, void** data, int* len);

Expand Down Expand Up @@ -421,7 +415,6 @@ class IRuntimeFilter {
// will apply to local node
bool _has_local_target;
// filter is ready for consumer
RuntimeFilterState _rf_state;
std::atomic<RuntimeFilterState> _rf_state_atomic;
// role consumer or producer
RuntimeFilterRole _role;
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state.process_build_block(state, (*local_state._shared_state->build_block)));
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
_shared_hash_table_context->complete_build_stage = true;
// arena will be shared with other instances.
_shared_hash_table_context->arena = local_state._shared_state->arena;
_shared_hash_table_context->hash_table_variants =
Expand All @@ -598,7 +599,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
_shared_hashtable_controller->signal(node_id());
}
} else if (!local_state._should_build_hash_table) {
} else if (!local_state._should_build_hash_table &&
_shared_hash_table_context->complete_build_stage) {
DCHECK(_shared_hashtable_controller != nullptr);
DCHECK(_shared_hash_table_context != nullptr);
// the instance which is not build hash table, it's should wait the signal of hash table build finished.
Expand Down
15 changes: 10 additions & 5 deletions be/src/pipeline/exec/multi_cast_data_stream_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ class MultiCastDataStreamSinkOperatorX final
using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;

public:
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources,
const int cast_sender_count, ObjectPool* pool,
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources, ObjectPool* pool,
const TMultiCastDataStreamSink& sink,
const RowDescriptor& row_desc)
: Base(sink_id, -1, sources),
_pool(pool),
_row_desc(row_desc),
_cast_sender_count(cast_sender_count),
_sink(sink) {}
_cast_sender_count(sources.size()),
_sink(sink),
_num_dests(sources.size()) {}
~MultiCastDataStreamSinkOperatorX() override = default;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override {
Expand Down Expand Up @@ -103,14 +103,19 @@ class MultiCastDataStreamSinkOperatorX final
}

const TMultiCastDataStreamSink& sink_node() { return _sink; }
bool count_down_destination() override {
DCHECK_GT(_num_dests, 0);
return _num_dests.fetch_sub(1) == 1;
}

private:
friend class MultiCastDataStreamSinkLocalState;
ObjectPool* _pool;
RowDescriptor _row_desc;
const int _cast_sender_count;
const size_t _cast_sender_count;
const TMultiCastDataStreamSink& _sink;
friend class MultiCastDataStreamSinkLocalState;
std::atomic<size_t> _num_dests;
};

} // namespace doris::pipeline
8 changes: 5 additions & 3 deletions be/src/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,11 @@ Status Pipeline::set_sink(DataSinkOperatorXPtr& sink) {
}

void Pipeline::make_all_runnable() {
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state(true);
if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state(true);
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,8 @@ class DataSinkOperatorXBase : public OperatorBase {

virtual bool should_dry_run(RuntimeState* state) { return false; }

[[nodiscard]] virtual bool count_down_destination() { return true; }

protected:
template <typename Writer, typename Parent>
requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
}

_sink.reset(new MultiCastDataStreamSinkOperatorX(
sink_id, sources, thrift_sink.multi_cast_stream_sink.sinks.size(), pool,
thrift_sink.multi_cast_stream_sink, row_desc));
sink_id, sources, pool, thrift_sink.multi_cast_stream_sink, row_desc));
for (int i = 0; i < sender_size; ++i) {
auto new_pipeline = add_pipeline();
RowDescriptor* _row_desc = nullptr;
Expand Down
37 changes: 31 additions & 6 deletions be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,18 @@ Status PipelineXTask::execute(bool* eos) {
set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
return Status::OK();
}
if (_wake_up_by_downstream) {
*eos = true;
return Status::OK();
}
if (_runtime_filter_blocked_dependency() != nullptr) {
set_state(PipelineTaskState::BLOCKED_FOR_RF);
return Status::OK();
}
if (_wake_up_by_downstream) {
*eos = true;
return Status::OK();
}
// The status must be runnable
if (!_opened) {
{
Expand All @@ -269,10 +277,18 @@ Status PipelineXTask::execute(bool* eos) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
return Status::OK();
}
if (_wake_up_by_downstream) {
*eos = true;
return Status::OK();
}
if (!sink_can_write()) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
return Status::OK();
}
if (_wake_up_by_downstream) {
*eos = true;
return Status::OK();
}
}

Status status = Status::OK();
Expand All @@ -282,10 +298,18 @@ Status PipelineXTask::execute(bool* eos) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
break;
}
if (_wake_up_by_downstream) {
*eos = true;
return Status::OK();
}
if (!sink_can_write()) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
break;
}
if (_wake_up_by_downstream) {
*eos = true;
return Status::OK();
}

/// When a task is cancelled,
/// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready).
Expand Down Expand Up @@ -444,12 +468,13 @@ std::string PipelineXTask::debug_string() {
// If at the same time FE cancel this pipeline task and logging debug_string before _blocked_dep is cleared,
// it will think _blocked_dep is not nullptr and call _blocked_dep->debug_string().
auto* cur_blocked_dep = _blocked_dep;
fmt::format_to(debug_string_buffer,
"PipelineTask[this = {}, state = {}, dry run = {}, elapse time "
"= {}s], block dependency = {}, is running = {}\noperators: ",
(void*)this, get_state_name(_cur_state), _dry_run, elapsed,
cur_blocked_dep && !_finished ? cur_blocked_dep->debug_string() : "NULL",
is_running());
fmt::format_to(
debug_string_buffer,
"PipelineTask[this = {}, state = {}, dry run = {}, elapse time "
"= {}s], _wake_up_by_downstream = {}, block dependency = {}, is running = "
"{}\noperators: ",
(void*)this, get_state_name(_cur_state), _dry_run, elapsed, _wake_up_by_downstream,
cur_blocked_dep && !_finished ? cur_blocked_dep->debug_string() : "NULL", is_running());
for (size_t i = 0; i < _operators.size(); i++) {
fmt::format_to(debug_string_buffer, "\n{}",
_opened && !_finished ? _operators[i]->debug_string(_state, i)
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/runtime/shared_hash_table_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct SharedHashTableContext {
std::map<int, RuntimeFilterContextSPtr> runtime_filters;
std::atomic<bool> signaled = false;
bool short_circuit_for_null_in_probe_side = false;
std::atomic<bool> complete_build_stage = false;
};

using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;
Expand Down
5 changes: 0 additions & 5 deletions be/test/exprs/runtime_filter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,6 @@ IRuntimeFilter* create_runtime_filter(TRuntimeFilterType::type type, TQueryOptio

EXPECT_TRUE(status.ok()) << status.to_string();

if (auto bf = runtime_filter->get_bloomfilter()) {
status = bf->init_with_fixed_length();
EXPECT_TRUE(status.ok()) << status.to_string();
}

return status.ok() ? runtime_filter : nullptr;
}

Expand Down

0 comments on commit dd9d7ee

Please sign in to comment.