Skip to content

Commit

Permalink
ENG-3117: Async DNS resolution of peers
Browse files Browse the repository at this point in the history
Summary:
Implemented async DNS resolution of address for peers.
Used in async mode during config update and election.
There are still places that use synchronous DNS resolution:
1) SetupCatalog
2) StepDown
3) NotifyLeaderAboutLostElection

Suppose that 2 and 3 could be kept in synchronous mode.

Did not test it with docker yet.
Creating diff to trigger tests and start review process.

Test Plan: Jenkins

Reviewers: mikhail, bogdan

Reviewed By: mikhail

Subscribers: ybase, bharat

Differential Revision: https://phabricator.dev.yugabyte.com/D4489
  • Loading branch information
spolitov committed Apr 12, 2018
1 parent a7d1447 commit 9ba8436
Show file tree
Hide file tree
Showing 32 changed files with 402 additions and 235 deletions.
15 changes: 8 additions & 7 deletions src/yb/common/wire_protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,15 @@ Status HostPortToPB(const HostPort& host_port, HostPortPB* host_port_pb) {
return Status::OK();
}

Status HostPortFromPB(const HostPortPB& host_port_pb, HostPort* host_port) {
host_port->set_host(host_port_pb.host());
host_port->set_port(host_port_pb.port());
return Status::OK();
HostPort HostPortFromPB(const HostPortPB& host_port_pb) {
HostPort host_port;
host_port.set_host(host_port_pb.host());
host_port.set_port(host_port_pb.port());
return host_port;
}

Status EndpointFromHostPortPB(const HostPortPB& host_portpb, Endpoint* endpoint) {
HostPort host_port;
RETURN_NOT_OK(HostPortFromPB(host_portpb, &host_port));
HostPort host_port = HostPortFromPB(host_portpb);
return EndpointFromHostPort(host_port, endpoint);
}

Expand Down Expand Up @@ -378,7 +378,8 @@ Status FindLeaderHostPort(const RepeatedPtrField<ServerEntryPB>& entries,
entry.ShortDebugString()));
}
if (entry.role() == consensus::RaftPeerPB::LEADER) {
return HostPortFromPB(entry.registration().rpc_addresses(0), leader_hostport);
*leader_hostport = HostPortFromPB(entry.registration().rpc_addresses(0));
return Status::OK();
}
}
return STATUS(NotFound, "No leader found.");
Expand Down
2 changes: 1 addition & 1 deletion src/yb/common/wire_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Status StatusFromPB(const AppStatusPB& pb);
Status HostPortToPB(const HostPort& host_port, HostPortPB* host_port_pb);

// Returns the HostPort created from the specified protobuf.
Status HostPortFromPB(const HostPortPB& host_port_pb, HostPort* host_port);
HostPort HostPortFromPB(const HostPortPB& host_port_pb);

// Returns an Endpoint from HostPortPB.
CHECKED_STATUS EndpointFromHostPortPB(const HostPortPB& host_portpb, Endpoint* endpoint);
Expand Down
21 changes: 9 additions & 12 deletions src/yb/consensus/consensus-test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,8 @@ class NoOpTestPeerProxyFactory : public PeerProxyFactory {
CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_));
}

virtual CHECKED_STATUS NewProxy(const consensus::RaftPeerPB& peer_pb,
gscoped_ptr<PeerProxy>* proxy) override {
proxy->reset(new NoOpTestPeerProxy(pool_.get(), peer_pb));
return Status::OK();
void NewProxy(const RaftPeerPB& peer_pb, PeerProxyWaiter waiter) override {
waiter(PeerProxyPtr(new NoOpTestPeerProxy(pool_.get(), peer_pb)));
}

gscoped_ptr<ThreadPool> pool_;
Expand Down Expand Up @@ -595,14 +593,13 @@ class LocalTestPeerProxyFactory : public PeerProxyFactory {
CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_));
}

