Skip to content

Commit

Permalink
support multi-tablet group by agg (apache#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
stdpain authored and HappenLee committed Jul 13, 2021
1 parent bb0a3df commit c8ab31b
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 16 deletions.
22 changes: 22 additions & 0 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -696,4 +696,26 @@ void Block::serialize(PBlock* pblock) const {
}
}

int MutableBlock::rows() {
for (const auto& column : _columns)
if (column) return column->size();

return 0;
}

void MutableBlock::add_row(const Block* block, int row) {
auto& src_columns_with_schema = block->getColumnsWithTypeAndName();
for (int i = 0; i < _columns.size(); ++i) {
_columns[i]->insertFrom(*src_columns_with_schema[i].column.get(), row);
}
}

Block MutableBlock::to_block() {
ColumnsWithTypeAndName columns_with_schema;
for (int i = 0; i < _columns.size(); ++i) {
columns_with_schema.emplace_back(std::move(_columns[i]), _data_types[i], "");
}
return columns_with_schema;
}

} // namespace doris::vectorized
35 changes: 34 additions & 1 deletion be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class Block {
std::string dumpData() const;

static void filter_block(Block* block, int filter_conlumn_id, int column_to_keep);
// serialize block to PRowBatch
// serialize block to PRowBatch
void serialize(PBlock* pblock) const;

private:
Expand All @@ -180,4 +180,37 @@ using BlocksPtrs = std::shared_ptr<std::vector<BlocksPtr>>;
/// Calculate difference in structure of blocks and write description into output strings. NOTE It doesn't compare values of constant columns.
// void getBlocksDifference(const Block & lhs, const Block & rhs, std::string & out_lhs_diff, std::string & out_rhs_diff);

class MutableBlock {
private:
MutableColumns _columns;
DataTypes _data_types;

public:
MutableBlock() = default;
~MutableBlock() = default;

MutableBlock(MutableColumns&& columns, DataTypes&& data_types)
: _columns(std::move(columns)), _data_types(std::move(data_types)) {}

int rows();

bool empty() { return rows() == 0; }

MutableColumns& mutable_columns() { return _columns; }

DataTypes& data_types() { return _data_types; }

Block to_block();

void add_row(const Block* block, int row);

void clear() {
_columns.clear();
_data_types.clear();
}

// TODO: use add_rows instead of this
// add_rows(Block* block,PODArray<Int32>& group, int group_num);
};

} // namespace doris::vectorized
147 changes: 142 additions & 5 deletions be/src/vec/exec/vaggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,23 @@ Status AggregationNode::prepare(RuntimeState* state) {

} else {
_agg_data.init(AggregatedDataVariants::Type::serialized);
if (_is_merge) {
_executor.execute = std::bind<Status>(&AggregationNode::_merge_with_serialized_key,
this, std::placeholders::_1);
} else {
_executor.execute = std::bind<Status>(&AggregationNode::_execute_with_serialized_key,
this, std::placeholders::_1);
}

_executor.execute = std::bind<Status>(&AggregationNode::_execute_with_serialized_key, this,
std::placeholders::_1);
_executor.get_result = std::bind<Status>(&AggregationNode::_get_with_serialized_key_result,
this, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3);
if (_needs_finalize) {
_executor.get_result = std::bind<Status>(
&AggregationNode::_get_with_serialized_key_result, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3);
} else {
_executor.get_result = std::bind<Status>(
&AggregationNode::_serialize_with_serialized_key_result, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}
}

return Status::OK();
Expand Down Expand Up @@ -410,4 +421,130 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
return Status::OK();
}

Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* state, Block* block,
bool* eos) {
DCHECK(_agg_data.serialized != nullptr);
using Method = AggregationMethodSerialized<AggregatedDataWithStringKey>;
using AggState = Method::State;

auto& method = *_agg_data.serialized;
auto& data = _agg_data.serialized->data;

int key_size = _probe_expr_ctxs.size();
int agg_size = _aggregate_evaluators.size();
MutableColumns value_columns(agg_size);
DataTypes value_data_types(agg_size);

MutableColumns key_columns;
for (int i = 0; i < key_size; ++i) {
key_columns.emplace_back(_probe_expr_ctxs[i]->root()->data_type()->createColumn());
}

// will serialize data to string column
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
value_data_types[i] = makeNullable(std::make_shared<DataTypeString>());
value_columns[i] = value_data_types[i]->createColumn();
}

