Skip to content

Commit

Permalink
[Exechange Node] Support Exechange Node (apache#9)
Browse files Browse the repository at this point in the history
* dev send-recv

* dev exechange
  • Loading branch information
stdpain authored and HappenLee committed Jul 13, 2021
1 parent 246fe6f commit c446fb0
Show file tree
Hide file tree
Showing 42 changed files with 1,807 additions and 105 deletions.
3 changes: 2 additions & 1 deletion be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -689,8 +689,9 @@ if (${MAKE_TEST} STREQUAL "ON")
add_subdirectory(${TEST_DIR}/udf)
add_subdirectory(${TEST_DIR}/util)
add_subdirectory(${TEST_DIR}/vec/core)
add_subdirectory(${TEST_DIR}/vec/function)
add_subdirectory(${TEST_DIR}/vec/exprs)
add_subdirectory(${TEST_DIR}/vec/function)
add_subdirectory(${TEST_DIR}/vec/runtime)
add_subdirectory(${TEST_DIR}/vec/aggregate_functions)
add_subdirectory(${TEST_DIR}/plugin)
add_subdirectory(${TEST_DIR}/plugin/example)
Expand Down
32 changes: 25 additions & 7 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "runtime/runtime_state.h"
#include "util/logging.h"
#include "vec/sink/result_sink.h"
#include "vec/sink/vdata_stream_sender.h"

namespace doris {

Expand Down Expand Up @@ -63,13 +64,30 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
sink->reset(tmp_sink);
break;
}
case TDataSinkType::VDATA_STREAM_SINK: {
if (!thrift_sink.__isset.stream_sink) {
return Status::InternalError("Missing data stream sink.");
}
bool send_query_statistics_with_every_batch =
params.__isset.send_query_statistics_with_every_batch
? params.send_query_statistics_with_every_batch
: false;
// TODO: figure out good buffer size based on size of output row
tmp_sink = new doris::vectorized::VDataStreamSender(
pool, params.sender_id, row_desc, thrift_sink.stream_sink, params.destinations,
16 * 1024, send_query_statistics_with_every_batch);
// RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink));
sink->reset(tmp_sink);
break;
}
case TDataSinkType::RESULT_SINK:
if (!thrift_sink.__isset.result_sink) {
return Status::InternalError("Missing data buffer sink.");
}

// TODO: figure out good buffer size based on size of output row
tmp_sink = new ResultSink(row_desc, output_exprs, thrift_sink.result_sink, 1024, config::is_vec);
tmp_sink = new ResultSink(row_desc, output_exprs, thrift_sink.result_sink, 1024,
config::is_vec);
sink->reset(tmp_sink);
break;
case TDataSinkType::VRESULT_SINK:
Expand All @@ -78,7 +96,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
}

// TODO: figure out good buffer size based on size of output row
tmp_sink = new doris::vectorized::ResultSink(row_desc, output_exprs, thrift_sink.result_sink, 1024);
tmp_sink = new doris::vectorized::ResultSink(row_desc, output_exprs,
thrift_sink.result_sink, 1024);
sink->reset(tmp_sink);
break;
case TDataSinkType::MEMORY_SCRATCH_SINK:
Expand Down Expand Up @@ -108,8 +127,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (!thrift_sink.__isset.odbc_table_sink) {
return Status::InternalError("Missing data odbc sink.");
}
OdbcTableSink* odbc_tbl_sink = new OdbcTableSink(pool,
row_desc, output_exprs);
OdbcTableSink* odbc_tbl_sink = new OdbcTableSink(pool, row_desc, output_exprs);
sink->reset(odbc_tbl_sink);
break;
}
Expand Down Expand Up @@ -168,9 +186,9 @@ Status DataSink::init(const TDataSink& thrift_sink) {
}

Status DataSink::prepare(RuntimeState* state) {
_expr_mem_tracker = MemTracker::CreateTracker(
-1, _name + ":Expr:" + std::to_string(state->load_job_id()),
state->instance_mem_tracker());
_expr_mem_tracker =
MemTracker::CreateTracker(-1, _name + ":Expr:" + std::to_string(state->load_job_id()),
state->instance_mem_tracker());
return Status::OK();
}

Expand Down
9 changes: 7 additions & 2 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@
#include "runtime/runtime_state.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"

#include "vec/core/block.h"
#include "vec/exec/aggregation_node.h"
#include "vec/exec/olap_scan_node.h"
#include "vec/exec/vexchange_node.h"
#include "vec/exprs/vexpr.h"

