From 067c7a5cfcb2f717f18e36b29adb0988e2dfdf61 Mon Sep 17 00:00:00 2001 From: stdpain Date: Mon, 2 Dec 2024 20:33:20 +0800 Subject: [PATCH] [Refactor] refactor sink buffer Signed-off-by: stdpain --- be/src/exec/pipeline/exchange/sink_buffer.cpp | 127 +++++++----------- be/src/exec/pipeline/exchange/sink_buffer.h | 52 ++++--- 2 files changed, 81 insertions(+), 98 deletions(-) diff --git a/be/src/exec/pipeline/exchange/sink_buffer.cpp b/be/src/exec/pipeline/exchange/sink_buffer.cpp index a6e0a1650705b7..4be8a271e3cd48 100644 --- a/be/src/exec/pipeline/exchange/sink_buffer.cpp +++ b/be/src/exec/pipeline/exchange/sink_buffer.cpp @@ -17,6 +17,7 @@ #include #include +#include #include #include "fmt/core.h" @@ -42,23 +43,20 @@ SinkBuffer::SinkBuffer(FragmentContext* fragment_ctx, const std::vector(); - _buffers[instance_id.lo] = std::queue>(); - _num_finished_rpcs[instance_id.lo] = 0; - _num_in_flight_rpcs[instance_id.lo] = 0; - _network_times[instance_id.lo] = TimeTrace{}; - _mutexes[instance_id.lo] = std::make_unique(); - _dest_addrs[instance_id.lo] = dest.brpc_server; + if (_sink_ctxs.count(instance_id.lo) == 0) { + _sink_ctxs[instance_id.lo] = std::make_unique(); + auto& ctx = sink_ctx(instance_id.lo); + ctx.num_sinker = 0; + ctx.request_seq = -1; + ctx.max_continuous_acked_seqs = -1; + ctx.num_finished_rpcs = 0; + ctx.num_in_flight_rpcs = 0; + ctx.dest_addrs = dest.brpc_server; PUniqueId finst_id; finst_id.set_hi(instance_id.hi); finst_id.set_lo(instance_id.lo); - _instance_id2finst_id[instance_id.lo] = std::move(finst_id); + ctx.finst_id = std::move(finst_id); } } } @@ -70,15 +68,15 @@ SinkBuffer::~SinkBuffer() { DCHECK(is_finished()); - _buffers.clear(); + _sink_ctxs.clear(); } void SinkBuffer::incr_sinker(RuntimeState* state) { _num_uncancelled_sinkers++; - for (auto& [_, num_sinkers_per_instance] : _num_sinkers) { - num_sinkers_per_instance++; + for (auto& [_, sink_ctx] : _sink_ctxs) { + sink_ctx->num_sinker++; } - _num_remaining_eos += _num_sinkers.size(); + _num_remaining_eos += _sink_ctxs.size(); } Status SinkBuffer::add_request(TransmitChunkInfo& request) { @@ -105,7 +103,9 @@ Status SinkBuffer::add_request(TransmitChunkInfo& request) { } auto& instance_id = request.fragment_instance_id; - RETURN_IF_ERROR(_try_to_send_rpc(instance_id, [&]() { _buffers[instance_id.lo].push(request); })); + auto& context = sink_ctx(instance_id.lo); + + RETURN_IF_ERROR(_try_to_send_rpc(instance_id, [&]() { context.buffer.push(request); })); } return Status::OK(); @@ -115,10 +115,10 @@ bool SinkBuffer::is_full() const { // std::queue' read is concurrent safe without mutex // Judgement may not that accurate because we do not known in advance which // instance the data to be sent corresponds to - size_t max_buffer_size = config::pipeline_sink_buffer_size * _buffers.size(); + size_t max_buffer_size = config::pipeline_sink_buffer_size * _sink_ctxs.size(); size_t buffer_size = 0; - for (auto& [_, buffer] : _buffers) { - buffer_size += buffer.size(); + for (auto& [_, context] : _sink_ctxs) { + buffer_size += context->buffer.size(); } const bool is_full = buffer_size > max_buffer_size; @@ -195,7 +195,8 @@ void SinkBuffer::update_profile(RuntimeProfile* profile) { int64_t SinkBuffer::_network_time() { int64_t max = 0; - for (auto& [_, time_trace] : _network_times) { + for (auto& [_, context] : _sink_ctxs) { + auto& time_trace = context->network_time; double average_concurrency = static_cast(time_trace.accumulated_concurrency) / std::max(1, time_trace.times); int64_t average_accumulated_time = @@ -210,31 +211,6 @@ int64_t SinkBuffer::_network_time() { void SinkBuffer::cancel_one_sinker(RuntimeState* const state) { if (--_num_uncancelled_sinkers == 0) { _is_finishing = true; - if (state != nullptr && state->query_ctx() && state->query_ctx()->is_query_expired()) { - // how many in-flight rpcs and what exchange receivers are. - if (_total_in_flight_rpc > 0) { - std::stringstream ss; - auto remain_rpc_num = 0; - for (auto& remain_rpc : _num_in_flight_rpcs) { - std::lock_guard l(*_mutexes[remain_rpc.first]); - if (remain_rpc.second > 0) { - ss << (remain_rpc_num > 0 ? ", " : "") << print_id(_instance_id2finst_id[remain_rpc.first]); - remain_rpc_num++; - } - } - LOG(WARNING) << "Fragment " << print_id(_fragment_ctx->fragment_instance_id()) << " SinkBuffer remains " - << remain_rpc_num << " rpcs, dest are " << ss.str(); - } - // how many alive drivers left and what they are. - if (_num_remaining_eos > 0) { - std::stringstream ss; - for (auto& remain_eos : _num_sinkers) { - ss << print_id(_instance_id2finst_id[remain_eos.first]) << "(" << remain_eos.second << "),"; - } - LOG(WARNING) << "Fragment " << print_id(_fragment_ctx->fragment_instance_id()) - << " remains EOS : " << ss.str(); - } - } } if (state != nullptr && state->query_ctx() && state->query_ctx()->is_query_expired()) { // check how many cancel operations are issued, and show the state of that time. @@ -248,11 +224,12 @@ void SinkBuffer::cancel_one_sinker(RuntimeState* const state) { void SinkBuffer::_update_network_time(const TUniqueId& instance_id, const int64_t send_timestamp, const int64_t receiver_post_process_time) { + auto& context = sink_ctx(instance_id.lo); const int64_t get_response_timestamp = MonotonicNanos(); _last_receive_time = get_response_timestamp; - int32_t concurrency = _num_in_flight_rpcs[instance_id.lo]; + int32_t concurrency = context.num_in_flight_rpcs; int64_t time_usage = get_response_timestamp - send_timestamp - receiver_post_process_time; - _network_times[instance_id.lo].update(time_usage, concurrency); + context.network_time.update(time_usage, concurrency); _rpc_cumulative_time += time_usage; _rpc_count++; } @@ -263,9 +240,10 @@ void SinkBuffer::_process_send_window(const TUniqueId& instance_id, const int64_ if (!_is_dest_merge) { return; } - auto& seqs = _discontinuous_acked_seqs[instance_id.lo]; + auto& context = sink_ctx(instance_id.lo); + auto& seqs = context.discontinuous_acked_seqs; seqs.insert(sequence); - auto& max_continuous_acked_seq = _max_continuous_acked_seqs[instance_id.lo]; + auto& max_continuous_acked_seq = context.max_continuous_acked_seqs; std::unordered_set::iterator it; while ((it = seqs.find(max_continuous_acked_seq + 1)) != seqs.end()) { seqs.erase(it); @@ -274,7 +252,8 @@ void SinkBuffer::_process_send_window(const TUniqueId& instance_id, const int64_ } Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::function& pre_works) { - std::lock_guard l(*_mutexes[instance_id.lo]); + auto& context = sink_ctx(instance_id.lo); + std::lock_guard guard(context.mutex); pre_works(); DeferOp decrease_defer([this]() { --_num_sending_rpc; }); @@ -285,18 +264,17 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun return Status::OK(); } - auto& buffer = _buffers[instance_id.lo]; + auto& buffer = context.buffer; bool too_much_brpc_process = false; if (_is_dest_merge) { // discontinuous_acked_window_size means that we are not received all the ack - // with sequence from _max_continuous_acked_seqs[x] to _request_seqs[x] + // with sequence from max_continuous_acked_seqs to request_seqs // Limit the size of the window to avoid buffering too much out-of-order data at the receiving side - int64_t discontinuous_acked_window_size = - _request_seqs[instance_id.lo] - _max_continuous_acked_seqs[instance_id.lo]; + int64_t discontinuous_acked_window_size = context.request_seq - context.max_continuous_acked_seqs; too_much_brpc_process = discontinuous_acked_window_size >= config::pipeline_sink_brpc_dop; } else { - too_much_brpc_process = _num_in_flight_rpcs[instance_id.lo] >= config::pipeline_sink_brpc_dop; + too_much_brpc_process = context.num_in_flight_rpcs >= config::pipeline_sink_brpc_dop; } if (buffer.empty() || too_much_brpc_process) { return Status::OK(); @@ -319,7 +297,7 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun // The order of data transmiting in IO level may not be strictly the same as // the order of submitting data packets // But we must guarantee that first packet must be received first - if (_num_finished_rpcs[instance_id.lo] == 0 && _num_in_flight_rpcs[instance_id.lo] > 0) { + if (context.num_finished_rpcs == 0 && context.num_in_flight_rpcs > 0) { need_wait = true; return Status::OK(); } @@ -331,12 +309,12 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun if (--_num_remaining_eos == 0) { _is_finishing = true; } - --_num_sinkers[instance_id.lo]; + sink_ctx(instance_id.lo).num_sinker--; }); // Only the last eos is sent to ExchangeSourceOperator. it must be guaranteed that // eos is the last packet to send to finish the input stream of the corresponding of // ExchangeSourceOperator and eos is sent exactly-once. - if (_num_sinkers[instance_id.lo] > 1) { + if (context.num_sinker > 1) { if (request.params->chunks_size() == 0) { continue; } else { @@ -346,7 +324,7 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun // The order of data transmiting in IO level may not be strictly the same as // the order of submitting data packets // But we must guarantee that eos packent must be the last packet - if (_num_in_flight_rpcs[instance_id.lo] > 0) { + if (context.num_in_flight_rpcs > 0) { need_wait = true; return Status::OK(); } @@ -357,8 +335,8 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun } } - *request.params->mutable_finst_id() = _instance_id2finst_id[instance_id.lo]; - request.params->set_sequence(++_request_seqs[instance_id.lo]); + *request.params->mutable_finst_id() = context.finst_id; + request.params->set_sequence(++context.request_seq); if (!request.attachment.empty()) { _bytes_sent += request.attachment.size(); @@ -374,13 +352,11 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun closure->addFailedHandler([this](const ClosureContext& ctx, std::string_view rpc_error_msg) noexcept { auto defer = DeferOp([this]() { --_total_in_flight_rpc; }); _is_finishing = true; - { - std::lock_guard l(*_mutexes[ctx.instance_id.lo]); - ++_num_finished_rpcs[ctx.instance_id.lo]; - --_num_in_flight_rpcs[ctx.instance_id.lo]; - } + auto& context = sink_ctx(ctx.instance_id.lo); + ++context.num_finished_rpcs; + --context.num_in_flight_rpcs; - const auto& dest_addr = _dest_addrs[ctx.instance_id.lo]; + const auto& dest_addr = context.dest_addrs; std::string err_msg = fmt::format("transmit chunk rpc failed [dest_instance_id={}] [dest={}:{}] detail:{}", print_id(ctx.instance_id), dest_addr.hostname, dest_addr.port, rpc_error_msg); @@ -392,16 +368,15 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun // when _total_in_flight_rpc desc to 0, _fragment_ctx may be destructed auto defer = DeferOp([this]() { --_total_in_flight_rpc; }); Status status(result.status()); - { - std::lock_guard l(*_mutexes[ctx.instance_id.lo]); - ++_num_finished_rpcs[ctx.instance_id.lo]; - --_num_in_flight_rpcs[ctx.instance_id.lo]; - } + auto& context = sink_ctx(ctx.instance_id.lo); + ++context.num_finished_rpcs; + --context.num_in_flight_rpcs; + if (!status.ok()) { _is_finishing = true; _fragment_ctx->cancel(status); - const auto& dest_addr = _dest_addrs[ctx.instance_id.lo]; + const auto& dest_addr = context.dest_addrs; LOG(WARNING) << fmt::format("transmit chunk rpc failed [dest_instance_id={}] [dest={}:{}] [msg={}]", print_id(ctx.instance_id), dest_addr.hostname, dest_addr.port, status.message()); @@ -414,7 +389,7 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun }); ++_total_in_flight_rpc; - ++_num_in_flight_rpcs[instance_id.lo]; + ++context.num_in_flight_rpcs; // Attachment will be released by process_mem_tracker in closure->Run() in bthread, when receiving the response, // so decrease the memory usage of attachment from instance_mem_tracker immediately before sending the request. diff --git a/be/src/exec/pipeline/exchange/sink_buffer.h b/be/src/exec/pipeline/exchange/sink_buffer.h index b7b61cb68d7b6e..f7787c6141578d 100644 --- a/be/src/exec/pipeline/exchange/sink_buffer.h +++ b/be/src/exec/pipeline/exchange/sink_buffer.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -134,32 +135,39 @@ class SinkBuffer { /// use int64_t as key, which is the field type of TUniqueId::lo /// because TUniqueId::hi is exactly the same in one query - // num eos per instance - phmap::flat_hash_map> _num_sinkers; - phmap::flat_hash_map> _request_seqs; - // Considering the following situation - // Sending request 1, 2, 3 in order with one possible order of response 1, 3, 2, - // and field transformation are as following - // a. receive response-1, _max_continuous_acked_seqs[x]->1, _discontinuous_acked_seqs[x]->() - // b. receive response-3, _max_continuous_acked_seqs[x]->1, _discontinuous_acked_seqs[x]->(3) - // c. receive response-2, _max_continuous_acked_seqs[x]->3, _discontinuous_acked_seqs[x]->() - phmap::flat_hash_map> _max_continuous_acked_seqs; - phmap::flat_hash_map, StdHash> _discontinuous_acked_seqs; + struct SinkContext { + // num eos per instance + int64_t num_sinker; + int64_t request_seq; + // Considering the following situation + // Sending request 1, 2, 3 in order with one possible order of response 1, 3, 2, + // and field transformation are as following + // a. receive response-1, _max_continuous_acked_seqs[x]->1, _discontinuous_acked_seqs[x]->() + // b. receive response-3, _max_continuous_acked_seqs[x]->1, _discontinuous_acked_seqs[x]->(3) + // c. receive response-2, _max_continuous_acked_seqs[x]->3, _discontinuous_acked_seqs[x]->() + int64_t max_continuous_acked_seqs; + std::unordered_set discontinuous_acked_seqs; + // The request needs the reference to the allocated finst id, + // so cache finst id for each dest fragment instance. + PUniqueId finst_id; + std::queue> buffer; + Mutex request_mutex; + + std::atomic_size_t num_finished_rpcs; + std::atomic_size_t num_in_flight_rpcs; + TimeTrace network_time; + + Mutex mutex; + + TNetworkAddress dest_addrs; + }; + phmap::flat_hash_map, StdHash> _sink_ctxs; + SinkContext& sink_ctx(int64_t instance_id) { return *_sink_ctxs[instance_id]; } + std::atomic _total_in_flight_rpc = 0; std::atomic _num_uncancelled_sinkers = 0; std::atomic _num_remaining_eos = 0; - // The request needs the reference to the allocated finst id, - // so cache finst id for each dest fragment instance. - phmap::flat_hash_map> _instance_id2finst_id; - phmap::flat_hash_map>, StdHash> - _buffers; - phmap::flat_hash_map> _num_finished_rpcs; - phmap::flat_hash_map> _num_in_flight_rpcs; - phmap::flat_hash_map> _network_times; - phmap::flat_hash_map, StdHash> _mutexes; - phmap::flat_hash_map> _dest_addrs; - // True means that SinkBuffer needn't input chunk and send chunk anymore, // but there may be still in-flight RPC running. // It becomes true, when all sinkers have sent EOS, or been set_finished/cancelled, or RPC has returned error.