[fix] hyperledger#3899: Fix topology mismatch bug
Signed-off-by: Daniil Polyakov <[email protected]>
Arjentix committed Sep 22, 2023
1 parent 7bcb291 commit 5af02e1
Showing 10 changed files with 70 additions and 88 deletions.
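
The core of the fix: trusted peers were stored in a `HashSet<PeerId>`, whose iteration order is unspecified and varies between processes, so peers building the consensus topology from the same configuration could end up with mismatched peer orders. The commit switches to `Vec<PeerId>` (rejecting duplicates at deserialization), which gives every peer the same, defined order. A minimal illustration of the difference (plain Rust, not Iroha code; the peer names are hypothetical stand-ins for `PeerId`s):

```rust
use std::collections::HashSet;

fn main() {
    // Hypothetical stand-ins for the `PeerId`s listed in a config.
    let configured = ["alice", "bob", "carol", "dave"];

    // `HashSet` iteration order is unspecified and randomly seeded per
    // process, so two peers reading identical configs may iterate the
    // set in different orders.
    let via_set: Vec<&str> = configured
        .iter()
        .copied()
        .collect::<HashSet<_>>()
        .into_iter()
        .collect();

    // `Vec` preserves the declared order, so every peer agrees on it.
    let via_vec: Vec<&str> = configured.to_vec();

    println!("via HashSet: {via_set:?}"); // order unspecified
    println!("via Vec:     {via_vec:?}"); // always ["alice", "bob", "carol", "dave"]
}
```
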
cli/src/samples.rs (4 changes: 2 additions & 2 deletions)
@@ -52,7 +52,7 @@ pub fn get_trusted_peers(public_key: Option<&PublicKey>) -> HashSet<PeerId> {
///
/// # Panics
/// - when [`KeyPair`] generation fails (rare case).
-pub fn get_config_proxy(peers: HashSet<PeerId>, key_pair: Option<KeyPair>) -> ConfigurationProxy {
+pub fn get_config_proxy(peers: Vec<PeerId>, key_pair: Option<KeyPair>) -> ConfigurationProxy {
let (public_key, private_key) = key_pair
.unwrap_or_else(|| KeyPair::generate().expect("Key pair generation failed"))
.into();
@@ -94,7 +94,7 @@ pub fn get_config_proxy(peers: HashSet<PeerId>, key_pair: Option<KeyPair>) -> Co
///
/// # Panics
/// - when [`KeyPair`] generation fails (rare case).
-pub fn get_config(trusted_peers: HashSet<PeerId>, key_pair: Option<KeyPair>) -> Configuration {
+pub fn get_config(trusted_peers: Vec<PeerId>, key_pair: Option<KeyPair>) -> Configuration {
get_config_proxy(trusted_peers, key_pair)
.build()
.expect("Iroha config should build as all required fields were provided")
config/src/sumeragi.rs (38 changes: 20 additions & 18 deletions)
@@ -1,6 +1,6 @@
//! `Sumeragi` configuration. Contains both block commit and Gossip-related configuration.
#![allow(clippy::std_instead_of_core, clippy::arithmetic_side_effects)]
-use std::{collections::HashSet, fmt::Debug, fs::File, io::BufReader, path::Path};
+use std::{fmt::Debug, fs::File, io::BufReader, path::Path};

use eyre::{Result, WrapErr};
use iroha_config_base::derive::{view, Documented, Proxy};
@@ -92,14 +92,14 @@ impl ConfigurationProxy {
/// The [`peer_id`] field of [`Self`]
/// has not been initialized prior to calling this method.
pub fn insert_self_as_trusted_peers(&mut self) {
-let mut peers = HashSet::new();
#[allow(clippy::expect_used)]
let peer_id = self
.peer_id
.clone()
.expect("Insertion of `self` as `trusted_peers` implies that `peer_id` field should be initialized");
-peers.insert(peer_id);
-self.trusted_peers = Some(TrustedPeers { peers });
+self.trusted_peers = Some(TrustedPeers {
+peers: vec![peer_id],
+});
}
}

@@ -123,7 +123,7 @@ pub struct TrustedPeers {
/// Optional list of predefined trusted peers. Must contain unique
/// entries. Custom deserializer raises error if duplicates found.
#[serde(deserialize_with = "deserialize_unique_trusted_peers")]
-pub peers: HashSet<PeerId>,
+pub peers: Vec<PeerId>,
}

/// Custom deserializer that ensures that `trusted_peers` only
@@ -132,42 +132,46 @@ pub struct TrustedPeers {
/// # Errors
/// - Peer Ids not unique,
/// - Not a sequence (array)
-fn deserialize_unique_trusted_peers<'de, D>(deserializer: D) -> Result<HashSet<PeerId>, D::Error>
+fn deserialize_unique_trusted_peers<'de, D>(deserializer: D) -> Result<Vec<PeerId>, D::Error>
where
D: serde::Deserializer<'de>,
{
/// Helper, for constructing a unique visitor that errors whenever
/// a duplicate entry is found.
-struct UniqueVisitor(core::marker::PhantomData<fn() -> HashSet<PeerId>>);
+struct UniqueVisitor;

impl<'de> serde::de::Visitor<'de> for UniqueVisitor {
-type Value = HashSet<PeerId>;
+type Value = Vec<PeerId>;

fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
formatter.write_str("a set of unique `Peer::Id`s.")
}

-fn visit_seq<S>(self, mut seq: S) -> Result<HashSet<PeerId>, S::Error>
+fn visit_seq<S>(self, mut seq: S) -> Result<Vec<PeerId>, S::Error>
where
S: serde::de::SeqAccess<'de>,
{
-let mut result = HashSet::new();
+let mut result = Vec::new();
while let Some(value) = seq.next_element()? {
+// Takes O(n) for `Vec`, but that's ok: we don't expect users to put
+// enough trusted peers in the config for a `HashSet` to beat a `Vec`,
+// and this code only runs during the start-up phase.
+//
+// Meanwhile, `Vec` gives us a defined order of peers.
if result.contains(&value) {
return Err(serde::de::Error::custom(format!(
"The peer id: {}'s public key appears twice.",
&value
)));
}
-result.insert(value);
+result.push(value);
}

Ok(result)
}
}

-let visitor = UniqueVisitor(core::marker::PhantomData);
-deserializer.deserialize_seq(visitor)
+deserializer.deserialize_seq(UniqueVisitor)
}
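
The visitor now collects into a `Vec`, preserving the order peers are declared in while still rejecting duplicates with a linear scan. A standalone sketch of the same strategy (a hypothetical helper, not from the codebase):

```rust
/// Order-preserving uniqueness check in the same style as the visitor
/// above (illustrative helper, not part of the Iroha codebase).
fn unique_in_order<T: PartialEq + std::fmt::Debug>(
    items: impl IntoIterator<Item = T>,
) -> Result<Vec<T>, String> {
    let mut result = Vec::new();
    for item in items {
        // O(n) scan per element, which is fine for a short start-up-time
        // config list; `Vec` keeps the entries in their declared order.
        if result.contains(&item) {
            return Err(format!("duplicate entry: {item:?}"));
        }
        result.push(item);
    }
    Ok(result)
}

fn main() {
    assert_eq!(unique_in_order(["a", "b", "c"]), Ok(vec!["a", "b", "c"]));
    assert!(unique_in_order(["a", "b", "a"]).is_err());
}
```
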

impl TrustedPeers {
@@ -181,11 +185,9 @@ impl TrustedPeers {
let file = File::open(&path)
.wrap_err_with(|| format!("Failed to open trusted peers file {:?}", &path))?;
let reader = BufReader::new(file);
-let trusted_peers: HashSet<PeerId> =
-serde_json::from_reader(reader).wrap_err("Failed to deserialize json from reader")?;
-Ok(TrustedPeers {
-peers: trusted_peers,
-})
+serde_json::from_reader(reader)
+.wrap_err("Failed to deserialize json from reader")
+.map_err(Into::into)
}
}

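With `peers` deserialized as an ordered `Vec`, `from_path` can hand the reader straight to `serde_json` instead of collecting into an intermediate `HashSet` and rewrapping. A self-contained sketch of the same shape, assuming the `serde` (with derive), `serde_json`, and `eyre` crates, with a hypothetical `Peers` type standing in for `TrustedPeers`:

```rust
use std::{fs::File, io::BufReader, path::Path};

use eyre::{Result, WrapErr};

/// Hypothetical stand-in for `TrustedPeers`; deriving `Deserialize`
/// lets `serde_json::from_reader` produce it directly, which is what
/// allows `from_path` to drop the intermediate `HashSet` collection.
#[derive(Debug, serde::Deserialize)]
struct Peers {
    peers: Vec<String>,
}

fn from_path(path: impl AsRef<Path>) -> Result<Peers> {
    let file = File::open(path.as_ref())
        .wrap_err_with(|| format!("Failed to open trusted peers file {:?}", path.as_ref()))?;
    let reader = BufReader::new(file);
    serde_json::from_reader(reader).wrap_err("Failed to deserialize json from reader")
}

fn main() -> Result<()> {
    // Usage sketch: point this at a JSON file like {"peers": ["peer-a", "peer-b"]}.
    let peers = from_path("trusted_peers.json")?;
    println!("{peers:?}");
    Ok(())
}
```
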
configs/peer/validator.wasm (binary file modified)
core/src/block.rs (36 changes: 3 additions & 33 deletions)
@@ -434,39 +434,9 @@ mod valid {
self,
topology: &Topology,
) -> Result<CommittedBlock, (Self, BlockValidationError)> {
-// TODO: Should the peer that serves genesis have a fixed role of ProxyTail in topology?
-if !self.payload().header.is_genesis()
-&& topology.is_consensus_required().is_some()
-&& topology
-.filter_signatures_by_roles(&[Role::ProxyTail], self.signatures())
-.is_empty()
-{
-return Err((self, SignatureVerificationError::ProxyTailMissing.into()));
-}
-
-#[allow(clippy::collapsible_else_if)]
-if self.payload().header.is_genesis() {
-// At genesis round we blindly take on the network topology from the genesis block.
-} else {
-let roles = [
-Role::ValidatingPeer,
-Role::Leader,
-Role::ProxyTail,
-Role::ObservingPeer,
-];
-
-let votes_count = topology
-.filter_signatures_by_roles(&roles, self.signatures())
-.len();
-if votes_count.lt(&topology.min_votes_for_commit()) {
-return Err((
-self,
-SignatureVerificationError::NotEnoughSignatures {
-votes_count,
-min_votes_for_commit: topology.min_votes_for_commit(),
-}
-.into(),
-));
-}
-}
+if !self.payload().header.is_genesis() {
+if let Err(err) = topology.verify_signatures(self.signatures()) {
+return Err((self, err.into()));
+}
+}
}
}

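The two inline checks (a proxy-tail signature must be present, and the vote count must reach the commit quorum) are consolidated behind `topology.verify_signatures`. A runnable sketch of roughly what such a check does, with hypothetical types and with the `is_consensus_required` short-circuit from the old code elided; this is not the actual Iroha implementation:

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum Role {
    Leader,
    ValidatingPeer,
    ProxyTail,
    ObservingPeer,
}

#[derive(Debug)]
enum SignatureVerificationError {
    ProxyTailMissing,
    NotEnoughSignatures {
        votes_count: usize,
        min_votes_for_commit: usize,
    },
}

/// `signature_roles` stands in for the roles recovered from a block's
/// signatures; `min_votes` for `Topology::min_votes_for_commit()`.
fn verify_signatures(
    signature_roles: &[Role],
    min_votes: usize,
) -> Result<(), SignatureVerificationError> {
    // The proxy tail must have signed off on the block.
    if !signature_roles.contains(&Role::ProxyTail) {
        return Err(SignatureVerificationError::ProxyTailMissing);
    }
    // And the block needs enough votes overall to commit.
    let votes_count = signature_roles.len();
    if votes_count < min_votes {
        return Err(SignatureVerificationError::NotEnoughSignatures {
            votes_count,
            min_votes_for_commit: min_votes,
        });
    }
    Ok(())
}

fn main() {
    // Proxy tail signed and the quorum of three is met.
    assert!(verify_signatures(&[Role::Leader, Role::ValidatingPeer, Role::ProxyTail], 3).is_ok());
    // Without the proxy tail the block is rejected regardless of count.
    assert!(verify_signatures(&[Role::Leader, Role::ValidatingPeer], 2).is_err());
}
```
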
core/src/sumeragi/mod.rs (36 changes: 27 additions & 9 deletions)
@@ -212,18 +212,27 @@ impl SumeragiHandle {
view_change_proofs: msg.view_change_proofs,
}) {
self.metrics.dropped_messages.inc();
-error!(?error, "This peer is faulty. Incoming control messages have to be dropped due to low processing speed.");
+error!(
+?error,
+"This peer is faulty. \
+Incoming control messages have to be dropped due to low processing speed."
+);
}
} else if let Err(error) = self.message_sender.try_send(msg) {
self.metrics.dropped_messages.inc();
-error!(?error, "This peer is faulty. Incoming messages have to be dropped due to low processing speed.");
+error!(
+?error,
+"This peer is faulty. \
+Incoming messages have to be dropped due to low processing speed."
+);
}
}

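Both branches follow the same bounded-channel backpressure pattern: `try_send` fails immediately instead of blocking when the consumer lags, and the message is dropped and counted. A minimal sketch of the pattern using `std::sync::mpsc` (illustrative only; the actual channel type Sumeragi uses may differ):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // Bounded channel with room for two in-flight messages.
    let (tx, rx) = sync_channel::<u32>(2);

    let mut dropped = 0u32; // stand-in for the `dropped_messages` metric
    for msg in 0..5 {
        // `try_send` returns `Full` immediately rather than blocking the
        // producer on a slow consumer.
        if let Err(TrySendError::Full(_)) = tx.try_send(msg) {
            dropped += 1;
            eprintln!("message {msg} dropped due to low processing speed");
        }
    }
    assert_eq!(dropped, 3);

    drop(tx); // close the channel so the receiver's iterator terminates
    let received: Vec<u32> = rx.iter().collect();
    assert_eq!(received, vec![0, 1]);
}
```
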
/// Start [`Sumeragi`] actor and return handle to it.
///
/// # Panics
/// May panic if something is off during initialization, which is a bug.
+#[allow(clippy::too_many_lines)]
pub fn start(
SumeragiStartArgs {
configuration,
Expand All @@ -241,8 +250,10 @@ impl SumeragiHandle {

let skip_block_count = wsv.block_hashes.len();
let mut blocks_iter = (skip_block_count + 1..=block_count).map(|block_height| {
-kura.get_block_by_height(block_height as u64)
-.expect("Sumeragi should be able to load the block that was reported as presented. If not, the block storage was probably disconnected.")
+kura.get_block_by_height(block_height as u64).expect(
+"Sumeragi should be able to load the block that was reported as presented. \
+If not, the block storage was probably disconnected.",
+)
});

let current_topology = match wsv.height() {
Expand All @@ -251,7 +262,10 @@ impl SumeragiHandle {
Topology::new(configuration.trusted_peers.peers.clone())
}
height => {
-let block_ref = kura.get_block_by_height(height).expect("Sumeragi could not load block that was reported as present. Please check that the block storage was not disconnected.");
+let block_ref = kura.get_block_by_height(height).expect(
+"Sumeragi could not load block that was reported as present. \
+Please check that the block storage was not disconnected.",
+);
let mut topology =
Topology::new(block_ref.payload().header.commit_topology.clone());
topology.rotate_set_a();
@@ -266,8 +280,10 @@
.expect("Kura blocks should be valid")
.commit(&current_topology)
.expect("Kura blocks should be valid");
-wsv.apply_without_execution(&block)
-.expect("Block application in init should not fail. Blocks loaded from kura assumed to be valid");
+wsv.apply_without_execution(&block).expect(
+"Block application in init should not fail. \
+Blocks loaded from kura assumed to be valid",
+);
}

// finalized_wsv is one block behind
@@ -279,8 +295,10 @@
.expect("Kura blocks should be valid")
.commit(&current_topology)
.expect("Kura blocks should be valid");
-wsv.apply_without_execution(&latest_block)
-.expect("Block application in init should not fail. Blocks loaded from kura assumed to be valid");
+wsv.apply_without_execution(&latest_block).expect(
+"Block application in init should not fail. \
+Blocks loaded from kura assumed to be valid",
+);
}

info!("Sumeragi has finished loading blocks and setting up the WSV");
core/src/sumeragi/network_topology.rs (18 changes: 6 additions & 12 deletions)
@@ -47,6 +47,8 @@ pub struct ConsensusTopology<'topology> {

impl Topology {
/// Create a new topology.
+///
+/// Order of the peers is defined by the order of the iterator.
pub fn new(peers: impl IntoIterator<Item = PeerId>) -> Self {
Topology {
ordered_peers: peers.into_iter().collect(),
@@ -158,18 +160,10 @@
.len()
.try_into()
.expect("`usize` should fit into `u64`");
-if let Some(mut rem) = n.checked_rem(len) {
-// In case where `n` is larger than `usize` could fit
-let usize_max = usize::MAX
-.try_into()
-.expect("`usize` should fit into `u64`");
-while rem > usize_max {
-rem -= usize_max;
-self.ordered_peers.rotate_left(usize::MAX);
-}
-let rem = rem
-.try_into()
-.expect("`rem` is smaller or equal then `usize::MAX`");
+if let Some(rem) = n.checked_rem(len) {
+let rem = rem.try_into().expect(
+"`rem` is smaller than `usize::MAX`, because remainder is always smaller than divisor",
+);
self.ordered_peers.rotate_left(rem);
}
}
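
The old code rotated in `usize::MAX`-sized chunks to handle an `n` that might not fit in `usize`; the fix relies on `n % len` always being smaller than `len`, which itself came from a `usize`, so a single rotation is enough (and `checked_rem` still guards the `len == 0` case). A runnable sketch of the simplified logic:

```rust
fn main() {
    let mut ordered_peers = vec!["alice", "bob", "carol"];

    let n: u64 = 7; // e.g. a view-change counter that may exceed `usize`
    let len = ordered_peers.len() as u64;

    // `checked_rem` returns `None` when `len == 0`, avoiding a panic on
    // an empty peer list.
    if let Some(rem) = n.checked_rem(len) {
        // The remainder is always smaller than the divisor, and the
        // divisor came from a `usize`, so the cast cannot fail.
        let rem = usize::try_from(rem).expect("`rem` < `len` <= `usize::MAX`");
        ordered_peers.rotate_left(rem);
    }

    assert_eq!(ordered_peers, vec!["bob", "carol", "alice"]);
}
```
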
core/test_network/src/lib.rs (10 changes: 2 additions & 8 deletions)
@@ -4,12 +4,7 @@
use core::{fmt::Debug, str::FromStr as _, time::Duration};
#[cfg(debug_assertions)]
use std::sync::atomic::AtomicBool;
-use std::{
-collections::{HashMap, HashSet},
-path::Path,
-sync::Arc,
-thread,
-};
+use std::{collections::HashMap, path::Path, sync::Arc, thread};

use eyre::Result;
use futures::{prelude::*, stream::FuturesUnordered};
@@ -778,8 +773,7 @@

impl TestConfiguration for Configuration {
fn test() -> Self {
-let mut sample_proxy =
-iroha::samples::get_config_proxy(HashSet::new(), Some(get_key_pair()));
+let mut sample_proxy = iroha::samples::get_config_proxy(Vec::new(), Some(get_key_pair()));
let env_proxy =
ConfigurationProxy::from_std_env().expect("Test env variables should parse properly");
let (public_key, private_key) = KeyPair::generate().unwrap().into();
crypto/src/signature.rs (4 changes: 3 additions & 1 deletion)
@@ -25,7 +25,9 @@ use ursa::{
},
};

-use crate::{ffi, Error, PublicKey};
+#[cfg(any(feature = "std", feature = "import_ffi"))]
+use crate::Error;
+use crate::{ffi, PublicKey};
#[cfg(feature = "std")]
use crate::{HashOf, KeyPair};

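The `Error` import (like the `KeyPair` import in `data_model/src/block.rs` below) is now pulled in only under the features that actually use it, presumably so minimal builds don't hit `unused_imports` warnings. A tiny sketch of the cfg-gating pattern, with a hypothetical `std` feature:

```rust
// Only compiled when the (hypothetical) `std` feature is enabled, so a
// build without it never sees an unused import.
#[cfg(feature = "std")]
use std::time::Instant;

#[cfg(feature = "std")]
fn elapsed_since_start(start: Instant) -> std::time::Duration {
    start.elapsed()
}

fn main() {
    #[cfg(feature = "std")]
    {
        let start = Instant::now();
        println!("elapsed: {:?}", elapsed_since_start(start));
    }
}
```
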
data_model/src/block.rs (4 changes: 3 additions & 1 deletion)
@@ -10,7 +10,9 @@ use core::{fmt::Display, time::Duration};

use derive_more::Display;
use getset::Getters;
-use iroha_crypto::{HashOf, KeyPair, MerkleTree, SignaturesOf};
+#[cfg(all(feature = "std", feature = "transparent_api"))]
+use iroha_crypto::KeyPair;
+use iroha_crypto::{HashOf, MerkleTree, SignaturesOf};
use iroha_data_model_derive::model;
use iroha_macro::FromVariant;
use iroha_schema::IntoSchema;
scripts/test_env.py (8 changes: 4 additions & 4 deletions)
@@ -50,14 +50,14 @@ def __init__(self, args: argparse.Namespace):
peer_entry = {"address": f"{peer.host_ip}:{peer.p2p_port}", "public_key": peer.public_key}
self.trusted_peers.append(json.dumps(peer_entry))
os.environ["SUMERAGI_TRUSTED_PEERS"] = f"[{','.join(self.trusted_peers)}]"

def wait_for_genesis(self, n_tries: int):
for i in range(n_tries):
logging.info(f"Waiting for genesis block to be created... Attempt {i+1}/{n_tries}")
try:
with urllib.request.urlopen(f"http://{self.peers[0].host_ip}:{self.peers[0].telemetry_port}/status/blocks") as response:
block_count = int(response.read())
-if block_count > 1:
+if block_count >= 1:
logging.info(f"Genesis block created. Block count: {block_count}")
return
else:
@@ -107,7 +107,7 @@ def __init__(self, args: argparse.Namespace, nth: int):
self.private_key = json.dumps(json_keypair['private_key'])

logging.info(f"Peer {self.name} initialized")

def run(self, is_genesis: bool = False):
logging.info(f"Running peer {self.name}...")
peer_dir = self.out_dir.joinpath(f"peers/{self.name}")
@@ -162,7 +162,7 @@ def copy_or_prompt_build_bin(bin_name: str, root_dir: pathlib.Path, target_dir:
def main(args):
# Bold ASCII escape sequence
logging.basicConfig(level=logging.INFO if args.verbose else logging.WARNING,
-style="{",
+style="{",
format="{asctime} {levelname} \033[1m{funcName}:{lineno}\033[0m: {message}",)
# ISO 8601 timestamps without timezone
logging.Formatter.formatTime = (lambda self, record, datefmt=None:
