Skip to content

Commit

Permalink
[Refactor] refactor sink buffer
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <[email protected]>
  • Loading branch information
stdpain committed Dec 3, 2024
1 parent 51537af commit fdd255f
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 98 deletions.
127 changes: 51 additions & 76 deletions be/src/exec/pipeline/exchange/sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <bthread/bthread.h>

#include <chrono>
#include <mutex>
#include <string_view>

#include "fmt/core.h"
Expand All @@ -42,23 +43,20 @@ SinkBuffer::SinkBuffer(FragmentContext* fragment_ctx, const std::vector<TPlanFra
continue;
}

auto it = _num_sinkers.find(instance_id.lo);
if (it == _num_sinkers.end()) {
_num_sinkers[instance_id.lo] = 0;
_request_seqs[instance_id.lo] = -1;
_max_continuous_acked_seqs[instance_id.lo] = -1;
_discontinuous_acked_seqs[instance_id.lo] = std::unordered_set<int64_t>();
_buffers[instance_id.lo] = std::queue<TransmitChunkInfo, std::list<TransmitChunkInfo>>();
_num_finished_rpcs[instance_id.lo] = 0;
_num_in_flight_rpcs[instance_id.lo] = 0;
_network_times[instance_id.lo] = TimeTrace{};
_mutexes[instance_id.lo] = std::make_unique<Mutex>();
_dest_addrs[instance_id.lo] = dest.brpc_server;
if (_sink_ctxs.count(instance_id.lo) == 0) {
_sink_ctxs[instance_id.lo] = std::make_unique<SinkContext>();
auto& ctx = sink_ctx(instance_id.lo);
ctx.num_sinker = 0;
ctx.request_seq = -1;
ctx.max_continuous_acked_seqs = -1;
ctx.num_finished_rpcs = 0;
ctx.num_in_flight_rpcs = 0;
ctx.dest_addrs = dest.brpc_server;

PUniqueId finst_id;
finst_id.set_hi(instance_id.hi);
finst_id.set_lo(instance_id.lo);
_instance_id2finst_id[instance_id.lo] = std::move(finst_id);
ctx.finst_id = std::move(finst_id);
}
}
}
Expand All @@ -70,15 +68,15 @@ SinkBuffer::~SinkBuffer() {

DCHECK(is_finished());

_buffers.clear();
_sink_ctxs.clear();
}

