Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Commit

Permalink
BFT OS trunk
Browse files Browse the repository at this point in the history
Signed-off-by: Andrei Lebedev <[email protected]>
  • Loading branch information
lebdron committed Aug 16, 2018
1 parent 9db8ba2 commit 1f6c56e
Show file tree
Hide file tree
Showing 54 changed files with 1,220 additions and 721 deletions.
10 changes: 6 additions & 4 deletions irohad/consensus/yac/cluster_order.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
#ifndef IROHA_CLUSTER_ORDER_HPP
#define IROHA_CLUSTER_ORDER_HPP

#include <boost/optional.hpp>
#include <vector>
#include <memory>
#include <vector>

#include <boost/optional.hpp>
#include "consensus/yac/yac_types.hpp"

namespace shared_model {
namespace interface {
Expand Down Expand Up @@ -65,7 +67,7 @@ namespace iroha {
std::vector<std::shared_ptr<shared_model::interface::Peer>> getPeers()
const;

size_t getNumberOfPeers() const;
PeersNumberType getNumberOfPeers() const;

virtual ~ClusterOrdering() = default;

Expand All @@ -77,7 +79,7 @@ namespace iroha {
std::vector<std::shared_ptr<shared_model::interface::Peer>> order);

std::vector<std::shared_ptr<shared_model::interface::Peer>> order_;
uint32_t index_ = 0;
PeersNumberType index_ = 0;
};
} // namespace yac
} // namespace consensus
Expand Down
10 changes: 5 additions & 5 deletions irohad/consensus/yac/impl/supermajority_checker_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ namespace iroha {
and peersSubset(signatures, peers);
}

bool SupermajorityCheckerImpl::checkSize(uint64_t current,
uint64_t all) const {
bool SupermajorityCheckerImpl::checkSize(PeersNumberType current,
PeersNumberType all) const {
if (current > all) {
return false;
}
Expand All @@ -55,9 +55,9 @@ namespace iroha {
}));
}

bool SupermajorityCheckerImpl::hasReject(uint64_t frequent,
uint64_t voted,
uint64_t all) const {
bool SupermajorityCheckerImpl::hasReject(PeersNumberType frequent,
PeersNumberType voted,
PeersNumberType all) const {
auto not_voted = all - voted;
return not checkSize(frequent + not_voted, all);
}
Expand Down
9 changes: 5 additions & 4 deletions irohad/consensus/yac/impl/supermajority_checker_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,18 @@ namespace iroha {
const std::vector<std::shared_ptr<shared_model::interface::Peer>>
&peers) const override;

virtual bool checkSize(uint64_t current, uint64_t all) const override;
virtual bool checkSize(PeersNumberType current,
PeersNumberType all) const override;

virtual bool peersSubset(
const shared_model::interface::types::SignatureRangeType
&signatures,
const std::vector<std::shared_ptr<shared_model::interface::Peer>>
&peers) const override;

virtual bool hasReject(uint64_t frequent,
uint64_t voted,
uint64_t all) const override;
virtual bool hasReject(PeersNumberType frequent,
PeersNumberType voted,
PeersNumberType all) const override;
};
} // namespace yac
} // namespace consensus
Expand Down
207 changes: 62 additions & 145 deletions irohad/consensus/yac/impl/yac.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ namespace iroha {
return result;
}

template <typename T>
static std::string cryptoError(const std::initializer_list<T> &votes) {
return cryptoError<std::initializer_list<T>>(votes);
}