data.forEachValue([&](const auto& key, auto& mapped) {
// insert keys
method.insertKeyIntoColumns(key, key_columns, {});

std::ostringstream buf;
// serialize values
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
_aggregate_evaluators[i]->function()->serialize(
mapped + _offsets_of_aggregate_states[i], buf);
value_columns[i]->insertData(buf.str().c_str(), buf.str().length());
buf.str().clear();
}
});

ColumnsWithTypeAndName columns_with_schema;
for (int i = 0; i < key_size; ++i) {
columns_with_schema.emplace_back(std::move(key_columns[i]),
_probe_expr_ctxs[i]->root()->data_type(), "");
}

for (int i = 0; i < agg_size; ++i) {
columns_with_schema.emplace_back(std::move(value_columns[i]), value_data_types[i], "");
}

*block = Block(columns_with_schema);
*eos = true;
return Status::OK();
}

Status AggregationNode::_merge_with_serialized_key(Block* block) {
DCHECK(!_probe_expr_ctxs.empty());
// now we only support serialized key
// TODO:
DCHECK(_agg_data.serialized != nullptr);

using Method = AggregationMethodSerialized<AggregatedDataWithStringKey>;
using AggState = Method::State;

auto& method = *_agg_data.serialized;

size_t key_size = _probe_expr_ctxs.size();
ColumnRawPtrs key_columns(key_size);

for (size_t i = 0; i < key_size; ++i) {
int result_column_id = -1;
RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(block, &result_column_id));
key_columns[i] = block->getByPosition(result_column_id).column.get();
}

int rows = block->rows();
PODArray<AggregateDataPtr> places(rows);

AggState state(key_columns, {}, nullptr);

/// For all rows.
for (size_t i = 0; i < rows; ++i) {
AggregateDataPtr aggregate_data = nullptr;

auto emplace_result = state.emplaceKey(method.data, i, _agg_arena_pool);

/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (emplace_result.isInserted()) {
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
emplace_result.setMapped(nullptr);

aggregate_data = _agg_arena_pool.alignedAlloc(_total_size_of_aggregate_states,
_align_aggregate_states);
_create_agg_status(aggregate_data);

emplace_result.setMapped(aggregate_data);
} else
aggregate_data = emplace_result.getMapped();

places[i] = aggregate_data;
assert(places[i] != nullptr);
}

