From f32bb23b6e1e00512e1adbc4d5b73fbb08d93bac Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 6 Nov 2020 19:04:30 +0800 Subject: [PATCH] exchange receiver support multi thread read (#1211) --- .../DataStreams/ExchangeReceiverInputStream.h | 230 +--------------- .../Coprocessor/DAGQueryBlockInterpreter.cpp | 12 +- dbms/src/Flash/Mpp/ExchangeReceiver.h | 251 ++++++++++++++++++ 3 files changed, 267 insertions(+), 226 deletions(-) create mode 100644 dbms/src/Flash/Mpp/ExchangeReceiver.h diff --git a/dbms/src/DataStreams/ExchangeReceiverInputStream.h b/dbms/src/DataStreams/ExchangeReceiverInputStream.h index 14f2799507f..10745e82b1a 100644 --- a/dbms/src/DataStreams/ExchangeReceiverInputStream.h +++ b/dbms/src/DataStreams/ExchangeReceiverInputStream.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -9,251 +10,36 @@ #include #include #include - -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wunused-parameter" -#include -#include -#include -#include -#pragma GCC diagnostic pop - -namespace pingcap -{ -namespace kv -{ - -template <> -struct RpcTypeTraits<::mpp::EstablishMPPConnectionRequest> -{ - using RequestType = ::mpp::EstablishMPPConnectionRequest; - using ResultType = ::mpp::MPPDataPacket; - static std::unique_ptr<::grpc::ClientReader<::mpp::MPPDataPacket>> doRPCCall( - grpc::ClientContext * context, std::shared_ptr client, const RequestType & req) - { - return client->stub->EstablishMPPConnection(context, req); - } -}; - -} // namespace kv -} // namespace pingcap +#include namespace DB { // ExchangeReceiver is in charge of receiving data from exchangeSender located in upstream tasks. -// TODO: Currently, there is a single thread to call the receiver, we need to consider reading parallely -// in the future. class ExchangeReceiverInputStream : public IProfilingBlockInputStream { - TMTContext & context; - std::chrono::seconds timeout; + std::shared_ptr receiver; - tipb::ExchangeReceiver exchange_receiver; - ::mpp::TaskMeta task_meta; - std::vector workers; - - DAGSchema fake_schema; Block sample_block; - // TODO: should be a concurrency bounded queue. - std::mutex rw_mu; - std::condition_variable cv; - std::queue block_buffer; - std::atomic_int live_workers; - bool inited; - bool meet_error; - Exception err; - - Logger * log; - - void decodePacket(const mpp::MPPDataPacket & p) - { - tipb::SelectResponse resp; - resp.ParseFromString(p.data()); - int chunks_size = resp.chunks_size(); - LOG_DEBUG(log, "get chunk size " + std::to_string(chunks_size)); - if (chunks_size == 0) - return; - for (int i = 0; i < chunks_size; i++) - { - Block block; - const tipb::Chunk & chunk = resp.chunks(i); - switch (resp.encode_type()) - { - case tipb::EncodeType::TypeCHBlock: - block = CHBlockChunkCodec().decode(chunk, fake_schema); - break; - case tipb::EncodeType::TypeChunk: - block = ArrowChunkCodec().decode(chunk, fake_schema); - break; - case tipb::EncodeType::TypeDefault: - block = DefaultChunkCodec().decode(chunk, fake_schema); - break; - default: - throw Exception("Unsupported encode type", ErrorCodes::LOGICAL_ERROR); - } - std::lock_guard lk(rw_mu); - block_buffer.push(std::move(block)); - cv.notify_one(); - } - } - - // Check this error is retryable - bool canRetry(const mpp::Error & err) { return err.msg().find("can't find") != std::string::npos; } - - void startAndRead(const String & raw) - { - try - { - startAndReadImpl(raw); - } - catch (Exception & e) - { - meet_error = true; - err = e; - } - catch (std::exception & e) - { - meet_error = true; - err = Exception(e.what()); - } - catch (...) - { - meet_error = true; - err = Exception("fatal error"); - } - live_workers--; - cv.notify_all(); - } - - void startAndReadImpl(const String & raw) - { - // TODO: Retry backoff. - int max_retry = 60; - std::chrono::seconds total_wait_time{}; - for (int idx = 0; idx < max_retry; idx++) - { - auto sender_task = new mpp::TaskMeta(); - sender_task->ParseFromString(raw); - auto req = std::make_shared(); - req->set_allocated_receiver_meta(new mpp::TaskMeta(task_meta)); - req->set_allocated_sender_meta(sender_task); - LOG_DEBUG(log, "begin start and read : " << req->DebugString()); - pingcap::kv::RpcCall call(req); - grpc::ClientContext client_context; - auto stream_resp = context.getCluster()->rpc_client->sendStreamRequest(sender_task->address(), &client_context, call); - - stream_resp->WaitForInitialMetadata(); - - mpp::MPPDataPacket packet; - - bool needRetry = false; - while (stream_resp->Read(&packet)) - { - if (packet.has_error()) // This is the only way that down stream pass an error. - { - if (canRetry(packet.error())) - { - needRetry = true; - break; - } - throw Exception("exchange receiver meet error : " + packet.error().msg()); - } - - LOG_DEBUG(log, "read success"); - decodePacket(packet); - } - if (needRetry) - { - if (timeout.count() > 0 && total_wait_time > timeout) - { - break; - } - using namespace std::chrono_literals; - std::this_thread::sleep_for(1s); - total_wait_time += 1s; - stream_resp->Finish(); - continue; - } - - LOG_DEBUG(log, "finish worker success" << std::to_string(live_workers)); - return; - } - // fail - throw Exception( - "cannot build connection after several tries, total wait time is " + std::to_string(total_wait_time.count()) + "s."); - } - public: - ExchangeReceiverInputStream(Context & context_, const ::tipb::ExchangeReceiver & exc, const ::mpp::TaskMeta & meta) - : context(context_.getTMTContext()), - timeout(context_.getSettings().mpp_task_timeout), - exchange_receiver(exc), - task_meta(meta), - live_workers(0), - inited(false), - meet_error(false), - log(&Logger::get("exchange_receiver")) + ExchangeReceiverInputStream(std::shared_ptr receiver_) : receiver(std::move(receiver_)) { - // generate sample block ColumnsWithTypeAndName columns; - - for (int i = 0; i < exc.field_types_size(); i++) + for (auto & dag_col : receiver->getOutputSchema()) { - String name = "exchange_receiver_" + std::to_string(i); - fake_schema.push_back(std::make_pair(name, ColumnInfo())); - - auto tp = getDataTypeByFieldType(exc.field_types(i)); - ColumnWithTypeAndName col(tp, name); + auto tp = getDataTypeByColumnInfo(dag_col.second); + ColumnWithTypeAndName col(tp, dag_col.first); columns.emplace_back(col); } - sample_block = Block(columns); } - ~ExchangeReceiverInputStream() - { - for (auto & worker : workers) - { - worker.join(); - } - } - Block getHeader() const override { return sample_block; } String getName() const override { return "ExchangeReceiver"; } - void init() - { - int task_size = exchange_receiver.encoded_task_meta_size(); - for (int i = 0; i < task_size; i++) - { - live_workers++; - std::thread t(&ExchangeReceiverInputStream::startAndRead, this, std::ref(exchange_receiver.encoded_task_meta(i))); - workers.push_back(std::move(t)); - } - inited = true; - } - - Block readImpl() override - { - if (!inited) - init(); - std::unique_lock lk(rw_mu); - cv.wait(lk, [&] { return block_buffer.size() > 0 || live_workers == 0 || meet_error; }); - if (meet_error) - { - throw err; - } - if (block_buffer.empty()) - { - return {}; - } - Block blk = block_buffer.front(); - block_buffer.pop(); - return blk; - } + Block readImpl() override { return receiver->nextBlock(); } }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 43d7fa40b85..23b95c1ee57 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -1289,11 +1289,15 @@ void DAGQueryBlockInterpreter::executeImpl(Pipeline & pipeline) } else if (query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver) { - auto exchange_receiver_stream = std::make_shared( - context, query_block.source->exchange_receiver(), dag.getDAGContext().getMPPTaskMeta()); - pipeline.streams.push_back(exchange_receiver_stream); + auto exchange_receiver + = std::make_shared(context, query_block.source->exchange_receiver(), dag.getDAGContext().getMPPTaskMeta()); + // todo choose a more reasonable stream number + for (size_t i = 0; i < max_streams; i++) + { + pipeline.streams.push_back(std::make_shared(exchange_receiver)); + } std::vector source_columns; - Block block = exchange_receiver_stream->getHeader(); + Block block = pipeline.firstStream()->getHeader(); for (const auto & col : block.getColumnsWithTypeAndName()) { source_columns.emplace_back(NameAndTypePair(col.name, col.type)); diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h new file mode 100644 index 00000000000..e4fec910ce5 --- /dev/null +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -0,0 +1,251 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#include +#include +#include +#include + +#pragma GCC diagnostic pop + +namespace pingcap +{ +namespace kv +{ + +template <> +struct RpcTypeTraits<::mpp::EstablishMPPConnectionRequest> +{ + using RequestType = ::mpp::EstablishMPPConnectionRequest; + using ResultType = ::mpp::MPPDataPacket; + static std::unique_ptr<::grpc::ClientReader<::mpp::MPPDataPacket>> doRPCCall( + grpc::ClientContext * context, std::shared_ptr client, const RequestType & req) + { + return client->stub->EstablishMPPConnection(context, req); + } +}; + +} // namespace kv +} // namespace pingcap + +namespace DB +{ + +class ExchangeReceiver +{ + TMTContext & context; + std::chrono::seconds timeout; + + tipb::ExchangeReceiver pb_exchange_receiver; + ::mpp::TaskMeta task_meta; + std::vector workers; + + DAGSchema schema; + + // TODO: should be a concurrency bounded queue. + std::mutex rw_mu; + std::condition_variable cv; + std::queue block_buffer; + std::atomic_int live_workers; + bool inited; + bool meet_error; + Exception err; + + Logger * log; + + void decodePacket(const mpp::MPPDataPacket & p) + { + tipb::SelectResponse resp; + resp.ParseFromString(p.data()); + int chunks_size = resp.chunks_size(); + LOG_DEBUG(log, "get chunk size " + std::to_string(chunks_size)); + if (chunks_size == 0) + return; + for (int i = 0; i < chunks_size; i++) + { + Block block; + const tipb::Chunk & chunk = resp.chunks(i); + switch (resp.encode_type()) + { + case tipb::EncodeType::TypeCHBlock: + block = CHBlockChunkCodec().decode(chunk, schema); + break; + case tipb::EncodeType::TypeChunk: + block = ArrowChunkCodec().decode(chunk, schema); + break; + case tipb::EncodeType::TypeDefault: + block = DefaultChunkCodec().decode(chunk, schema); + break; + default: + throw Exception("Unsupported encode type", ErrorCodes::LOGICAL_ERROR); + } + std::lock_guard lk(rw_mu); + block_buffer.push(std::move(block)); + cv.notify_one(); + } + } + + // Check this error is retryable + bool canRetry(const mpp::Error & error) { return error.msg().find("can't find") != std::string::npos; } + + void startAndRead(const String & raw) + { + try + { + startAndReadImpl(raw); + } + catch (Exception & e) + { + meet_error = true; + err = e; + } + catch (std::exception & e) + { + meet_error = true; + err = Exception(e.what()); + } + catch (...) + { + meet_error = true; + err = Exception("fatal error"); + } + live_workers--; + cv.notify_all(); + } + + void startAndReadImpl(const String & raw) + { + // TODO: Retry backoff. + int max_retry = 60; + std::chrono::seconds total_wait_time{}; + for (int idx = 0; idx < max_retry; idx++) + { + auto sender_task = new mpp::TaskMeta(); + sender_task->ParseFromString(raw); + auto req = std::make_shared(); + req->set_allocated_receiver_meta(new mpp::TaskMeta(task_meta)); + req->set_allocated_sender_meta(sender_task); + LOG_DEBUG(log, "begin start and read : " << req->DebugString()); + pingcap::kv::RpcCall call(req); + grpc::ClientContext client_context; + auto stream_resp = context.getCluster()->rpc_client->sendStreamRequest(sender_task->address(), &client_context, call); + + stream_resp->WaitForInitialMetadata(); + + mpp::MPPDataPacket packet; + + bool needRetry = false; + while (stream_resp->Read(&packet)) + { + if (packet.has_error()) // This is the only way that down stream pass an error. + { + if (canRetry(packet.error())) + { + needRetry = true; + break; + } + throw Exception("exchange receiver meet error : " + packet.error().msg()); + } + + LOG_DEBUG(log, "read success"); + decodePacket(packet); + } + if (needRetry) + { + if (timeout.count() > 0 && total_wait_time > timeout) + { + break; + } + using namespace std::chrono_literals; + std::this_thread::sleep_for(1s); + total_wait_time += 1s; + stream_resp->Finish(); + continue; + } + + LOG_DEBUG(log, "finish worker success" << std::to_string(live_workers)); + return; + } + // fail + throw Exception( + "cannot build connection after several tries, total wait time is " + std::to_string(total_wait_time.count()) + "s."); + } + +public: + ExchangeReceiver(Context & context_, const ::tipb::ExchangeReceiver & exc, const ::mpp::TaskMeta & meta) + : context(context_.getTMTContext()), + timeout(context_.getSettings().mpp_task_timeout), + pb_exchange_receiver(exc), + task_meta(meta), + live_workers(0), + inited(false), + meet_error(false), + log(&Logger::get("exchange_receiver")) + { + for (int i = 0; i < exc.field_types_size(); i++) + { + String name = "exchange_receiver_" + std::to_string(i); + ColumnInfo info = fieldTypeToColumnInfo(exc.field_types(i)); + schema.push_back(std::make_pair(name, info)); + } + } + + ~ExchangeReceiver() + { + for (auto & worker : workers) + { + worker.join(); + } + } + + const DAGSchema & getOutputSchema() const { return schema; } + + void init() + { + std::lock_guard lk(rw_mu); + if (!inited) + { + int task_size = pb_exchange_receiver.encoded_task_meta_size(); + for (int i = 0; i < task_size; i++) + { + live_workers++; + std::thread t(&ExchangeReceiver::startAndRead, this, std::ref(pb_exchange_receiver.encoded_task_meta(i))); + workers.push_back(std::move(t)); + } + inited = true; + } + } + + Block nextBlock() + { + if (!inited) + init(); + std::unique_lock lk(rw_mu); + cv.wait(lk, [&] { return block_buffer.size() > 0 || live_workers == 0 || meet_error; }); + if (meet_error) + { + throw err; + } + if (block_buffer.empty()) + { + return {}; + } + Block blk = block_buffer.front(); + block_buffer.pop(); + return blk; + } +}; +} // namespace DB