Skip to content

Commit

Permalink
[fix](spill) Avoid releasing resources while spill tasks are executing
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhhsg authored and Doris-Extras committed Apr 10, 2024
1 parent f534003 commit 87f9927
Show file tree
Hide file tree
Showing 14 changed files with 185 additions and 60 deletions.
11 changes: 10 additions & 1 deletion be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,17 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
}
}
}};

auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();

status = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
[this, &parent, state] {
[this, &parent, state, execution_context] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was cancelled.";
return Status::Cancelled("Cancelled");
}
SCOPED_ATTACH_TASK(state);
SCOPED_TIMER(Base::_spill_timer);
Defer defer {[&]() {
Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@ class PartitionedAggSinkLocalState
std::mutex _spill_lock;
std::condition_variable _spill_cv;

/// Resources in the shared state are released when the operator is closed,
/// but asynchronous spilling tasks may still be running at that time, which can lead to conflicts.
/// So we need to hold a pointer to the shared state.
std::shared_ptr<PartitionedAggSharedState> _shared_state_holder;

// temp structures during spilling
vectorized::MutableColumns key_columns_;
vectorized::MutableColumns value_columns_;
Expand Down
11 changes: 10 additions & 1 deletion be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,18 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table());
_dependency->Dependency::block();

auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
RETURN_IF_ERROR(
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
[this, state] {
[this, state, execution_context] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was cancelled.";
// FIXME: return status is meaningless?
return Status::Cancelled("Cancelled");
}

SCOPED_ATTACH_TASK(state);
Defer defer {[&]() {
if (!_status.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ class PartitionedAggLocalState final : public PipelineXSpillLocalState<Partition
std::mutex _merge_spill_lock;
std::condition_variable _merge_spill_cv;

/// Resources in the shared state are released when the operator is closed,
/// but asynchronous spilling tasks may still be running at that time, which can lead to conflicts.
/// So we need to hold a pointer to the shared state.
std::shared_ptr<PartitionedAggSharedState> _shared_state_holder;

std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
RuntimeProfile::Counter* _get_results_timer = nullptr;
RuntimeProfile::Counter* _serialize_result_timer = nullptr;
Expand Down
128 changes: 79 additions & 49 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,29 +174,37 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state

auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
build_spilling_stream->get_spill_root_dir());
return spill_io_pool->submit_func([state, &build_spilling_stream, &mutable_block, this] {
(void)state; // avoid ut compile error
SCOPED_ATTACH_TASK(state);
if (_spill_status_ok) {
auto build_block = mutable_block->to_block();
DCHECK_EQ(mutable_block->rows(), 0);
auto st = build_spilling_stream->spill_block(build_block, false);
if (!st.ok()) {
std::unique_lock<std::mutex> lock(_spill_lock);
_spill_status_ok = false;
_spill_status = std::move(st);
} else {
COUNTER_UPDATE(_spill_build_rows, build_block.rows());
COUNTER_UPDATE(_spill_build_blocks, 1);
}
}
--_spilling_task_count;
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
return spill_io_pool->submit_func(
[execution_context, state, &build_spilling_stream, &mutable_block, this] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was cancelled.";
return;
}
(void)state; // avoid ut compile error
SCOPED_ATTACH_TASK(state);
if (_spill_status_ok) {
auto build_block = mutable_block->to_block();
DCHECK_EQ(mutable_block->rows(), 0);
auto st = build_spilling_stream->spill_block(build_block, false);
if (!st.ok()) {
std::unique_lock<std::mutex> lock(_spill_lock);
_spill_status_ok = false;
_spill_status = std::move(st);
} else {
COUNTER_UPDATE(_spill_build_rows, build_block.rows());
COUNTER_UPDATE(_spill_build_blocks, 1);
}
}
--_spilling_task_count;

if (_spilling_task_count == 0) {
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
}
});
if (_spilling_task_count == 0) {
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
}
});
}

Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* state,
Expand All @@ -223,34 +231,42 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
}

