Skip to content

Commit

Permalink
[fix](followup) Stop tasks if waken by downstream tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Oct 10, 2024
1 parent fc0279b commit 63938c4
Show file tree
Hide file tree
Showing 13 changed files with 48 additions and 37 deletions.
4 changes: 0 additions & 4 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1307,10 +1307,6 @@ std::string IRuntimeFilter::formatted_state() const {
_wrapper->_context->ignored);
}

BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
return _wrapper->get_bloomfilter();
}

Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id, bool build_bf_exactly) {
// if node_id == -1 , it shouldn't be a consumer
Expand Down
7 changes: 0 additions & 7 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ class IRuntimeFilter {
_is_broadcast_join(true),
_has_remote_target(false),
_has_local_target(false),
_rf_state(RuntimeFilterState::NOT_READY),
_rf_state_atomic(RuntimeFilterState::NOT_READY),
_role(RuntimeFilterRole::PRODUCER),
_expr_order(-1),
Expand Down Expand Up @@ -264,8 +263,6 @@ class IRuntimeFilter {
Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id = -1, bool build_bf_exactly = false);

BloomFilterFuncBase* get_bloomfilter() const;

// serialize _wrapper to protobuf
Status serialize(PMergeFilterRequest* request, void** data, int* len);
Status serialize(PPublishFilterRequest* request, void** data = nullptr, int* len = nullptr);
Expand Down Expand Up @@ -366,9 +363,6 @@ class IRuntimeFilter {
void to_protobuf(PInFilter* filter);
void to_protobuf(PMinMaxFilter* filter);

template <class T>
Status _update_filter(const T* param);

template <class T>
Status serialize_impl(T* request, void** data, int* len);

Expand Down Expand Up @@ -398,7 +392,6 @@ class IRuntimeFilter {
// will apply to local node
bool _has_local_target;
// filter is ready for consumer
RuntimeFilterState _rf_state;
std::atomic<RuntimeFilterState> _rf_state_atomic;
// role consumer or producer
RuntimeFilterRole _role;
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/common/runtime_filter_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
auto runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
id, node_id, name, runtime_filter.get());
_runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
runtime_filter_timers[i] = std::make_shared<pipeline::RuntimeFilterTimer>(
runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
runtime_filter_dependencies[i]);
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/common/runtime_filter_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ class RuntimeFilterConsumer {
// set to true if this runtime filter is already applied to vconjunct_ctx_ptr
bool apply_mark = false;
std::shared_ptr<IRuntimeFilter> runtime_filter;
pipeline::RuntimeFilterDependency* runtime_filter_dependency = nullptr;
};

std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state.process_build_block(state, (*local_state._shared_state->build_block)));
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
_shared_hash_table_context->complete_build_stage = true;
// arena will be shared with other instances.
_shared_hash_table_context->arena = local_state._shared_state->arena;
_shared_hash_table_context->hash_table_variants =
Expand All @@ -601,7 +602,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._shared_state->build_indexes_null;
local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
}
} else if (!local_state._should_build_hash_table) {
} else if (!local_state._should_build_hash_table &&
_shared_hash_table_context->complete_build_stage) {
DCHECK(_shared_hashtable_controller != nullptr);
DCHECK(_shared_hash_table_context != nullptr);
// the instance which is not build hash table, it's should wait the signal of hash table build finished.
Expand Down
15 changes: 10 additions & 5 deletions be/src/pipeline/exec/multi_cast_data_stream_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ class MultiCastDataStreamSinkOperatorX final
using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;

public:
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources,
const int cast_sender_count, ObjectPool* pool,
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources, ObjectPool* pool,
const TMultiCastDataStreamSink& sink,
const RowDescriptor& row_desc)
: Base(sink_id, -1, sources),
_pool(pool),
_row_desc(row_desc),
_cast_sender_count(cast_sender_count),
_sink(sink) {}
_cast_sender_count(sources.size()),
_sink(sink),
_num_dests(sources.size()) {}
~MultiCastDataStreamSinkOperatorX() override = default;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
Expand All @@ -60,14 +60,19 @@ class MultiCastDataStreamSinkOperatorX final
std::shared_ptr<BasicSharedState> create_shared_state() const override;

const TMultiCastDataStreamSink& sink_node() { return _sink; }
bool count_down_destination() override {
DCHECK_GT(_num_dests, 0);
return _num_dests.fetch_sub(1) == 1;
}

