diff --git a/src/common/thrift/CMakeLists.txt b/src/common/thrift/CMakeLists.txt index 54fad577109..b281fad2634 100644 --- a/src/common/thrift/CMakeLists.txt +++ b/src/common/thrift/CMakeLists.txt @@ -1,5 +1,4 @@ nebula_add_library( thrift_obj OBJECT - ReconnectingRequestChannel.cpp ThriftClientManager.cpp ) diff --git a/src/common/thrift/ReconnectingRequestChannel.cpp b/src/common/thrift/ReconnectingRequestChannel.cpp deleted file mode 100644 index 9586918069f..00000000000 --- a/src/common/thrift/ReconnectingRequestChannel.cpp +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright 2018-present Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include - -#include - -#include - -namespace apache { -namespace thrift { - -class ReconnectingRequestChannel::RequestCallback - : public apache::thrift::RequestCallback { - public: - RequestCallback( - ReconnectingRequestChannel& channel, - std::unique_ptr cob) - : channel_(channel), impl_(channel_.impl_), cob_(std::move(cob)) {} - - void requestSent() override { - cob_->requestSent(); - } - - void replyReceived(apache::thrift::ClientReceiveState&& state) override { - handleTransportException(state); - cob_->replyReceived(std::move(state)); - } - - void requestError(apache::thrift::ClientReceiveState&& state) override { - handleTransportException(state); - cob_->requestError(std::move(state)); - } - - private: - void handleTransportException(apache::thrift::ClientReceiveState& state) { - if (!state.isException()) { - return; - } - if (!state.exception() - .is_compatible_with< - apache::thrift::transport::TTransportException>()) { - return; - } - if (channel_.impl_ != impl_) { - return; - } - channel_.impl_.reset(); - } - - ReconnectingRequestChannel& channel_; - ReconnectingRequestChannel::ImplPtr impl_; - std::unique_ptr cob_; -}; - -uint32_t ReconnectingRequestChannel::sendRequest( - apache::thrift::RpcOptions& options, - std::unique_ptr cob, - std::unique_ptr ctx, - std::unique_ptr buf, - std::shared_ptr header) { - cob = std::make_unique(*this, std::move(cob)); - - return impl().sendRequest( - options, - std::move(cob), - std::move(ctx), - std::move(buf), - std::move(header)); -} - -ReconnectingRequestChannel::Impl& ReconnectingRequestChannel::impl() { - if (!impl_ || !std::dynamic_pointer_cast(impl_)->good()) { - impl_ = implCreator_(evb_); - } - - return *impl_; -} -} // namespace thrift -} // namespace apache diff --git a/src/common/thrift/ThriftClientManager.inl b/src/common/thrift/ThriftClientManager.inl index 69615af1b8e..f7946a559cf 100644 --- a/src/common/thrift/ThriftClientManager.inl +++ b/src/common/thrift/ThriftClientManager.inl @@ -4,9 +4,9 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ -#include #include #include +#include #include #include "network/NetworkUtils.h" @@ -18,51 +18,57 @@ namespace thrift { template std::shared_ptr ThriftClientManager::client( const HostAddr& host, folly::EventBase* evb, bool compatibility, uint32_t timeout) { - VLOG(2) << "Getting a client to " - << network::NetworkUtils::intToIPv4(host.first) - << ":" << host.second; - if (evb == nullptr) { evb = folly::EventBaseManager::get()->getEventBase(); } - auto it = clientMap_->find(std::make_pair(host, evb)); - if (it != clientMap_->end()) { - return it->second; - } - - // Need to create a new client auto ipAddr = network::NetworkUtils::intToIPv4(host.first); auto port = host.second; - VLOG(2) << "There is no existing client to " - << ipAddr << ":" << port - << ", trying to create one"; - auto channel = apache::thrift::ReconnectingRequestChannel::newChannel( - *evb, [compatibility, ipAddr, port, timeout] (folly::EventBase& eb) mutable { - static thread_local int connectionCount = 0; - VLOG(2) << "Connecting to " << ipAddr << ":" << port - << " for " << ++connectionCount << " times"; - std::shared_ptr socket; - eb.runImmediatelyOrRunInEventBaseThreadAndWait( - [&socket, &eb, ipAddr, port]() { - socket = apache::thrift::async::TAsyncSocket::newSocket( - &eb, ipAddr, port, FLAGS_conn_timeout_ms); - }); - auto headerClientChannel = apache::thrift::HeaderClientChannel::newChannel(socket); - if (timeout > 0) { - headerClientChannel->setTimeout(timeout); + // Get client from client manager if it is ok. + auto it = clientMap_->find(std::make_pair(host, evb)); + if (it != clientMap_->end()) { + do { + auto channel = + dynamic_cast(it->second->getChannel()); + if (channel == nullptr || !channel->good()) { + // Remove bad connection to create a new one. + clientMap_->erase(it); + VLOG(2) << "Invalid Channel: " << channel << " for host " << ipAddr << ":" << port; + break; } - if (compatibility) { - headerClientChannel->setProtocolId(apache::thrift::protocol::T_BINARY_PROTOCOL); - headerClientChannel->setClientType(THRIFT_UNFRAMED_DEPRECATED); + auto transport = dynamic_cast(channel->getTransport()); + if (transport == nullptr || transport->hangup()) { + clientMap_->erase(it); + VLOG(2) << "Transport is closed by peers " << transport << " for host " + << ipAddr << ":" << port; + break; } - return headerClientChannel; - }); - std::shared_ptr client(new ClientType(std::move(channel)), [evb](auto* p) { - evb->runImmediatelyOrRunInEventBaseThreadAndWait([p] { - delete p; - }); + VLOG(2) << "Getting a client to " << ipAddr << ":" << port; + return it->second; + } while (false); + } + + // Need to create a new client and insert it to client map. + VLOG(2) << "There is no existing client to " << host << ", trying to create one"; + static thread_local int connectionCount = 0; + + VLOG(2) << "Connecting to " << host << " for " << ++connectionCount << " times"; + std::shared_ptr socket; + evb->runImmediatelyOrRunInEventBaseThreadAndWait([&socket, evb, ipAddr, port]() { + socket = apache::thrift::async::TAsyncSocket::newSocket( + evb, ipAddr, port, FLAGS_conn_timeout_ms); }); + auto headerClientChannel = apache::thrift::HeaderClientChannel::newChannel(socket); + if (timeout > 0) { + headerClientChannel->setTimeout(timeout); + } + if (compatibility) { + headerClientChannel->setProtocolId(apache::thrift::protocol::T_BINARY_PROTOCOL); + headerClientChannel->setClientType(THRIFT_UNFRAMED_DEPRECATED); + } + std::shared_ptr client( + new ClientType(std::move(headerClientChannel)), + [evb](auto* p) { evb->runImmediatelyOrRunInEventBaseThreadAndWait([p] { delete p; }); }); clientMap_->emplace(std::make_pair(host, evb), client); return client; } diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index 35c721100c1..a59af1d5797 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -16,7 +16,7 @@ #include -DEFINE_int32(heartbeat_interval_secs, 3, "Heartbeat interval"); +DEFINE_int32(heartbeat_interval_secs, 10, "Heartbeat interval"); DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no retry"); DEFINE_int32(meta_client_retry_interval_secs, 1, "meta client sleep interval between retry"); DEFINE_int32(meta_client_timeout_ms, 60 * 1000, "meta client timeout"); @@ -1828,8 +1828,8 @@ folly::Future> MetaClient::heartbeat() { leaderIds_.clear(); leaderIds_ = leaderIds; } - req.set_leader_partIds(std::move(leaderIds)); } + req.set_leader_partIds(std::move(leaderIds)); } else { req.set_leader_partIds(std::move(leaderIds)); }