std::shared_ptr<Yac> Yac::create(
YacVoteStorage vote_storage,
std::shared_ptr<YacNetwork> network,
Expand Down Expand Up @@ -83,41 +78,23 @@ namespace iroha {

cluster_order_ = order;
auto vote = crypto_->getVote(hash);
// TODO 10.06.2018 andrei: IR-1407 move YAC propagation strategy to a
// separate entity
votingStep(vote);
}

rxcpp::observable<CommitMessage> Yac::on_commit() {
rxcpp::observable<Answer> Yac::onOutcome() {
return notifier_.get_observable();
}

// ------|Network notifications|------

void Yac::on_vote(VoteMessage vote) {
std::lock_guard<std::mutex> guard(mutex_);
if (crypto_->verify(vote)) {
applyVote(findPeer(vote), vote);
} else {
log_->warn(cryptoError({vote}));
}
}

void Yac::on_commit(CommitMessage commit) {
void Yac::onState(std::vector<VoteMessage> state) {
std::lock_guard<std::mutex> guard(mutex_);
if (crypto_->verify(commit)) {
// Commit does not contain data about peer which sent the message
applyCommit(boost::none, commit);
if (crypto_->verify(state)) {
applyState(state);
} else {
log_->warn(cryptoError(commit.votes));
}
}

void Yac::on_reject(RejectMessage reject) {
std::lock_guard<std::mutex> guard(mutex_);
if (crypto_->verify(reject)) {
// Reject does not contain data about peer which sent the message
applyReject(boost::none, reject);
} else {
log_->warn(cryptoError(reject.votes));
log_->warn(cryptoError(state));
}
}

Expand All @@ -133,7 +110,7 @@ namespace iroha {
vote.hash.proposal_hash,
vote.hash.block_hash);

network_->send_vote(cluster_order_.currentLeader(), vote);
network_->sendState(cluster_order_.currentLeader(), {vote});
cluster_order_.switchToNext();
if (cluster_order_.hasNext()) {
timer_->invokeAfterDelay([this, vote] { this->votingStep(vote); });
Expand All @@ -157,137 +134,77 @@ namespace iroha {

// ------|Apply data|------

const char *kRejectMsg = "reject case";
const char *kRejectOnHashMsg = "Reject case on hash {} achieved";

void Yac::applyCommit(
boost::optional<std::shared_ptr<shared_model::interface::Peer>> from,
const CommitMessage &commit) {
void Yac::applyState(const std::vector<VoteMessage> &state) {
auto answer =
vote_storage_.store(commit, cluster_order_.getNumberOfPeers());
answer | [&](const auto &answer) {
auto proposal_hash = getProposalHash(commit.votes).value();
auto already_processed =
vote_storage_.getProcessingState(proposal_hash);
if (not already_processed) {
vote_storage_.markAsProcessedState(proposal_hash);
visit_in_place(answer,
[&](const CommitMessage &commit) {
notifier_.get_subscriber().on_next(commit);
},
[&](const RejectMessage &reject) {
log_->warn(kRejectMsg);
// TODO 14/08/17 Muratov: work on reject case
// IR-497
});
}
this->closeRound();
};
}
vote_storage_.store(state, cluster_order_.getNumberOfPeers());

void Yac::applyReject(
boost::optional<std::shared_ptr<shared_model::interface::Peer>> from,
const RejectMessage &reject) {
auto answer =
vote_storage_.store(reject, cluster_order_.getNumberOfPeers());
answer | [&](const auto &answer) {
auto proposal_hash = getProposalHash(reject.votes).value();
auto already_processed =
vote_storage_.getProcessingState(proposal_hash);
// TODO 10.06.2018 andrei: IR-1407 move YAC propagation strategy to a
// separate entity

if (not already_processed) {
vote_storage_.markAsProcessedState(proposal_hash);
visit_in_place(answer,
[&](const RejectMessage &reject) {
log_->warn(kRejectMsg);
// TODO 14/08/17 Muratov: work on reject case
// IR-497
},
[&](const CommitMessage &commit) {
this->propagateCommit(commit);
notifier_.get_subscriber().on_next(commit);
});
answer | [&](const auto &answer) {
auto &proposal_hash = state.at(0).hash.proposal_hash;

/*
* It is possible that a new peer with an outdated peers list may
* collect an outcome from a smaller number of peers which are
* included in set of `f` peers in the system. The new peer will not
* accept our message with valid supermajority because he cannot apply
* votes from unknown peers.
*/
if (state.size() > 1) {
// some peer has already collected commit/reject, so it is sent
if (vote_storage_.getProcessingState(proposal_hash)
== ProposalState::kNotSentNotProcessed) {
vote_storage_.nextProcessingState(proposal_hash);
log_->info(
"Received supermajority of votes for {}, skip propagation",
proposal_hash);
}
}
this->closeRound();
};
}

void Yac::applyVote(
boost::optional<std::shared_ptr<shared_model::interface::Peer>> from,
const VoteMessage &vote) {
if (from) {
log_->info("Apply vote: {} from ledger peer {}",
vote.hash.block_hash,
(*from)->address());
} else {
log_->info("Apply vote: {} from unknown peer {}",
vote.hash.block_hash,
vote.signature->publicKey().hex());
}

auto answer =
vote_storage_.store(vote, cluster_order_.getNumberOfPeers());

answer | [&](const auto &answer) {
auto &proposal_hash = vote.hash.proposal_hash;
auto already_processed =
auto processing_state =
vote_storage_.getProcessingState(proposal_hash);

if (not already_processed) {
vote_storage_.markAsProcessedState(proposal_hash);
visit_in_place(answer,
[&](const CommitMessage &commit) {
// propagate for all
log_->info("Propagate commit {} to whole network",
vote.hash.block_hash);
this->propagateCommit(commit);
notifier_.get_subscriber().on_next(commit);
},
[&](const RejectMessage &reject) {
// propagate reject for all
log_->info(kRejectOnHashMsg, proposal_hash);
this->propagateReject(reject);
});
} else {
from | [&](const auto &from) {
visit_in_place(answer,
[&](const CommitMessage &commit) {
log_->info("Propagate commit {} directly to {}",
vote.hash.block_hash,
from->address());
this->propagateCommitDirectly(*from, commit);
},
[&](const RejectMessage &reject) {
log_->info(kRejectOnHashMsg, proposal_hash);
this->propagateRejectDirectly(*from, reject);
});
};
auto votes = [](const auto &state) { return state.votes; };

switch (processing_state) {
case ProposalState::kNotSentNotProcessed:
vote_storage_.nextProcessingState(proposal_hash);
log_->info("Propagate state {} to whole network", proposal_hash);
this->propagateState(visit_in_place(answer, votes));
break;
case ProposalState::kSentNotProcessed:
vote_storage_.nextProcessingState(proposal_hash);
log_->info("Pass outcome for {} to pipeline", proposal_hash);
this->closeRound();
notifier_.get_subscriber().on_next(answer);
break;
case ProposalState::kSentProcessed:
if (state.size() == 1) {
this->findPeer(state.at(0)) | [&](const auto &from) {
log_->info("Propagate state {} directly to {}",
proposal_hash,
from->address());
this->propagateStateDirectly(*from,
visit_in_place(answer, votes));
};
}
break;
}
};
}

// ------|Propagation|------

void Yac::propagateCommit(const CommitMessage &msg) {
for (const auto &peer : cluster_order_.getPeers()) {
propagateCommitDirectly(*peer, msg);
}
}

void Yac::propagateCommitDirectly(const shared_model::interface::Peer &to,
const CommitMessage &msg) {
network_->send_commit(to, msg);
}

void Yac::propagateReject(const RejectMessage &msg) {
void Yac::propagateState(const std::vector<VoteMessage> &msg) {
for (const auto &peer : cluster_order_.getPeers()) {
propagateRejectDirectly(*peer, msg);
propagateStateDirectly(*peer, msg);
}
}

void Yac::propagateRejectDirectly(const shared_model::interface::Peer &to,
const RejectMessage &msg) {
network_->send_reject(std::move(to), std::move(msg));
void Yac::propagateStateDirectly(const shared_model::interface::Peer &to,
const std::vector<VoteMessage> &msg) {
network_->sendState(to, msg);
}

} // namespace yac
Expand Down
29 changes: 10 additions & 19 deletions irohad/consensus/yac/impl/yac_crypto_provider_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,18 @@ namespace iroha {
const shared_model::crypto::Keypair &keypair)
: keypair_(keypair) {}

bool CryptoProviderImpl::verify(CommitMessage msg) {
bool CryptoProviderImpl::verify(std::vector<VoteMessage> msg) {
return std::all_of(
std::begin(msg.votes),
std::end(msg.votes),
[this](const auto &vote) { return this->verify(vote); });
}

bool CryptoProviderImpl::verify(RejectMessage msg) {
return std::all_of(
std::begin(msg.votes),
std::end(msg.votes),
[this](const auto &vote) { return this->verify(vote); });
}

bool CryptoProviderImpl::verify(VoteMessage msg) {
auto serialized =
PbConverters::serializeVote(msg).hash().SerializeAsString();
auto blob = shared_model::crypto::Blob(serialized);
std::begin(msg), std::end(msg), [](const auto &vote) {
auto serialized =
PbConverters::serializeVote(vote).hash().SerializeAsString();
auto blob = shared_model::crypto::Blob(serialized);

return shared_model::crypto::CryptoVerifier<>::verify(
msg.signature->signedData(), blob, msg.signature->publicKey());
return shared_model::crypto::CryptoVerifier<>::verify(
vote.signature->signedData(),
blob,
vote.signature->publicKey());
});
}

VoteMessage CryptoProviderImpl::getVote(YacHash hash) {
Expand Down
6 changes: 1 addition & 5 deletions irohad/consensus/yac/impl/yac_crypto_provider_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ namespace iroha {
explicit CryptoProviderImpl(
const shared_model::crypto::Keypair &keypair);

bool verify(CommitMessage msg) override;

bool verify(RejectMessage msg) override;

bool verify(VoteMessage msg) override;
bool verify(std::vector<VoteMessage> msg) override;

VoteMessage getVote(YacHash hash) override;

Expand Down
Loading

0 comments on commit 1f6c56e

Please sign in to comment.