void SinkBuffer::incr_sinker(RuntimeState* state) {
_num_uncancelled_sinkers++;
for (auto& [_, num_sinkers_per_instance] : _num_sinkers) {
num_sinkers_per_instance++;
for (auto& [_, sink_ctx] : _sink_ctxs) {
sink_ctx->num_sinker++;
}
_num_remaining_eos += _num_sinkers.size();
_num_remaining_eos += _sink_ctxs.size();
}

Status SinkBuffer::add_request(TransmitChunkInfo& request) {
Expand All @@ -105,7 +103,9 @@ Status SinkBuffer::add_request(TransmitChunkInfo& request) {
}

auto& instance_id = request.fragment_instance_id;
RETURN_IF_ERROR(_try_to_send_rpc(instance_id, [&]() { _buffers[instance_id.lo].push(request); }));
auto& context = sink_ctx(instance_id.lo);

RETURN_IF_ERROR(_try_to_send_rpc(instance_id, [&]() { context.buffer.push(request); }));
}

return Status::OK();
Expand All @@ -115,10 +115,10 @@ bool SinkBuffer::is_full() const {
// std::queue' read is concurrent safe without mutex
// Judgement may not that accurate because we do not known in advance which
// instance the data to be sent corresponds to
size_t max_buffer_size = config::pipeline_sink_buffer_size * _buffers.size();
size_t max_buffer_size = config::pipeline_sink_buffer_size * _sink_ctxs.size();
size_t buffer_size = 0;
for (auto& [_, buffer] : _buffers) {
buffer_size += buffer.size();
for (auto& [_, context] : _sink_ctxs) {
buffer_size += context->buffer.size();
}
const bool is_full = buffer_size > max_buffer_size;

Expand Down Expand Up @@ -195,7 +195,8 @@ void SinkBuffer::update_profile(RuntimeProfile* profile) {

int64_t SinkBuffer::_network_time() {
int64_t max = 0;
for (auto& [_, time_trace] : _network_times) {
for (auto& [_, context] : _sink_ctxs) {
auto& time_trace = context->network_time;
double average_concurrency =
static_cast<double>(time_trace.accumulated_concurrency) / std::max(1, time_trace.times);
int64_t average_accumulated_time =
Expand All @@ -210,31 +211,6 @@ int64_t SinkBuffer::_network_time() {
void SinkBuffer::cancel_one_sinker(RuntimeState* const state) {
if (--_num_uncancelled_sinkers == 0) {
_is_finishing = true;
if (state != nullptr && state->query_ctx() && state->query_ctx()->is_query_expired()) {
// how many in-flight rpcs and what exchange receivers are.
if (_total_in_flight_rpc > 0) {
std::stringstream ss;
auto remain_rpc_num = 0;
for (auto& remain_rpc : _num_in_flight_rpcs) {
std::lock_guard<Mutex> l(*_mutexes[remain_rpc.first]);
if (remain_rpc.second > 0) {
ss << (remain_rpc_num > 0 ? ", " : "") << print_id(_instance_id2finst_id[remain_rpc.first]);
remain_rpc_num++;
}
}
LOG(WARNING) << "Fragment " << print_id(_fragment_ctx->fragment_instance_id()) << " SinkBuffer remains "
<< remain_rpc_num << " rpcs, dest are " << ss.str();
}
// how many alive drivers left and what they are.
if (_num_remaining_eos > 0) {
std::stringstream ss;
for (auto& remain_eos : _num_sinkers) {
ss << print_id(_instance_id2finst_id[remain_eos.first]) << "(" << remain_eos.second << "),";
}
LOG(WARNING) << "Fragment " << print_id(_fragment_ctx->fragment_instance_id())
<< " remains EOS : " << ss.str();
}
}
}
if (state != nullptr && state->query_ctx() && state->query_ctx()->is_query_expired()) {
// check how many cancel operations are issued, and show the state of that time.
Expand All @@ -248,11 +224,12 @@ void SinkBuffer::cancel_one_sinker(RuntimeState* const state) {

void SinkBuffer::_update_network_time(const TUniqueId& instance_id, const int64_t send_timestamp,
const int64_t receiver_post_process_time) {
auto& context = sink_ctx(instance_id.lo);
const int64_t get_response_timestamp = MonotonicNanos();
_last_receive_time = get_response_timestamp;
int32_t concurrency = _num_in_flight_rpcs[instance_id.lo];
int32_t concurrency = context.num_in_flight_rpcs;
int64_t time_usage = get_response_timestamp - send_timestamp - receiver_post_process_time;
_network_times[instance_id.lo].update(time_usage, concurrency);
context.network_time.update(time_usage, concurrency);
_rpc_cumulative_time += time_usage;
_rpc_count++;
}
Expand All @@ -263,9 +240,10 @@ void SinkBuffer::_process_send_window(const TUniqueId& instance_id, const int64_
if (!_is_dest_merge) {
return;
}
auto& seqs = _discontinuous_acked_seqs[instance_id.lo];
auto& context = sink_ctx(instance_id.lo);
auto& seqs = context.discontinuous_acked_seqs;
seqs.insert(sequence);
auto& max_continuous_acked_seq = _max_continuous_acked_seqs[instance_id.lo];
auto& max_continuous_acked_seq = context.max_continuous_acked_seqs;
std::unordered_set<int64_t>::iterator it;
while ((it = seqs.find(max_continuous_acked_seq + 1)) != seqs.end()) {
seqs.erase(it);
Expand All @@ -274,7 +252,8 @@ void SinkBuffer::_process_send_window(const TUniqueId& instance_id, const int64_
}

Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::function<void()>& pre_works) {
std::lock_guard<Mutex> l(*_mutexes[instance_id.lo]);
auto& context = sink_ctx(instance_id.lo);
std::lock_guard guard(context.mutex);
pre_works();

DeferOp decrease_defer([this]() { --_num_sending_rpc; });
Expand All @@ -285,18 +264,17 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun
return Status::OK();
}

auto& buffer = _buffers[instance_id.lo];
auto& buffer = context.buffer;

bool too_much_brpc_process = false;
if (_is_dest_merge) {
// discontinuous_acked_window_size means that we are not received all the ack
// with sequence from _max_continuous_acked_seqs[x] to _request_seqs[x]
// with sequence from max_continuous_acked_seqs to request_seqs
// Limit the size of the window to avoid buffering too much out-of-order data at the receiving side
int64_t discontinuous_acked_window_size =
_request_seqs[instance_id.lo] - _max_continuous_acked_seqs[instance_id.lo];
int64_t discontinuous_acked_window_size = context.request_seq - context.max_continuous_acked_seqs;
too_much_brpc_process = discontinuous_acked_window_size >= config::pipeline_sink_brpc_dop;
} else {
too_much_brpc_process = _num_in_flight_rpcs[instance_id.lo] >= config::pipeline_sink_brpc_dop;
too_much_brpc_process = context.num_in_flight_rpcs >= config::pipeline_sink_brpc_dop;
}
if (buffer.empty() || too_much_brpc_process) {
return Status::OK();
Expand All @@ -319,7 +297,7 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun
// The order of data transmiting in IO level may not be strictly the same as
// the order of submitting data packets
// But we must guarantee that first packet must be received first
if (_num_finished_rpcs[instance_id.lo] == 0 && _num_in_flight_rpcs[instance_id.lo] > 0) {
if (context.num_finished_rpcs == 0 && context.num_in_flight_rpcs > 0) {
need_wait = true;
return Status::OK();
}
Expand All @@ -331,12 +309,12 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun
if (--_num_remaining_eos == 0) {
_is_finishing = true;
}
--_num_sinkers[instance_id.lo];
sink_ctx(instance_id.lo).num_sinker--;
});
// Only the last eos is sent to ExchangeSourceOperator. it must be guaranteed that
// eos is the last packet to send to finish the input stream of the corresponding of
// ExchangeSourceOperator and eos is sent exactly-once.
if (_num_sinkers[instance_id.lo] > 1) {
if (context.num_sinker > 1) {
if (request.params->chunks_size() == 0) {
continue;
} else {
Expand All @@ -346,7 +324,7 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun
// The order of data transmiting in IO level may not be strictly the same as
// the order of submitting data packets
// But we must guarantee that eos packent must be the last packet
if (_num_in_flight_rpcs[instance_id.lo] > 0) {
if (context.num_in_flight_rpcs > 0) {
need_wait = true;
return Status::OK();
}
Expand All @@ -357,8 +335,8 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun
}
}

*request.params->mutable_finst_id() = _instance_id2finst_id[instance_id.lo];
request.params->set_sequence(++_request_seqs[instance_id.lo]);
*request.params->mutable_finst_id() = context.finst_id;
request.params->set_sequence(++context.request_seq);

if (!request.attachment.empty()) {
_bytes_sent += request.attachment.size();
Expand All @@ -374,13 +352,11 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun
closure->addFailedHandler([this](const ClosureContext& ctx, std::string_view rpc_error_msg) noexcept {
auto defer = DeferOp([this]() { --_total_in_flight_rpc; });
_is_finishing = true;
{
std::lock_guard<Mutex> l(*_mutexes[ctx.instance_id.lo]);
++_num_finished_rpcs[ctx.instance_id.lo];
--_num_in_flight_rpcs[ctx.instance_id.lo];
}
auto& context = sink_ctx(ctx.instance_id.lo);
++context.num_finished_rpcs;
--context.num_in_flight_rpcs;

const auto& dest_addr = _dest_addrs[ctx.instance_id.lo];
const auto& dest_addr = context.dest_addrs;
std::string err_msg =
fmt::format("transmit chunk rpc failed [dest_instance_id={}] [dest={}:{}] detail:{}",
print_id(ctx.instance_id), dest_addr.hostname, dest_addr.port, rpc_error_msg);
Expand All @@ -392,16 +368,15 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun
// when _total_in_flight_rpc desc to 0, _fragment_ctx may be destructed
auto defer = DeferOp([this]() { --_total_in_flight_rpc; });
Status status(result.status());
{
std::lock_guard<Mutex> l(*_mutexes[ctx.instance_id.lo]);
++_num_finished_rpcs[ctx.instance_id.lo];
--_num_in_flight_rpcs[ctx.instance_id.lo];
}
auto& context = sink_ctx(ctx.instance_id.lo);
++context.num_finished_rpcs;
--context.num_in_flight_rpcs;

if (!status.ok()) {
_is_finishing = true;
_fragment_ctx->cancel(status);

const auto& dest_addr = _dest_addrs[ctx.instance_id.lo];
const auto& dest_addr = context.dest_addrs;
LOG(WARNING) << fmt::format("transmit chunk rpc failed [dest_instance_id={}] [dest={}:{}] [msg={}]",
print_id(ctx.instance_id), dest_addr.hostname, dest_addr.port,
status.message());
Expand All @@ -414,7 +389,7 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun
});

++_total_in_flight_rpc;
++_num_in_flight_rpcs[instance_id.lo];
++context.num_in_flight_rpcs;

// Attachment will be released by process_mem_tracker in closure->Run() in bthread, when receiving the response,
// so decrease the memory usage of attachment from instance_mem_tracker immediately before sending the request.
Expand Down
52 changes: 30 additions & 22 deletions be/src/exec/pipeline/exchange/sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <algorithm>
#include <future>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <unordered_set>
Expand Down Expand Up @@ -134,32 +135,39 @@ class SinkBuffer {
/// use int64_t as key, which is the field type of TUniqueId::lo
/// because TUniqueId::hi is exactly the same in one query

// num eos per instance
phmap::flat_hash_map<int64_t, int64_t, StdHash<int64_t>> _num_sinkers;
phmap::flat_hash_map<int64_t, int64_t, StdHash<int64_t>> _request_seqs;
// Considering the following situation
// Sending request 1, 2, 3 in order with one possible order of response 1, 3, 2,
// and field transformation are as following
// a. receive response-1, _max_continuous_acked_seqs[x]->1, _discontinuous_acked_seqs[x]->()
// b. receive response-3, _max_continuous_acked_seqs[x]->1, _discontinuous_acked_seqs[x]->(3)
// c. receive response-2, _max_continuous_acked_seqs[x]->3, _discontinuous_acked_seqs[x]->()
phmap::flat_hash_map<int64_t, int64_t, StdHash<int64_t>> _max_continuous_acked_seqs;
phmap::flat_hash_map<int64_t, std::unordered_set<int64_t>, StdHash<int64_t>> _discontinuous_acked_seqs;
struct SinkContext {
// num eos per instance
int64_t num_sinker;
int64_t request_seq;
// Considering the following situation
// Sending request 1, 2, 3 in order with one possible order of response 1, 3, 2,
// and field transformation are as following
// a. receive response-1, _max_continuous_acked_seqs[x]->1, _discontinuous_acked_seqs[x]->()
// b. receive response-3, _max_continuous_acked_seqs[x]->1, _discontinuous_acked_seqs[x]->(3)
// c. receive response-2, _max_continuous_acked_seqs[x]->3, _discontinuous_acked_seqs[x]->()
int64_t max_continuous_acked_seqs;
std::unordered_set<int64_t> discontinuous_acked_seqs;
// The request needs the reference to the allocated finst id,
// so cache finst id for each dest fragment instance.
PUniqueId finst_id;
std::queue<TransmitChunkInfo, std::list<TransmitChunkInfo>> buffer;
Mutex request_mutex;

std::atomic_size_t num_finished_rpcs;
std::atomic_size_t num_in_flight_rpcs;
TimeTrace network_time;

Mutex mutex;

TNetworkAddress dest_addrs;
};
phmap::flat_hash_map<int64_t, std::unique_ptr<SinkContext>, StdHash<int64_t>> _sink_ctxs;
SinkContext& sink_ctx(int64_t instance_id) { return *_sink_ctxs[instance_id]; }

std::atomic<int32_t> _total_in_flight_rpc = 0;
std::atomic<int32_t> _num_uncancelled_sinkers = 0;
std::atomic<int32_t> _num_remaining_eos = 0;

// The request needs the reference to the allocated finst id,
// so cache finst id for each dest fragment instance.
phmap::flat_hash_map<int64_t, PUniqueId, StdHash<int64_t>> _instance_id2finst_id;
phmap::flat_hash_map<int64_t, std::queue<TransmitChunkInfo, std::list<TransmitChunkInfo>>, StdHash<int64_t>>
_buffers;
phmap::flat_hash_map<int64_t, int32_t, StdHash<int64_t>> _num_finished_rpcs;
phmap::flat_hash_map<int64_t, int32_t, StdHash<int64_t>> _num_in_flight_rpcs;
phmap::flat_hash_map<int64_t, TimeTrace, StdHash<int64_t>> _network_times;
phmap::flat_hash_map<int64_t, std::unique_ptr<Mutex>, StdHash<int64_t>> _mutexes;
phmap::flat_hash_map<int64_t, TNetworkAddress, StdHash<int64_t>> _dest_addrs;

// True means that SinkBuffer needn't input chunk and send chunk anymore,
// but there may be still in-flight RPC running.
// It becomes true, when all sinkers have sent EOS, or been set_finished/cancelled, or RPC has returned error.
Expand Down

0 comments on commit fdd255f

Please sign in to comment.