Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Resending votes in consensus #2200

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions irohad/consensus/yac/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ target_link_libraries(yac

add_library(yac_transport
transport/impl/network_impl.cpp
transport/impl/yac_network_sender.cpp
impl/yac_crypto_provider_impl.cpp
)
target_link_libraries(yac_transport
Expand Down
2 changes: 1 addition & 1 deletion irohad/consensus/yac/cluster_order.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace iroha {
/**
* Provide current leader peer
*/
const shared_model::interface::Peer &currentLeader();
const std::shared_ptr<shared_model::interface::Peer> &currentLeader();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would make

Suggested change
const std::shared_ptr<shared_model::interface::Peer> &currentLeader();
std::shared_ptr<shared_model::interface::Peer> currentLeader();

as returning a reference to a local object is scary, and then

Suggested change
const std::shared_ptr<shared_model::interface::Peer> &currentLeader();
std::shared_ptr<const shared_model::interface::Peer> currentLeader();

as we do not want anyone to change the object (no matter it is an interface)


/**
* Switch to next peer as leader
Expand Down
5 changes: 3 additions & 2 deletions irohad/consensus/yac/impl/cluster_order.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ namespace iroha {
: order_(std::move(order)) {}

// TODO : 24/03/2018 x3medima17: make it const, IR-1164
const shared_model::interface::Peer &ClusterOrdering::currentLeader() {
const std::shared_ptr<shared_model::interface::Peer>
&ClusterOrdering::currentLeader() {
if (index_ >= order_.size()) {
index_ = 0;
}
return *order_.at(index_);
return order_.at(index_);
}

bool ClusterOrdering::hasNext() const {
Expand Down
14 changes: 7 additions & 7 deletions irohad/consensus/yac/impl/yac.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ namespace iroha {
vote.hash.vote_hashes.proposal_hash,
vote.hash.vote_hashes.block_hash,
current_leader);

network_->sendState(current_leader, {vote});
propagateStateDirectly(current_leader, {vote});
cluster_order_.switchToNext();
if (cluster_order_.hasNext()) {
timer_->invokeAfterDelay([this, vote] { this->votingStep(vote); });
Expand Down Expand Up @@ -246,7 +245,7 @@ namespace iroha {
last_round,
from->address());
auto votes = [](const auto &state) { return state.votes; };
this->propagateStateDirectly(*from,
this->propagateStateDirectly(from,
visit_in_place(last_state, votes));
};
};
Expand All @@ -258,13 +257,14 @@ namespace iroha {

void Yac::propagateState(const std::vector<VoteMessage> &msg) {
for (const auto &peer : cluster_order_.getPeers()) {
propagateStateDirectly(*peer, msg);
propagateStateDirectly(peer, msg);
}
}

void Yac::propagateStateDirectly(const shared_model::interface::Peer &to,
const std::vector<VoteMessage> &msg) {
network_->sendState(to, msg);
void Yac::propagateStateDirectly(
std::shared_ptr<shared_model::interface::Peer> to,
const std::vector<VoteMessage> &msg) {
network_->sendState(std::move(to), msg);
}

} // namespace yac
Expand Down
61 changes: 55 additions & 6 deletions irohad/consensus/yac/transport/impl/network_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ namespace iroha {
handler_ = handler;
}

void NetworkImpl::sendState(const shared_model::interface::Peer &to,
const std::vector<VoteMessage> &state) {
YacNetworkWithFeedBack::SendStateReturnType
NetworkImpl::sendState(const shared_model::interface::Peer &to,
const std::vector<VoteMessage> &state) {
createPeerConnection(to);

proto::State request;
Expand All @@ -42,12 +43,60 @@ namespace iroha {
*pb_vote = PbConverters::serializeVote(vote);
}

async_call_->Call([&](auto context, auto cq) {
return peers_.at(to.address())->AsyncSendState(context, request, cq);
});

log_->info(
"Send votes bundle[size={}] to {}", state.size(), to.address());

auto log_outcome = [log = log_, destination_peer = to.toString()](
const grpc::Status &status) {
log->info("Sent to {} with status details [{}]",
destination_peer,
status.ok() ? "OK" : status.error_details());
};

return async_call_
->Call([&](auto context, auto cq) {
return peers_.at(to.address())
->AsyncSendState(context, request, cq);
})
.tap(log_outcome)
.map(
[](const auto &status) { return makeSendStateStatus(status); });
}

YacNetworkWithFeedBack::ValueStateReturnType
NetworkImpl::makeSendStateStatus(const grpc::Status &status) {
auto is_ok = [](const auto &code) {
return code == grpc::StatusCode::OK;
};

auto is_troubles_with_recipient = [](const auto &code) {
using namespace grpc;
std::vector<int> codes = {StatusCode::CANCELLED,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::vector<int> codes = {StatusCode::CANCELLED,
std::set<StatusCode> codes = {StatusCode::CANCELLED,

performs better lookup

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems that set is more appropriate because it is a more reliable way to stay unique constants.

StatusCode::INVALID_ARGUMENT,
StatusCode::UNAUTHENTICATED,
StatusCode::RESOURCE_EXHAUSTED,
StatusCode::ABORTED,
StatusCode::UNIMPLEMENTED,
StatusCode::UNAVAILABLE,
StatusCode::DATA_LOSS};
return std::any_of(codes.begin(), codes.end(), [code](auto val) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return std::any_of(codes.begin(), codes.end(), [code](auto val) {
return std::find(codes.begin(), codes.end(), code) != codes.end();

or, if using set,

Suggested change
return std::any_of(codes.begin(), codes.end(), [code](auto val) {
return codes.find(code) != codes.end();

which is faster

return code == val;
});
};

auto code = status.error_code();

using namespace iroha::consensus::yac::sending_statuses;

if (is_ok(code)) {
return SuccessfulSent();
}

if (is_troubles_with_recipient(code)) {
return UnavailableReceiver();
}

return UnavailableNetwork();
}

grpc::Status NetworkImpl::SendState(
Expand Down
11 changes: 8 additions & 3 deletions irohad/consensus/yac/transport/impl/network_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ namespace iroha {
* Class which provides implementation of transport for consensus based on
* grpc
*/
class NetworkImpl : public YacNetwork, public proto::Yac::Service {
class NetworkImpl : public YacNetworkWithFeedBack,
public proto::Yac::Service {
public:
explicit NetworkImpl(
std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
Expand All @@ -37,8 +38,9 @@ namespace iroha {
void subscribe(
std::shared_ptr<YacNetworkNotifications> handler) override;

void sendState(const shared_model::interface::Peer &to,
const std::vector<VoteMessage> &state) override;
YacNetworkWithFeedBack::SendStateReturnType sendState(
const shared_model::interface::Peer &to,
const std::vector<VoteMessage> &state) override;

/**
* Receive votes from another peer;
Expand All @@ -58,6 +60,9 @@ namespace iroha {
*/
void createPeerConnection(const shared_model::interface::Peer &peer);

static YacNetworkWithFeedBack::ValueStateReturnType makeSendStateStatus(
const grpc::Status &);

/**
* Mapping of peer objects to connections
*/
Expand Down
46 changes: 46 additions & 0 deletions irohad/consensus/yac/transport/impl/yac_network_sender.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#include "consensus/yac/transport/impl/yac_network_sender.hpp"

#include "common/visitor.hpp"
#include "consensus/yac/vote_message.hpp"

using namespace iroha::consensus::yac;

YacNetworkSender::YacNetworkSender(std::shared_ptr<TransportType> transport)
: transport_(std::move(transport)) {}

void YacNetworkSender::subscribe(
std::shared_ptr<YacNetworkNotifications> handler) {
transport_->subscribe(std::move(handler));
}

void YacNetworkSender::sendState(PeerType to, StateType state) {
sendStateViaTransport(
to, std::make_shared<StateType>(std::move(state)), transport_);
}

void YacNetworkSender::sendStateViaTransport(
PeerType to,
StateInCollectionType state,
std::shared_ptr<TransportType> transport) {
transport->sendState(*to, *state)
.subscribe([transport = transport, to, state](const auto &result) {
iroha::visit_in_place(
result,
[transport, to, state](
const sending_statuses::UnavailableNetwork &) {
// assume the message is undelivered if troubles occur with our
// connection then it will resend the message

sendStateViaTransport(to, state, transport);
},
[&](const auto &) {
// if message delivers or recipient peer goes down then it
// will stop resending the message
});
});
}
68 changes: 68 additions & 0 deletions irohad/consensus/yac/transport/impl/yac_network_sender.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef IROHA_YAC_NETWORK_SENDER_HPP
#define IROHA_YAC_NETWORK_SENDER_HPP

#include "consensus/yac/transport/yac_network_interface.hpp"

#include <memory>
#include <rxcpp/rx.hpp>
#include <unordered_map>

namespace iroha {
namespace consensus {
namespace yac {
/**
* Transport layer wrapper which retries to send messages if the network
* shut down
*/
class YacNetworkSender : public YacNetwork {
public:
/// type of low transport level
using TransportType = YacNetworkWithFeedBack;

/// type of peer structure
using PeerType = std::shared_ptr<shared_model::interface::Peer>;

/// type of state
using StateType = std::vector<VoteMessage>;

YacNetworkSender(const YacNetworkSender &) = delete;
YacNetworkSender(YacNetworkSender &&) = delete;
YacNetworkSender &operator=(const YacNetworkSender &) = delete;
YacNetworkSender &operator=(YacNetworkSender &&) = delete;

/**
* Creates transport with redelivery property
* @param transport - instance of effective transport
*/
YacNetworkSender(std::shared_ptr<TransportType> transport);

void subscribe(
std::shared_ptr<YacNetworkNotifications> handler) override;

void sendState(PeerType to, StateType state) override;

private:
using StateInCollectionType = std::shared_ptr<StateType>;
using StatesCollection =
std::unordered_map<PeerType, StateInCollectionType>;

static void sendStateViaTransport(
PeerType to,
StateInCollectionType state,
std::shared_ptr<TransportType> transport);

// ------------------------| Global state | ----------------------------
std::shared_ptr<TransportType> transport_;

// ------------------------| Current state | ---------------------------
StatesCollection undelivered_states_;
};
} // namespace yac
} // namespace consensus
} // namespace iroha
#endif // IROHA_YAC_NETWORK_SENDER_HPP
58 changes: 56 additions & 2 deletions irohad/consensus/yac/transport/yac_network_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
#ifndef IROHA_YAC_NETWORK_INTERFACE_HPP
#define IROHA_YAC_NETWORK_INTERFACE_HPP

#include <boost/variant.hpp>
#include <memory>
#include <rxcpp/rx.hpp>
#include <vector>

namespace shared_model {
Expand All @@ -22,6 +24,8 @@ namespace iroha {
struct VoteMessage;

class YacNetworkNotifications {
// TODO: 2019-03-20 @muratovv add method virtual void
// updatePeerList(...) IR-412
public:
/**
* Callback on receiving collection of votes
Expand All @@ -33,6 +37,8 @@ namespace iroha {
};

class YacNetwork {
// TODO: 2019-03-20 @muratovv add method virtual void
// updatePeerList(...) IR-412
public:
virtual void subscribe(
std::shared_ptr<YacNetworkNotifications> handler) = 0;
Expand All @@ -42,14 +48,62 @@ namespace iroha {
* @param to - peer recipient
* @param state - message for sending
*/
virtual void sendState(const shared_model::interface::Peer &to,
const std::vector<VoteMessage> &state) = 0;
virtual void sendState(
std::shared_ptr<shared_model::interface::Peer> to,
std::vector<VoteMessage> state) = 0;

// TODO: add method virtual void updatePeerList();

/**
* Virtual destructor required for inheritance
*/
virtual ~YacNetwork() = default;
};

/// namespace contains statuses of sending messages
namespace sending_statuses {
/// status presents successful delivery of a message
struct SuccessfulSent {};

/// status presents that something happens with our network connection
struct UnavailableNetwork {};

/// status presents that recipient peer shut down or has bad connection
struct UnavailableReceiver {};
} // namespace sending_statuses

/**
* The interface introduces blocking approach for YAC transport
*/
class YacNetworkWithFeedBack {
// TODO: 2019-03-20 @muratovv add method virtual void
// updatePeerList(...) IR-412
public:
virtual void subscribe(
std::shared_ptr<YacNetworkNotifications> handler) = 0;

using ValueStateReturnType =
boost::variant<sending_statuses::SuccessfulSent,
sending_statuses::UnavailableNetwork,
sending_statuses::UnavailableReceiver>;

using SendStateReturnType = rxcpp::observable<ValueStateReturnType>;

/**
* Directly share collection of votes.
* Note: method assumes blocking approach for the propagation
* @param to - peer recipient
* @param state - message for sending
* @return status of sending
*/
virtual SendStateReturnType sendState(
const shared_model::interface::Peer &to,
const std::vector<VoteMessage> &state) = 0;

// TODO: add method virtual void updatePeerList();

~YacNetworkWithFeedBack() = default;
};
} // namespace yac
} // namespace consensus
} // namespace iroha
Expand Down
5 changes: 3 additions & 2 deletions irohad/consensus/yac/yac.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ namespace iroha {

// ------|Propagation|------
void propagateState(const std::vector<VoteMessage> &msg);
void propagateStateDirectly(const shared_model::interface::Peer &to,
const std::vector<VoteMessage> &msg);
void propagateStateDirectly(
std::shared_ptr<shared_model::interface::Peer> to,
const std::vector<VoteMessage> &msg);
void tryPropagateBack(const std::vector<VoteMessage> &state);

// ------|Fields|------
Expand Down
Loading