std::unique_ptr<char[]> deserialize_buffer(new char[_total_size_of_aggregate_states]);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
auto column = block->getByPosition(i + key_size).column;
if (column->isNullable()) {
column = ((ColumnNullable*)column.get())->getNestedColumnPtr();
}
for (int j = 0; j < rows; ++j) {
std::string data_buffer;
StringRef ref = column->getDataAt(j);
data_buffer.assign(ref.data, ref.size);
std::istringstream buf(data_buffer);

_aggregate_evaluators[i]->function()->deserialize(
deserialize_buffer.get() + _offsets_of_aggregate_states[i], buf,
&_agg_arena_pool);

_aggregate_evaluators[i]->function()->merge(
places.data()[j] + _offsets_of_aggregate_states[i],
deserialize_buffer.get() + _offsets_of_aggregate_states[i], &_agg_arena_pool);
}
}
return Status::OK();
}

} // namespace doris::vectorized
5 changes: 4 additions & 1 deletion be/src/vec/exec/vaggregation_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,16 @@ class AggregationNode : public ::doris::ExecNode {

private:
Status _create_agg_status(AggregateDataPtr data);

Status _get_without_key_result(RuntimeState* state, Block* block, bool* eos);
Status _serialize_without_key(RuntimeState* state, Block* block, bool* eos);
Status _execute_without_key(Block* block);
Status _merge_without_key(Block* block);

Status _execute_with_serialized_key(Block* block);
Status _get_with_serialized_key_result(RuntimeState* state, Block* block, bool* eos);
Status _serialize_with_serialized_key_result(RuntimeState* state, Block* block, bool* eos);
Status _execute_with_serialized_key(Block* block);
Status _merge_with_serialized_key(Block* block);

using vectorized_execute = std::function<Status(Block* block)>;
using vectorized_get_result =
Expand Down
59 changes: 51 additions & 8 deletions be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "vec/common/sip_hash.h"
#include "vec/runtime/vpartition_info.h"

namespace doris::vectorized {
Expand All @@ -17,7 +18,7 @@ Status VDataStreamSender::Channel::init(RuntimeState* state) {

_capacity = std::max(1, _buffer_size / std::max(_row_desc.get_row_size(), 1));
//_batch.reset(new RowBatch(_row_desc, capacity, _parent->_mem_tracker.get()));
_block.reset(new Block());
_mutable_block.reset(new MutableBlock());

if (_brpc_dest_addr.hostname.empty()) {
LOG(WARNING) << "there is no brpc destination address's hostname"
Expand Down Expand Up @@ -46,13 +47,14 @@ Status VDataStreamSender::Channel::init(RuntimeState* state) {
Status VDataStreamSender::Channel::send_current_block(bool eos) {
{
SCOPED_TIMER(_parent->_serialize_batch_timer);
_block->serialize(&_pb_block);
_pb_block.Clear();
_mutable_block->to_block().serialize(&_pb_block);
_mutable_block->clear();
int uncompressed_bytes = _pb_block.ByteSize();
int bytes = uncompressed_bytes;
COUNTER_UPDATE(_parent->_bytes_sent_counter, bytes);
COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes);
}
_block->clear();
RETURN_IF_ERROR(send_block(&_pb_block, eos));
return Status::OK();
}
Expand Down Expand Up @@ -87,6 +89,23 @@ Status VDataStreamSender::Channel::send_block(PBlock* block, bool eos) {
return Status::OK();
}

Status VDataStreamSender::Channel::add_row(Block* block, int row) {
if (_fragment_instance_id.lo == -1) {
return Status::OK();
}
int batch_size = _parent->state()->batch_size();
if (block->rows() == batch_size) {
RETURN_IF_ERROR(send_current_block());
}
if (_mutable_block->rows() == 0) {
auto empty_block = block->cloneEmpty();
_mutable_block.reset(
new MutableBlock(empty_block.mutateColumns(), empty_block.getDataTypes()));
}
_mutable_block->add_row(block, row);
return Status::OK();
}

Status VDataStreamSender::Channel::close_wait(RuntimeState* state) {
if (_need_close) {
Status st = _wait_last_brpc();
Expand All @@ -96,7 +115,7 @@ Status VDataStreamSender::Channel::close_wait(RuntimeState* state) {
_need_close = false;
return st;
}
_block.reset();
_mutable_block.reset();
return Status::OK();
}

Expand All @@ -106,8 +125,8 @@ Status VDataStreamSender::Channel::close_internal() {
}
VLOG_RPC << "Channel::close() instance_id=" << _fragment_instance_id
<< " dest_node=" << _dest_node_id
<< " #rows= " << ((_block == nullptr) ? 0 : _block->rows());
if (_block != nullptr && _block->rows() > 0) {
<< " #rows= " << ((_mutable_block == nullptr) ? 0 : _mutable_block->rows());
if (_mutable_block != nullptr && _mutable_block->rows() > 0) {
RETURN_IF_ERROR(send_current_block(true));
} else {
RETURN_IF_ERROR(send_block(nullptr, true));
Expand Down Expand Up @@ -281,8 +300,32 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) {
// 3. send batch
// 4. switch proto
} else if (_part_type == TPartitionType::HASH_PARTITIONED) {
// 1. caculate hash
// 2. dispatch rows to channel
// TODO: use vectorized hash caculate
int num_channels = _channels.size();
// will only copy schema
// we don't want send temp columns
auto send_block = *block;

std::vector<int> result(_partition_expr_ctxs.size());
int counter = 0;

for (auto ctx : _partition_expr_ctxs) {
RETURN_IF_ERROR(ctx->execute(block, &result[counter++]));
}

// caculate hash
int rows = block->rows();
for (int i = 0; i < rows; ++i) {
SipHash siphash;
for (int j = 0; j < result.size(); ++j) {
auto column = block->getByPosition(result[j]).column;
column->updateHashWithValue(i, siphash);
}
auto target_channel_id = siphash.get64() % num_channels;
RETURN_IF_ERROR(_channels[target_channel_id]->add_row(&send_block, i));
}
return Status::OK();

} else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
// 1. caculate hash
// 2. dispatch rows to channel
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/sink/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ class VDataStreamSender::Channel {
// if batch is nullptr, send the eof packet
Status send_block(PBlock* block, bool eos = false);

Status add_row(Block* block, int row);

Status send_current_block(bool eos = false);

// Flush buffered rows and close channel. This function don't wait the response
Expand Down Expand Up @@ -212,7 +214,8 @@ class VDataStreamSender::Channel {

// we're accumulating rows into this batch
//boost::scoped_ptr<RowBatch> _batch;
std::unique_ptr<Block> _block;
// std::unique_ptr<Block> _block;
std::unique_ptr<MutableBlock> _mutable_block;

bool _need_close;
int _be_number;
Expand Down

0 comments on commit c8ab31b

Please sign in to comment.