namespace doris {
Expand Down Expand Up @@ -166,7 +166,8 @@ void ExecNode::push_down_predicate(RuntimeState* state, std::list<ExprContext*>*
Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) {
if (tnode.__isset.vconjunct) {
_vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree(_pool, tnode.vconjunct, _vconjunct_ctx_ptr.get()));
RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree(_pool, tnode.vconjunct,
_vconjunct_ctx_ptr.get()));
}
RETURN_IF_ERROR(Expr::create_expr_trees(_pool, tnode.conjuncts, &_conjunct_ctxs));

Expand Down Expand Up @@ -416,6 +417,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
*node = pool->add(new ExchangeNode(pool, tnode, descs));
return Status::OK();

case TPlanNodeType::VEXCHANGE_NODE:
*node = pool->add(new doris::vectorized::VExchangeNode(pool, tnode, descs));
return Status::OK();

case TPlanNodeType::SELECT_NODE:
*node = pool->add(new SelectNode(pool, tnode, descs));
return Status::OK();
Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/data_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDes

// Ordering predicate over PartitionInfo pointers: v1 comes before v2 when v1's range()
// compares less than v2's range(). It is a free function rather than a member of
// PartitionInfo because that class holds other members besides the range.
// TODO: move this to dpp_sink
static bool compare_part_use_range(const PartitionInfo* v1, const PartitionInfo* v2) {
    const auto& lhs_range = v1->range();
    const auto& rhs_range = v2->range();
    return lhs_range < rhs_range;
}
Expand Down Expand Up @@ -469,8 +470,9 @@ Status DataStreamSender::prepare(RuntimeState* state) {
<< "])";
_profile = _pool->add(new RuntimeProfile(title.str()));
SCOPED_TIMER(_profile->total_time_counter());
_mem_tracker = MemTracker::CreateTracker(_profile, -1, "DataStreamSender",
state->instance_mem_tracker());
_mem_tracker = MemTracker::CreateTracker(
_profile, -1, "DataStreamSender:" + print_id(state->fragment_instance_id()),
state->instance_mem_tracker());

if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {
// Randomize the order we open/transmit to channels to avoid thundering herd problems.
Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
#include "olap/options.h"

namespace doris {

namespace vectorized {
class VDataStreamMgr;
}
class BfdParser;
class BrokerMgr;
class BrpcStubCache;
Expand Down Expand Up @@ -92,6 +94,7 @@ class ExecEnv {
const std::string& token() const;
ExternalScanContextMgr* external_scan_context_mgr() { return _external_scan_context_mgr; }
DataStreamMgr* stream_mgr() { return _stream_mgr; }
// Returns _vstream_mgr (member default is nullptr; ExecEnv::_init() assigns it).
doris::vectorized::VDataStreamMgr* vstream_mgr() { return _vstream_mgr; }
ResultBufferMgr* result_mgr() { return _result_mgr; }
ResultQueueMgr* result_queue_mgr() { return _result_queue_mgr; }
ClientCache<BackendServiceClient>* client_cache() { return _backend_client_cache; }
Expand Down Expand Up @@ -160,6 +163,7 @@ class ExecEnv {
// Leave protected so that subclasses can override
ExternalScanContextMgr* _external_scan_context_mgr = nullptr;
DataStreamMgr* _stream_mgr = nullptr;
doris::vectorized::VDataStreamMgr* _vstream_mgr = nullptr;
ResultBufferMgr* _result_mgr = nullptr;
ResultQueueMgr* _result_queue_mgr = nullptr;
ClientCache<BackendServiceClient>* _backend_client_cache = nullptr;
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
#include "util/parse_util.h"
#include "util/pretty_printer.h"
#include "util/priority_thread_pool.hpp"
#include "vec/runtime/vdata_stream_mgr.h"

namespace doris {

Expand All @@ -81,6 +82,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_store_paths = store_paths;
_external_scan_context_mgr = new ExternalScanContextMgr(this);
_stream_mgr = new DataStreamMgr();
_vstream_mgr = new doris::vectorized::VDataStreamMgr();
_result_mgr = new ResultBufferMgr();
_result_queue_mgr = new ResultQueueMgr();
_backend_client_cache = new BackendServiceClientCache(config::max_client_cache_size_per_host);
Expand Down
13 changes: 12 additions & 1 deletion be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "util/pretty_printer.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/exec/vexchange_node.h"
#include "vec/sink/data_sink.h"

namespace doris {
Expand Down Expand Up @@ -160,7 +161,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
if (params.__isset.debug_node_id) {
DCHECK(params.__isset.debug_action);
DCHECK(params.__isset.debug_phase);
ExecNode::set_debug_options(params.debug_node_id, params.debug_phase, params.debug_action, _plan);
ExecNode::set_debug_options(params.debug_node_id, params.debug_phase, params.debug_action,
_plan);
}

// set #senders of exchange nodes before calling Prepare()
Expand All @@ -172,6 +174,15 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
DCHECK_GT(num_senders, 0);
static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders);
}
// for vexchange node
exch_nodes.clear();
_plan->collect_nodes(TPlanNodeType::VEXCHANGE_NODE, &exch_nodes);
for (auto exch_node : exch_nodes) {
DCHECK_EQ(exch_node->type(), TPlanNodeType::VEXCHANGE_NODE);
int num_senders = find_with_default(params.per_exch_num_senders, exch_node->id(), 0);
DCHECK_GT(num_senders, 0);
static_cast<doris::vectorized::VExchangeNode*>(exch_node)->set_num_senders(num_senders);
}

RETURN_IF_ERROR(_plan->prepare(_runtime_state.get()));
// set scan ranges
Expand Down
14 changes: 14 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "service/brpc.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
#include "vec/runtime/vdata_stream_mgr.h"

namespace doris {

Expand Down Expand Up @@ -285,6 +286,19 @@ void PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* co
}
st.to_protobuf(response->mutable_status());
}

// RPC handler for transmit_block: logs the target fragment instance / node and forwards
// the request to the vectorized stream manager owned by the ExecEnv.
//
// cntl_base and response are not used here; response is left unpopulated.
// done is passed to the manager by address, and is run here only if it is still
// non-null once the manager call returns.
// NOTE(review): any result returned by VDataStreamMgr::transmit_block is dropped here —
// confirm whether a status should be written into `response`.
template <typename T>
void PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController* cntl_base,
                                             const PTransmitDataParams* request,
                                             PTransmitDataResult* response,
                                             google::protobuf::Closure* done) {
    VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
             << " node=" << request->node_id();
    _exec_env->vstream_mgr()->transmit_block(request, &done);
    if (done != nullptr) {
        done->Run();
    }
}

template class PInternalServiceImpl<PBackendService>;
template class PInternalServiceImpl<palo::PInternalService>;

Expand Down
4 changes: 4 additions & 0 deletions be/src/service/internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ class PInternalServiceImpl : public T {
const ::doris::PPublishFilterRequest* request,
::doris::PPublishFilterResponse* response,
::google::protobuf::Closure* done) override;
// Handles the transmit_block RPC: forwards `request` to the ExecEnv's vectorized stream
// manager and then runs `done` if it is still non-null afterwards.
void transmit_block(::google::protobuf::RpcController* controller,
const ::doris::PTransmitDataParams* request,
::doris::PTransmitDataResult* response,
::google::protobuf::Closure* done) override;

private:
Status _exec_plan_fragment(const std::string& s_request);
Expand Down
8 changes: 4 additions & 4 deletions be/src/util/brpc_stub_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ namespace doris {
class BrpcStubCache {
public:
BrpcStubCache();
~BrpcStubCache();
virtual ~BrpcStubCache();

PBackendService_Stub* get_stub(const butil::EndPoint& endpoint) {
virtual PBackendService_Stub* get_stub(const butil::EndPoint& endpoint) {
std::lock_guard<SpinLock> l(_lock);
auto stub_ptr = _stub_map.seek(endpoint);
if (stub_ptr != nullptr) {
Expand All @@ -52,7 +52,7 @@ class BrpcStubCache {
return stub;
}

PBackendService_Stub* get_stub(const TNetworkAddress& taddr) {
virtual PBackendService_Stub* get_stub(const TNetworkAddress& taddr) {
butil::EndPoint endpoint;
if (str2endpoint(taddr.hostname.c_str(), taddr.port, &endpoint)) {
LOG(WARNING) << "unknown endpoint, hostname=" << taddr.hostname;
Expand All @@ -61,7 +61,7 @@ class BrpcStubCache {
return get_stub(endpoint);
}

PBackendService_Stub* get_stub(const std::string& host, int port) {
virtual PBackendService_Stub* get_stub(const std::string& host, int port) {
butil::EndPoint endpoint;
if (str2endpoint(host.c_str(), port, &endpoint)) {
LOG(WARNING) << "unknown endpoint, hostname=" << host;
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ set(VEC_FILES
exec/aggregation_node.cpp
exec/olap_scan_node.cpp
exec/olap_scanner.cpp
exec/vexchange_node.cpp
exprs/vectorized_agg_fn.cpp
exprs/vectorized_fn_call.cpp
exprs/vexpr.cpp
Expand All @@ -76,6 +77,10 @@ set(VEC_FILES
functions/divide.cpp
sink/mysql_result_writer.cpp
sink/result_sink.cpp
sink/vdata_stream_sender.cpp
runtime/vdata_stream_recvr.cpp
runtime/vdata_stream_mgr.cpp
runtime/vpartition_info.cpp
)

add_library(Vec STATIC
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/aggregate_functions/aggregate_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
#pragma once

#include <cstddef>
#include <istream>
#include <memory>
#include <ostream>
#include <type_traits>
#include <vector>

Expand Down Expand Up @@ -101,9 +103,11 @@ class IAggregateFunction {

/// Serializes state (to transmit it over the network, for example).
/// @param place  aggregate state to serialize.
/// @param buf    output stream that receives the serialized state.
// virtual void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const = 0;
virtual void serialize(ConstAggregateDataPtr place, std::ostream& buf) const = 0;

/// Deserializes state. This function is called only for empty (just created) states.
/// @param place  aggregate state to fill from the stream.
/// @param buf    input stream the serialized state is read from.
/// @param arena  Arena for functions that require one to handle their own states.
// virtual void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const = 0;
virtual void deserialize(AggregateDataPtr place, std::istream& buf, Arena* arena) const = 0;

/// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()).
virtual bool allocatesMemoryInArena() const { return false; }
Expand Down
29 changes: 13 additions & 16 deletions be/src/vec/aggregate_functions/aggregate_function_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <vec/columns/column_nullable.h>
#include <vec/common/assert_cast.h>
#include <vec/data_types/data_types_number.h>
#include <vec/io/io_helper.h>

#include <array>

Expand Down Expand Up @@ -57,15 +58,13 @@ class AggregateFunctionCount final
data(place).count += data(rhs).count;
}

// void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
// {
// writeVarUInt(data(place).count, buf);
// }
/// Writes the count held by the state at `place` to `buf` using writeVarUInt.
void serialize(ConstAggregateDataPtr place, std::ostream& buf) const override {
    const auto& current_count = data(place).count;
    writeVarUInt(current_count, buf);
}

// void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
// {
// readVarUInt(data(place).count, buf);
// }
/// Reads a count from `buf` with readVarUInt and stores it into the state at `place`.
/// The Arena argument is unused.
void deserialize(AggregateDataPtr place, std::istream& buf, Arena*) const override {
    auto& stored_count = data(place).count;
    readVarUInt(stored_count, buf);
}

void insertResultInto(ConstAggregateDataPtr place, IColumn& to) const override {
assert_cast<ColumnUInt64&>(to).getData().push_back(data(place).count);
Expand Down Expand Up @@ -102,15 +101,13 @@ class AggregateFunctionCountNotNullUnary final
data(place).count += data(rhs).count;
}

// void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
// {
// writeVarUInt(data(place).count, buf);
// }
/// Writes the count held by the state at `place` to `buf` using writeVarUInt.
void serialize(ConstAggregateDataPtr place, std::ostream& buf) const override {
    const auto& current_count = data(place).count;
    writeVarUInt(current_count, buf);
}

// void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
// {
// readVarUInt(data(place).count, buf);
// }
/// Reads a count from `buf` with readVarUInt and stores it into the state at `place`.
/// The Arena argument is unused.
void deserialize(AggregateDataPtr place, std::istream& buf, Arena*) const override {
    auto& stored_count = data(place).count;
    readVarUInt(stored_count, buf);
}

void insertResultInto(ConstAggregateDataPtr place, IColumn& to) const override {
assert_cast<ColumnUInt64&>(to).getData().push_back(data(place).count);
Expand Down
9 changes: 3 additions & 6 deletions be/src/vec/aggregate_functions/aggregate_function_nothing.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <vec/columns/column.h>
#include <vec/data_types/data_type_nothing.h>
#include <vec/data_types/data_type_nullable.h>
#include <vec/io/io_helper.h>

namespace doris::vectorized {

Expand Down Expand Up @@ -51,13 +52,9 @@ class AggregateFunctionNothing final : public IAggregateFunctionHelper<Aggregate

void merge(AggregateDataPtr, ConstAggregateDataPtr, Arena*) const override {}

// void serialize(ConstAggregateDataPtr, WriteBuffer &) const override
// {
// }
// No-op: writes nothing to the stream.
void serialize(ConstAggregateDataPtr, std::ostream&) const override {}

// void deserialize(AggregateDataPtr, ReadBuffer &, Arena *) const override
// {
// }
// No-op: reads nothing from the stream and leaves the state untouched.
void deserialize(AggregateDataPtr, std::istream&, Arena*) const override {}

void insertResultInto(ConstAggregateDataPtr, IColumn& to) const override { to.insertDefault(); }

Expand Down
Loading

0 comments on commit c446fb0

Please sign in to comment.