private:
friend class MultiCastDataStreamSinkLocalState;
ObjectPool* _pool;
RowDescriptor _row_desc;
const int _cast_sender_count;
const size_t _cast_sender_count;
const TMultiCastDataStreamSink& _sink;
friend class MultiCastDataStreamSinkLocalState;
std::atomic<size_t> _num_dests;
};

} // namespace doris::pipeline
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ class DataSinkOperatorXBase : public OperatorBase {

virtual bool should_dry_run(RuntimeState* state) { return false; }

[[nodiscard]] virtual bool count_down_destination() { return true; }

protected:
template <typename Writer, typename Parent>
requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
Expand Down
8 changes: 5 additions & 3 deletions be/src/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
}

void Pipeline::make_all_runnable() {
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state(true);
if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state(true);
}
}
}
}
Expand Down
18 changes: 11 additions & 7 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1142,8 +1142,7 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
}

_sink.reset(new MultiCastDataStreamSinkOperatorX(
sink_id, sources, cast_set<int>(thrift_sink.multi_cast_stream_sink.sinks.size()),
pool, thrift_sink.multi_cast_stream_sink, row_desc));
sink_id, sources, pool, thrift_sink.multi_cast_stream_sink, row_desc));
for (int i = 0; i < sender_size; ++i) {
auto new_pipeline = add_pipeline();
RowDescriptor* _row_desc = nullptr;
Expand Down Expand Up @@ -1759,15 +1758,20 @@ void PipelineFragmentContext::_close_fragment_instance() {
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}

void PipelineFragmentContext::_trigger_tasks_recursively(PipelineId pipeline_id) {
if (_dag.contains(pipeline_id)) {
for (auto dep : _dag[pipeline_id]) {
_pip_id_to_pipeline[dep]->make_all_runnable();
_trigger_tasks_recursively(dep);
}
}
}

void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
// If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here
DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
if (_dag.contains(pipeline_id)) {
for (auto dep : _dag[pipeline_id]) {
_pip_id_to_pipeline[dep]->make_all_runnable();
}
}
_trigger_tasks_recursively(pipeline_id);
}
std::lock_guard<std::mutex> l(_task_mutex);
++_closed_tasks;
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ class PipelineFragmentContext : public TaskExecutionContext {
void _close_fragment_instance();
void _init_next_report_time();

void _trigger_tasks_recursively(PipelineId pipeline_id);

// Id of this query
TUniqueId _query_id;
int _fragment_id;
Expand Down
20 changes: 13 additions & 7 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,14 @@ bool PipelineTask::_wait_to_start() {
_blocked_dep = _execution_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
static_cast<Dependency*>(_blocked_dep)->start_watcher();
return true;
return !_wake_up_by_downstream;
}

for (auto* op_dep : _filter_dependencies) {
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
return false;
Expand All @@ -249,7 +249,7 @@ bool PipelineTask::_is_blocked() {
_blocked_dep = dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
// If all dependencies are ready for this operator, we can execute this task if no datum is needed from upstream operators.
Expand All @@ -268,7 +268,7 @@ bool PipelineTask::_is_blocked() {
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
return false;
Expand All @@ -278,7 +278,7 @@ Status PipelineTask::execute(bool* eos) {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
SCOPED_ATTACH_TASK(_state);
_eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream;
_eos = _sink->is_finished(_state) || _eos;
*eos = _eos;
if (_eos) {
// If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here.
Expand Down Expand Up @@ -306,6 +306,11 @@ Status PipelineTask::execute(bool* eos) {
if (_wait_to_start()) {
return Status::OK();
}
if (_wake_up_by_downstream) {
_eos = true;
*eos = true;
return Status::OK();
}
// The status must be runnable
if (!_opened && !_fragment_context->is_canceled()) {
RETURN_IF_ERROR(_open());
Expand Down Expand Up @@ -481,9 +486,10 @@ std::string PipelineTask::debug_string() {
auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
fmt::format_to(debug_string_buffer,
"PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = "
"{}, elapse time "
"= {}s], block dependency = {}, is running = {}\noperators: ",
"{}, elapse time = {}s, _wake_up_by_downstream = {}], block dependency = {}, is "
"running = {}\noperators: ",
(void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed,
_wake_up_by_downstream.load(),
cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
is_running());
for (size_t i = 0; i < _operators.size(); i++) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class PipelineTask {
_blocked_dep = fin_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return true;
return !_wake_up_by_downstream;
}
}
return false;
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/runtime/shared_hash_table_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct SharedHashTableContext {
std::map<int, RuntimeFilterContextSPtr> runtime_filters;
std::atomic<bool> signaled = false;
bool short_circuit_for_null_in_probe_side = false;
std::atomic<bool> complete_build_stage = false;
};

using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;
Expand Down

0 comments on commit 63938c4

Please sign in to comment.