From 304840ee5dfe62f33e3d915c92fb7eeba83eda2b Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Mon, 8 Mar 2021 15:51:16 +0800 Subject: [PATCH 1/3] reconnect when EOF --- .../thrift/ReconnectingRequestChannel.cpp | 2 +- src/common/thrift/ThriftClientManager.inl | 103 ++++++++++-------- 2 files changed, 58 insertions(+), 47 deletions(-) diff --git a/src/common/thrift/ReconnectingRequestChannel.cpp b/src/common/thrift/ReconnectingRequestChannel.cpp index 958691806..5481924a1 100644 --- a/src/common/thrift/ReconnectingRequestChannel.cpp +++ b/src/common/thrift/ReconnectingRequestChannel.cpp @@ -82,7 +82,7 @@ uint32_t ReconnectingRequestChannel::sendRequest( } ReconnectingRequestChannel::Impl& ReconnectingRequestChannel::impl() { - if (!impl_ || !std::dynamic_pointer_cast(impl_)->good()) { + if (!impl_) { impl_ = implCreator_(evb_); } diff --git a/src/common/thrift/ThriftClientManager.inl b/src/common/thrift/ThriftClientManager.inl index 256e2e573..435b72df1 100644 --- a/src/common/thrift/ThriftClientManager.inl +++ b/src/common/thrift/ThriftClientManager.inl @@ -7,6 +7,7 @@ #include #include #include +#include #include #include "common/network/NetworkUtils.h" @@ -18,62 +19,72 @@ namespace thrift { template std::shared_ptr ThriftClientManager::client( const HostAddr& host, folly::EventBase* evb, bool compatibility, uint32_t timeout) { - VLOG(2) << "Getting a client to " << host; - if (evb == nullptr) { evb = folly::EventBaseManager::get()->getEventBase(); } - + // Get client from client manager if it is ok. auto it = clientMap_->find(std::make_pair(host, evb)); if (it != clientMap_->end()) { - return it->second; + do { + auto channel = + dynamic_cast(it->second->getChannel()); + if (channel == nullptr || !channel->good()) { + // Remove bad connection to create a new one. + clientMap_->erase(it); + LOG(ERROR) << "Invalid Channel: " << channel << " for host: " << host; + break; + } + auto transport = dynamic_cast(channel->getTransport()); + if (transport == nullptr || transport->hangup()) { + clientMap_->erase(it); + LOG(ERROR) << "Transport is closed by peers " << transport << " for host: " << host; + break; + } + VLOG(2) << "Getting a client to " << host; + return it->second; + } while (false); } - // Need to create a new client + // Need to create a new client and insert it to client map. VLOG(2) << "There is no existing client to " << host << ", trying to create one"; - auto channel = apache::thrift::ReconnectingRequestChannel::newChannel( - *evb, [compatibility, host = host, timeout] (folly::EventBase& eb) mutable { - static thread_local int connectionCount = 0; - - /* - * TODO(liuyu): folly said 'resolve' may take second to finish - * if this really happen, we will add a cache here. - * */ - if (!folly::IPAddress::validate(host.host)) { - try { - folly::SocketAddress socketAddr(host.host, host.port, true); - std::ostringstream oss; - oss << "resolve " << host << " as "; - host.host = socketAddr.getAddressStr(); - oss << host; - LOG(INFO) << oss.str(); - } catch(const std::exception& e) { - LOG(ERROR) << e.what(); - } - } + static thread_local int connectionCount = 0; + /* + * TODO(liuyu): folly said 'resolve' may take second to finish + * if this really happen, we will add a cache here. + * */ + HostAddr tempHost = host; + if (!folly::IPAddress::validate(tempHost.host)) { + try { + folly::SocketAddress socketAddr(tempHost.host, tempHost.port, true); + std::ostringstream oss; + oss << "resolve " << tempHost << " as "; + tempHost.host = socketAddr.getAddressStr(); + oss << tempHost; + LOG(INFO) << oss.str(); + } catch(const std::exception& e) { + LOG(ERROR) << e.what(); + return nullptr; + } + } - VLOG(2) << "Connecting to " << host << " for " << ++connectionCount << " times"; - std::shared_ptr socket; - eb.runImmediatelyOrRunInEventBaseThreadAndWait( - [&socket, &eb, host]() { - socket = apache::thrift::async::TAsyncSocket::newSocket( - &eb, host.host, host.port, FLAGS_conn_timeout_ms); - }); - auto headerClientChannel = apache::thrift::HeaderClientChannel::newChannel(socket); - if (timeout > 0) { - headerClientChannel->setTimeout(timeout); - } - if (compatibility) { - headerClientChannel->setProtocolId(apache::thrift::protocol::T_BINARY_PROTOCOL); - headerClientChannel->setClientType(THRIFT_UNFRAMED_DEPRECATED); - } - return headerClientChannel; + VLOG(2) << "Connecting to " << host << " for " << ++connectionCount << " times"; + std::shared_ptr socket; + evb->runImmediatelyOrRunInEventBaseThreadAndWait( + [&socket, evb, host]() { + socket = apache::thrift::async::TAsyncSocket::newSocket( + evb, host.host, host.port, FLAGS_conn_timeout_ms); }); - std::shared_ptr client(new ClientType(std::move(channel)), [evb](auto* p) { - evb->runImmediatelyOrRunInEventBaseThreadAndWait([p] { - delete p; - }); - }); + auto headerClientChannel = apache::thrift::HeaderClientChannel::newChannel(socket); + if (timeout > 0) { + headerClientChannel->setTimeout(timeout); + } + if (compatibility) { + headerClientChannel->setProtocolId(apache::thrift::protocol::T_BINARY_PROTOCOL); + headerClientChannel->setClientType(THRIFT_UNFRAMED_DEPRECATED); + } + std::shared_ptr client( + new ClientType(std::move(headerClientChannel)), + [evb](auto* p) { evb->runImmediatelyOrRunInEventBaseThreadAndWait([p] { delete p; }); }); clientMap_->emplace(std::make_pair(host, evb), client); return client; } From f2d0236c360fbc9f3450a8f4accee753556fff92 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Wed, 10 Mar 2021 18:39:49 +0800 Subject: [PATCH 2/3] just delete ReconnectingRequestChannel --- src/common/thrift/CMakeLists.txt | 1 - .../thrift/ReconnectingRequestChannel.cpp | 92 ------------------- src/common/thrift/ThriftClientManager.inl | 1 - 3 files changed, 94 deletions(-) delete mode 100644 src/common/thrift/ReconnectingRequestChannel.cpp diff --git a/src/common/thrift/CMakeLists.txt b/src/common/thrift/CMakeLists.txt index 9940f92fb..4dc127c80 100644 --- a/src/common/thrift/CMakeLists.txt +++ b/src/common/thrift/CMakeLists.txt @@ -5,6 +5,5 @@ nebula_add_library( thrift_obj OBJECT - ReconnectingRequestChannel.cpp ThriftClientManager.cpp ) diff --git a/src/common/thrift/ReconnectingRequestChannel.cpp b/src/common/thrift/ReconnectingRequestChannel.cpp deleted file mode 100644 index 5481924a1..000000000 --- a/src/common/thrift/ReconnectingRequestChannel.cpp +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright 2018-present Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include - -#include - -#include - -namespace apache { -namespace thrift { - -class ReconnectingRequestChannel::RequestCallback - : public apache::thrift::RequestCallback { - public: - RequestCallback( - ReconnectingRequestChannel& channel, - std::unique_ptr cob) - : channel_(channel), impl_(channel_.impl_), cob_(std::move(cob)) {} - - void requestSent() override { - cob_->requestSent(); - } - - void replyReceived(apache::thrift::ClientReceiveState&& state) override { - handleTransportException(state); - cob_->replyReceived(std::move(state)); - } - - void requestError(apache::thrift::ClientReceiveState&& state) override { - handleTransportException(state); - cob_->requestError(std::move(state)); - } - - private: - void handleTransportException(apache::thrift::ClientReceiveState& state) { - if (!state.isException()) { - return; - } - if (!state.exception() - .is_compatible_with< - apache::thrift::transport::TTransportException>()) { - return; - } - if (channel_.impl_ != impl_) { - return; - } - channel_.impl_.reset(); - } - - ReconnectingRequestChannel& channel_; - ReconnectingRequestChannel::ImplPtr impl_; - std::unique_ptr cob_; -}; - -uint32_t ReconnectingRequestChannel::sendRequest( - apache::thrift::RpcOptions& options, - std::unique_ptr cob, - std::unique_ptr ctx, - std::unique_ptr buf, - std::shared_ptr header) { - cob = std::make_unique(*this, std::move(cob)); - - return impl().sendRequest( - options, - std::move(cob), - std::move(ctx), - std::move(buf), - std::move(header)); -} - -ReconnectingRequestChannel::Impl& ReconnectingRequestChannel::impl() { - if (!impl_) { - impl_ = implCreator_(evb_); - } - - return *impl_; -} -} // namespace thrift -} // namespace apache diff --git a/src/common/thrift/ThriftClientManager.inl b/src/common/thrift/ThriftClientManager.inl index 435b72df1..fc4809187 100644 --- a/src/common/thrift/ThriftClientManager.inl +++ b/src/common/thrift/ThriftClientManager.inl @@ -4,7 +4,6 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ -#include #include #include #include From cc13cba91394f44e7941fdff731e47a497656546 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Thu, 11 Mar 2021 17:30:08 +0800 Subject: [PATCH 3/3] address @panda-sheep's comments, rebased --- src/common/thrift/ThriftClientManager.inl | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/common/thrift/ThriftClientManager.inl b/src/common/thrift/ThriftClientManager.inl index fc4809187..4bdc834ca 100644 --- a/src/common/thrift/ThriftClientManager.inl +++ b/src/common/thrift/ThriftClientManager.inl @@ -51,14 +51,14 @@ std::shared_ptr ThriftClientManager::client( * TODO(liuyu): folly said 'resolve' may take second to finish * if this really happen, we will add a cache here. * */ - HostAddr tempHost = host; - if (!folly::IPAddress::validate(tempHost.host)) { + HostAddr resolved = host; + if (!folly::IPAddress::validate(resolved.host)) { try { - folly::SocketAddress socketAddr(tempHost.host, tempHost.port, true); + folly::SocketAddress socketAddr(resolved.host, resolved.port, true); std::ostringstream oss; - oss << "resolve " << tempHost << " as "; - tempHost.host = socketAddr.getAddressStr(); - oss << tempHost; + oss << "resolve " << resolved << " as "; + resolved.host = socketAddr.getAddressStr(); + oss << resolved; LOG(INFO) << oss.str(); } catch(const std::exception& e) { LOG(ERROR) << e.what(); @@ -69,9 +69,9 @@ std::shared_ptr ThriftClientManager::client( VLOG(2) << "Connecting to " << host << " for " << ++connectionCount << " times"; std::shared_ptr socket; evb->runImmediatelyOrRunInEventBaseThreadAndWait( - [&socket, evb, host]() { + [&socket, evb, resolved]() { socket = apache::thrift::async::TAsyncSocket::newSocket( - evb, host.host, host.port, FLAGS_conn_timeout_ms); + evb, resolved.host, resolved.port, FLAGS_conn_timeout_ms); }); auto headerClientChannel = apache::thrift::HeaderClientChannel::newChannel(socket); if (timeout > 0) { @@ -84,7 +84,7 @@ std::shared_ptr ThriftClientManager::client( std::shared_ptr client( new ClientType(std::move(headerClientChannel)), [evb](auto* p) { evb->runImmediatelyOrRunInEventBaseThreadAndWait([p] { delete p; }); }); - clientMap_->emplace(std::make_pair(host, evb), client); + clientMap_->emplace(std::make_pair(resolved, evb), client); return client; }