virtual CHECKED_STATUS NewProxy(const consensus::RaftPeerPB& peer_pb,
gscoped_ptr<PeerProxy>* proxy) override {
LocalTestPeerProxy* new_proxy = new LocalTestPeerProxy(peer_pb.permanent_uuid(),
pool_.get(),
peers_);
proxy->reset(new_proxy);
proxies_.push_back(new_proxy);
return Status::OK();
void NewProxy(const consensus::RaftPeerPB& peer_pb, PeerProxyWaiter waiter) override {
auto new_proxy = std::make_unique<LocalTestPeerProxy>(
peer_pb.permanent_uuid(), pool_.get(), peers_);
proxies_.push_back(new_proxy.get());
ASSERT_OK(pool_->SubmitFunc([waiter, new_proxy = new_proxy.release()] {
waiter(PeerProxyPtr(new_proxy));
}));
}

virtual const vector<LocalTestPeerProxy*>& GetProxies() {
Expand Down
34 changes: 34 additions & 0 deletions src/yb/consensus/consensus_fwd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#ifndef YB_CONSENSUS_CONSENSUS_FWD_H
#define YB_CONSENSUS_CONSENSUS_FWD_H

namespace yb {
namespace consensus {

class PeerProxyFactory;
class PeerMessageQueue;
class VoteRequestPB;
class VoteResponsePB;

class ConsensusServiceProxy;
typedef std::unique_ptr<ConsensusServiceProxy> ConsensusServiceProxyPtr;

class PeerProxy;
typedef std::unique_ptr<PeerProxy> PeerProxyPtr;

} // namespace consensus
} // namespace yb

#endif // YB_CONSENSUS_CONSENSUS_FWD_H
36 changes: 9 additions & 27 deletions src/yb/consensus/consensus_peers-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,9 @@ class ConsensusPeersTest : public YBTest {
peer_pb.set_permanent_uuid(peer_name);
auto proxy_ptr = new DelayablePeerProxy<NoOpTestPeerProxy>(
raft_pool_.get(), new NoOpTestPeerProxy(raft_pool_.get(), peer_pb));
gscoped_ptr<PeerProxy> proxy(proxy_ptr);
CHECK_OK(Peer::NewRemotePeer(peer_pb,
kTabletId,
kLeaderUuid,
message_queue_.get(),
raft_pool_token_.get(),
proxy.Pass(),
nullptr,
peer));
*peer = CHECK_RESULT(Peer::NewRemotePeer(
peer_pb, kTabletId, kLeaderUuid, message_queue_.get(), raft_pool_token_.get(),
PeerProxyPtr(proxy_ptr), nullptr /* consensus */));
return proxy_ptr;
}

Expand Down Expand Up @@ -296,15 +290,9 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) {
// and thus always has data pending, we should be able to close the peer.
TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
auto mock_proxy = new MockedPeerProxy(raft_pool_.get());
std::unique_ptr<Peer> peer;
ASSERT_OK(Peer::NewRemotePeer(FakeRaftPeerPB(kFollowerUuid),
kTabletId,
kLeaderUuid,
message_queue_.get(),
raft_pool_token_.get(),
gscoped_ptr<PeerProxy>(mock_proxy),
nullptr, // consensus
&peer));
auto peer = ASSERT_RESULT(Peer::NewRemotePeer(
FakeRaftPeerPB(kFollowerUuid), kTabletId, kLeaderUuid, message_queue_.get(),
raft_pool_token_.get(), PeerProxyPtr(mock_proxy), nullptr /* consensus */));

// Make the peer respond without making any progress -- it always returns
// that it has only replicated op 0.0. When we see the response, we always
Expand All @@ -330,15 +318,9 @@ TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {

TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
auto mock_proxy = new MockedPeerProxy(raft_pool_.get());
std::unique_ptr<Peer> peer;
ASSERT_OK(Peer::NewRemotePeer(FakeRaftPeerPB(kFollowerUuid),
kTabletId,
kLeaderUuid,
message_queue_.get(),
raft_pool_token_.get(),
gscoped_ptr<PeerProxy>(mock_proxy),
nullptr, // consensus
&peer));
auto peer = ASSERT_RESULT(Peer::NewRemotePeer(
FakeRaftPeerPB(kFollowerUuid), kTabletId, kLeaderUuid, message_queue_.get(),
raft_pool_token_.get(), PeerProxyPtr(mock_proxy), nullptr /* consensus */));

// Initial response has to be successful -- otherwise we'll consider the peer "new" and only send
// heartbeat RPCs.
Expand Down
Loading

0 comments on commit 9ba8436

Please sign in to comment.