if (!blocks.empty()) {
return spill_io_pool->submit_func([state, &blocks, &spilling_stream, this] {
(void)state; // avoid ut compile error
SCOPED_ATTACH_TASK(state);
COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
while (!blocks.empty()) {
auto block = std::move(blocks.back());
blocks.pop_back();
if (_spill_status_ok) {
auto st = spilling_stream->spill_block(block, false);
if (!st.ok()) {
std::unique_lock<std::mutex> lock(_spill_lock);
_spill_status_ok = false;
_spill_status = std::move(st);
break;
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
return spill_io_pool->submit_func(
[execution_context, state, &blocks, spilling_stream, this] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was cancelled.";
_dependency->set_ready();
return;
}
SCOPED_ATTACH_TASK(state);
COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
while (!blocks.empty() && !state->is_cancelled()) {
auto block = std::move(blocks.back());
blocks.pop_back();
if (_spill_status_ok) {
auto st = spilling_stream->spill_block(block, false);
if (!st.ok()) {
std::unique_lock<std::mutex> lock(_spill_lock);
_spill_status_ok = false;
_spill_status = std::move(st);
break;
}
COUNTER_UPDATE(_spill_probe_rows, block.rows());
} else {
break;
}
}
COUNTER_UPDATE(_spill_probe_rows, block.rows());
} else {
break;
}
}

--_spilling_task_count;
--_spilling_task_count;

if (_spilling_task_count == 0) {
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
}
});
if (_spilling_task_count == 0) {
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
}
});
} else {
--_spilling_task_count;
if (_spilling_task_count == 0) {
Expand Down Expand Up @@ -296,7 +312,14 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
auto& mutable_block = _shared_state->partitioned_build_blocks[partition_index];
DCHECK(mutable_block != nullptr);

auto read_func = [this, state, &spilled_stream, &mutable_block] {
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
auto read_func = [this, state, &spilled_stream, &mutable_block, execution_context] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was cancelled.";
return;
}
Defer defer([this] { --_spilling_task_count; });
(void)state; // avoid ut compile error
SCOPED_ATTACH_TASK(state);
Expand Down Expand Up @@ -363,7 +386,14 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
auto& blocks = _probe_blocks[partition_index];

/// TODO: maybe recovery more blocks each time.
auto read_func = [this, state, &spilled_stream, &blocks] {
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
auto read_func = [this, execution_context, state, &spilled_stream, &blocks] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was cancelled.";
return;
}
Defer defer([this] { --_spilling_task_count; });
(void)state; // avoid ut compile error
SCOPED_ATTACH_TASK(state);
Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ class PartitionedHashJoinProbeLocalState final
std::vector<std::unique_ptr<vectorized::MutableBlock>> _partitioned_blocks;
std::map<uint32_t, std::vector<vectorized::Block>> _probe_blocks;

/// Resources in the shared state are released when the operator is closed,
/// but asynchronous spilling tasks may still be running at that time, which can lead to conflicts.
/// So we need to hold a pointer to the shared state.
std::shared_ptr<PartitionedHashJoinSharedState> _shared_state_holder;

std::vector<vectorized::SpillStreamSPtr> _probe_spilling_streams;

std::unique_ptr<PartitionerType> _partitioner;
Expand Down
9 changes: 8 additions & 1 deletion be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,14 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
DCHECK(spill_io_pool != nullptr);
auto st = spill_io_pool->submit_func([this, state, spilling_stream, i] {
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
auto st = spill_io_pool->submit_func([this, execution_context, state, spilling_stream, i] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was cancelled.";
return;
}
(void)state; // avoid ut compile error
SCOPED_ATTACH_TASK(state);
_spill_to_disk(i, spilling_stream);
Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ class PartitionedHashJoinSinkLocalState
Status _spill_status;
std::mutex _spill_status_lock;

/// Resources in the shared state are released when the operator is closed,
/// but asynchronous spilling tasks may still be running at that time, which can lead to conflicts.
/// So we need to hold a pointer to the shared state.
std::shared_ptr<PartitionedHashJoinSharedState> _shared_state_holder;

std::unique_ptr<PartitionerType> _partitioner;

RuntimeProfile::Counter* _partition_timer = nullptr;
Expand Down
11 changes: 10 additions & 1 deletion be/src/pipeline/exec/spill_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,20 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
if (!_eos) {
Base::_dependency->Dependency::block();
}

auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
status =
ExecEnv::GetInstance()
->spill_stream_mgr()
->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir())
->submit_func([this, state, &parent] {
->submit_func([this, state, &parent, execution_context] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was cancelled.";
return Status::OK();
}

SCOPED_ATTACH_TASK(state);
Defer defer {[&]() {
if (!_shared_state->sink_status.ok()) {
Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/spill_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState<SpillSortSha

friend class SpillSortSinkOperatorX;

/// Resources in the shared state are released when the operator is closed,
/// but asynchronous spilling tasks may still be running at that time, which can lead to conflicts.
/// So we need to hold a pointer to the shared state.
std::shared_ptr<SpillSortSharedState> _shared_state_holder;

std::unique_ptr<RuntimeState> _runtime_state;
std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
RuntimeProfile::Counter* _partial_sort_timer = nullptr;
Expand Down
10 changes: 9 additions & 1 deletion be/src/pipeline/exec/spill_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,15 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
}
}};

auto spill_func = [this, state, &parent] {
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
auto spill_func = [this, state, &parent, execution_context] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was cancelled.";
return Status::OK();
}

SCOPED_TIMER(_spill_merge_sort_timer);
SCOPED_ATTACH_TASK(state);
Defer defer {[&]() {
Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/spill_sort_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class SpillSortLocalState final : public PipelineXSpillLocalState<SpillSortShare
bool _opened = false;
Status _status;

/// Resources in the shared state are released when the operator is closed,
/// but asynchronous spilling tasks may still be running at that time, which can lead to conflicts.
/// So we need to hold a pointer to the shared state.
std::shared_ptr<SpillSortSharedState> _shared_state_holder;

int64_t _external_sort_bytes_threshold = 134217728; // 128M
std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
std::unique_ptr<vectorized::VSortedRunMerger> _merger;
Expand Down
Loading

0 comments on commit 87f9927

Please sign in to comment.