Skip to content

Commit

Permalink
[fix](sink) The issue with 2GB limit of protocol buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhhsg committed Jul 19, 2024
1 parent dbd796c commit acd5467
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 14 deletions.
8 changes: 8 additions & 0 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <google/protobuf/stubs/callback.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <limits>
#include <ostream>
#include <string>
#include <utility>
Expand Down Expand Up @@ -80,6 +81,13 @@ void GetResultBatchCtx::on_data(const std::unique_ptr<TFetchDataResult>& t_resul
result->set_packet_seq(packet_seq);
result->set_eos(eos);
}

/// The size limit of proto buffer message is 2G
if (result->ByteSizeLong() > std::numeric_limits<int32_t>::max()) {
st = Status::InternalError("Message size exceeds 2GB: {}", result->ByteSizeLong());
result->clear_row_batch();
result->set_empty_batch(true);
}
st.to_protobuf(result->mutable_status());
{ done->Run(); }
delete this;
Expand Down
58 changes: 44 additions & 14 deletions be/src/vec/sink/vmysql_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <utility>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "gutil/integral_types.h"
#include "olap/hll.h"
#include "runtime/buffer_control_block.h"
Expand Down Expand Up @@ -140,23 +141,11 @@ Status VMysqlResultWriter<is_binary_format>::_set_options(
}

template <bool is_binary_format>
Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) {
SCOPED_TIMER(_append_row_batch_timer);
Status VMysqlResultWriter<is_binary_format>::_write_one_block(RuntimeState* state, Block& block) {
Status status = Status::OK();
if (UNLIKELY(input_block.rows() == 0)) {
return status;
}

DCHECK(_output_vexpr_ctxs.empty() != true);

// Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
// failed, just return the error status
Block block;
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
input_block, &block));
auto num_rows = block.rows();
// convert one batch
auto result = std::make_unique<TFetchDataResult>();
auto num_rows = block.rows();
result->result_batch.rows.resize(num_rows);
uint64_t bytes_sent = 0;
{
Expand Down Expand Up @@ -249,6 +238,47 @@ Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& i
return status;
}

template <bool is_binary_format>
Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) {
SCOPED_TIMER(_append_row_batch_timer);
Status status = Status::OK();
if (UNLIKELY(input_block.rows() == 0)) {
return status;
}

DCHECK(_output_vexpr_ctxs.empty() != true);

// Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
// failed, just return the error status
Block block;
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
input_block, &block));
const auto total_bytes = block.bytes();

if (total_bytes > config::thrift_max_message_size) [[unlikely]] {
const auto total_rows = block.rows();
const auto sub_block_count = (total_bytes + config::thrift_max_message_size - 1) /
config::thrift_max_message_size;
const auto sub_block_rows = (total_rows + sub_block_count - 1) / sub_block_count;

size_t offset = 0;
while (offset < total_rows) {
size_t rows = std::min(sub_block_rows, total_rows - offset);
auto sub_block = block.clone_empty();
for (size_t i = 0; i != block.columns(); ++i) {
sub_block.get_by_position(i).column =
block.get_by_position(i).column->cut(offset, rows);
}
offset += rows;

RETURN_IF_ERROR(_write_one_block(state, sub_block));
}
return Status::OK();
}

return _write_one_block(state, block);
}

template <bool is_binary_format>
Status VMysqlResultWriter<is_binary_format>::close(Status) {
COUNTER_SET(_sent_rows_counter, _written_rows);
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/sink/vmysql_result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class VMysqlResultWriter final : public ResultWriter {
int _add_one_cell(const ColumnPtr& column_ptr, size_t row_idx, const DataTypePtr& type,
MysqlRowBuffer<is_binary_format>& buffer, int scale = -1);

Status _write_one_block(RuntimeState* state, Block& block);

BufferControlBlock* _sinker = nullptr;

const VExprContextSPtrs& _output_vexpr_ctxs;
Expand Down

0 comments on commit acd5467

Please sign in to comment.