diff --git a/src/honey_badger/epoch_state.rs b/src/honey_badger/epoch_state.rs index f5f088a7..9092a562 100644 --- a/src/honey_badger/epoch_state.rs +++ b/src/honey_badger/epoch_state.rs @@ -1,5 +1,5 @@ use std::collections::btree_map::Entry; -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::marker::PhantomData; use std::sync::Arc; @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; use super::{Batch, ErrorKind, MessageContent, Result, Step}; use fault_log::{Fault, FaultKind, FaultLog}; use messaging::{DistAlgorithm, NetworkInfo}; -use subset::{self as cs, Subset}; +use subset::{self as cs, Subset, SubsetOutput}; use threshold_decryption::{self as td, ThresholdDecryption}; use traits::{Contribution, NodeIdT}; @@ -116,6 +116,8 @@ pub struct EpochState { subset: SubsetState, /// The status of threshold decryption, by proposer. decryption: BTreeMap>, + /// Nodes found so far in `Subset` output. + accepted_proposers: BTreeSet, _phantom: PhantomData, } @@ -132,6 +134,7 @@ where netinfo, subset: SubsetState::Ongoing(cs), decryption: BTreeMap::default(), + accepted_proposers: Default::default(), _phantom: PhantomData, }) } @@ -219,15 +222,39 @@ where /// Checks whether the subset has output, and if it does, sends out our decryption shares. fn process_subset(&mut self, cs_step: cs::Step) -> Result> { let mut step = Step::default(); - let mut cs_outputs = step.extend_with(cs_step, |cs_msg| { + let cs_outputs: VecDeque<_> = step.extend_with(cs_step, |cs_msg| { MessageContent::Subset(cs_msg).with_epoch(self.epoch) }); - if let Some(cs_output) = cs_outputs.pop_front() { - self.subset = SubsetState::Complete(cs_output.keys().cloned().collect()); - step.extend(self.send_decryption_shares(cs_output)?); - } - if !cs_outputs.is_empty() { - error!("Multiple outputs from a single Subset instance."); + let mut has_seen_done = false; + for cs_output in cs_outputs { + if has_seen_done { + error!("`SubsetOutput::Done` was not the last `SubsetOutput`"); + } + match cs_output { + SubsetOutput::Contribution(k, v) => { + step.extend(self.send_decryption_share(k.clone(), &v)?); + self.accepted_proposers.insert(k); + } + SubsetOutput::Done => { + self.subset = SubsetState::Complete(self.accepted_proposers.clone()); + + let faulty_shares: Vec<_> = self + .decryption + .keys() + .filter(|id| !self.accepted_proposers.contains(id)) + .cloned() + .collect(); + for id in faulty_shares { + if let Some(DecryptionState::Ongoing(td)) = self.decryption.remove(&id) { + for id in td.sender_ids() { + let fault_kind = FaultKind::UnexpectedDecryptionShare; + step.fault_log.append(id.clone(), fault_kind); + } + } + } + has_seen_done = true + } + } } Ok(step) } @@ -250,49 +277,28 @@ where /// Given the output of the Subset algorithm, inputs the ciphertexts into the Threshold /// Decryption instances and sends our own decryption shares. - fn send_decryption_shares(&mut self, cs_output: BTreeMap>) -> Result> { - let mut step = Step::default(); - let faulty_shares: Vec<_> = self - .decryption - .keys() - .filter(|id| !cs_output.contains_key(id)) - .cloned() - .collect(); - for id in faulty_shares { - if let Some(DecryptionState::Ongoing(td)) = self.decryption.remove(&id) { - for id in td.sender_ids() { - let fault_kind = FaultKind::UnexpectedDecryptionShare; - step.fault_log.append(id.clone(), fault_kind); - } + fn send_decryption_share(&mut self, proposer_id: N, v: &[u8]) -> Result> { + let ciphertext: Ciphertext = match bincode::deserialize(v) { + Ok(ciphertext) => ciphertext, + Err(err) => { + warn!( + "Cannot deserialize ciphertext from {:?}: {:?}", + proposer_id, err + ); + return Ok(Fault::new(proposer_id, FaultKind::InvalidCiphertext).into()); } - } - for (proposer_id, v) in cs_output { - let ciphertext: Ciphertext = match bincode::deserialize(&v) { - Ok(ciphertext) => ciphertext, - Err(err) => { - warn!( - "Cannot deserialize ciphertext from {:?}: {:?}", - proposer_id, err - ); - let fault_kind = FaultKind::InvalidCiphertext; - step.fault_log.append(proposer_id, fault_kind); - continue; - } - }; - let td_result = match self.decryption.entry(proposer_id.clone()) { - Entry::Occupied(entry) => entry.into_mut(), - Entry::Vacant(entry) => entry.insert(DecryptionState::new(self.netinfo.clone())), - }.set_ciphertext(ciphertext); - match td_result { - Ok(td_step) => step.extend(self.process_decryption(proposer_id, td_step)?), - Err(td::Error::InvalidCiphertext(_)) => { - warn!("Invalid ciphertext from {:?}", proposer_id); - let fault_kind = FaultKind::ShareDecryptionFailed; - step.fault_log.append(proposer_id.clone(), fault_kind); - } - Err(err) => return Err(ErrorKind::ThresholdDecryption(err).into()), + }; + let td_result = match self.decryption.entry(proposer_id.clone()) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(entry) => entry.insert(DecryptionState::new(self.netinfo.clone())), + }.set_ciphertext(ciphertext); + match td_result { + Ok(td_step) => self.process_decryption(proposer_id, td_step), + Err(td::Error::InvalidCiphertext(_)) => { + warn!("Invalid ciphertext from {:?}", proposer_id); + Ok(Fault::new(proposer_id.clone(), FaultKind::ShareDecryptionFailed).into()) } + Err(err) => Err(ErrorKind::ThresholdDecryption(err).into()), } - Ok(step) } } diff --git a/src/subset.rs b/src/subset.rs index 722cde13..abb84f60 100644 --- a/src/subset.rs +++ b/src/subset.rs @@ -78,7 +78,8 @@ pub struct Subset { netinfo: Arc>, broadcast_instances: BTreeMap>, ba_instances: BTreeMap>, - broadcast_results: BTreeMap, + /// `None` means that that item has already been output. + broadcast_results: BTreeMap>, ba_results: BTreeMap, /// Whether the instance has decided on a value. decided: bool, @@ -89,7 +90,7 @@ pub type Step = messaging::Step>; impl DistAlgorithm for Subset { type NodeId = N; type Input = ProposedValue; - type Output = BTreeMap; + type Output = SubsetOutput; type Message = Message; type Error = Error; @@ -124,6 +125,12 @@ impl DistAlgorithm for Subset { } } +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum SubsetOutput { + Contribution(N, Vec), + Done, +} + impl Subset { pub fn new(netinfo: Arc>, session_id: u64) -> Result { // Create all broadcast instances. @@ -220,7 +227,22 @@ impl Subset { return Ok(step); } }; - self.broadcast_results.insert(proposer_id.clone(), value); + + let val_to_insert = if let Some(true) = self.ba_results.get(proposer_id) { + debug!(" {:?} → {:?}", proposer_id, HexBytes(&value)); + step.output + .extend(Some(SubsetOutput::Contribution(proposer_id.clone(), value))); + None + } else { + Some(value) + }; + + if let Some(inval) = self + .broadcast_results + .insert(proposer_id.clone(), val_to_insert) + { + error!("Duplicate insert in broadcast_results: {:?}", inval) + } let set_binary_agreement_input = |ba: &mut BinaryAgreement| { if ba.accepts_input() { ba.handle_input(true) @@ -239,7 +261,7 @@ impl Subset { F: FnOnce(&mut BinaryAgreement) -> binary_agreement::Result>, { let mut step = Step::default(); - let value = { + let accepted = { let binary_agreement = self .ba_instances .get_mut(proposer_id) @@ -252,40 +274,55 @@ impl Subset { f(binary_agreement).map_err(Error::ProcessBinaryAgreement0)?, to_msg, ); - if let Some(output) = output.into_iter().next() { - output + if let Some(accepted) = output.into_iter().next() { + accepted } else { return Ok(step); } }; - if self.ba_results.insert(proposer_id.clone(), value).is_some() { + + // Binary agreement result accepted. + if self + .ba_results + .insert(proposer_id.clone(), accepted) + .is_some() + { return Err(Error::MultipleBinaryAgreementResults); } + debug!( "{:?} Updated Binary Agreement results: {:?}", self.netinfo.our_id(), self.ba_results ); - if value && self.count_true() == self.netinfo.num_correct() { - // Upon delivery of value 1 from at least N − f instances of BA, provide - // input 0 to each instance of BA that has not yet been provided input. - for (id, binary_agreement) in &mut self.ba_instances { - if binary_agreement.accepts_input() { - let to_msg = |a_msg| Message::BinaryAgreement(id.clone(), a_msg); - for output in step.extend_with( - binary_agreement - .handle_input(false) - .map_err(Error::ProcessBinaryAgreement1)?, - to_msg, - ) { - if self.ba_results.insert(id.clone(), output).is_some() { - return Err(Error::MultipleBinaryAgreementResults); + if accepted { + if self.count_true() == self.netinfo.num_correct() { + // Upon delivery of value 1 from at least N − f instances of BA, provide + // input 0 to each instance of BA that has not yet been provided input. + for (id, binary_agreement) in &mut self.ba_instances { + if binary_agreement.accepts_input() { + let to_msg = |a_msg| Message::BinaryAgreement(id.clone(), a_msg); + for output in step.extend_with( + binary_agreement + .handle_input(false) + .map_err(Error::ProcessBinaryAgreement1)?, + to_msg, + ) { + if self.ba_results.insert(id.clone(), output).is_some() { + return Err(Error::MultipleBinaryAgreementResults); + } } } } } + if let Some(Some(value)) = self.broadcast_results.insert(proposer_id.clone(), None) { + debug!(" {:?} → {:?}", proposer_id, HexBytes(&value)); + step.output + .extend(Some(SubsetOutput::Contribution(proposer_id.clone(), value))); + } } + step.output.extend(self.try_binary_agreement_completion()); Ok(step) } @@ -295,7 +332,7 @@ impl Subset { self.ba_results.values().filter(|v| **v).count() } - fn try_binary_agreement_completion(&mut self) -> Option> { + fn try_binary_agreement_completion(&mut self) -> Option> { if self.decided || self.count_true() < self.netinfo.num_correct() { return None; } @@ -322,11 +359,11 @@ impl Subset { ); // Results of Broadcast instances in `delivered_1` - let broadcast_results: BTreeMap = self + let broadcast_results: BTreeSet<&N> = self .broadcast_results .iter() .filter(|(k, _)| delivered_1.contains(k)) - .map(|(k, v)| (k.clone(), v.clone())) + .map(|(k, _)| k) .collect(); if delivered_1.len() == broadcast_results.len() { @@ -334,11 +371,8 @@ impl Subset { "{:?} Binary Agreement instances completed:", self.netinfo.our_id() ); - for (id, result) in &broadcast_results { - debug!(" {:?} → {:?}", id, HexBytes(&result)); - } self.decided = true; - Some(broadcast_results) + Some(SubsetOutput::Done) } else { None } diff --git a/tests/net_dynamic_hb.rs b/tests/net_dynamic_hb.rs index ba9b5b17..de554205 100644 --- a/tests/net_dynamic_hb.rs +++ b/tests/net_dynamic_hb.rs @@ -90,10 +90,12 @@ fn do_drop_and_readd(cfg: TestConfig) { // First, we create a new test network with Honey Badger instances. let mut net = NetBuilder::new(0..cfg.dimension.size) .num_faulty(cfg.dimension.faulty) - .message_limit(200_000) // Limited to 200k messages for now. + .message_limit(200_000) // Limited to 200k messages for now. .using_step(move |node| { println!("Constructing new dynamic honey badger node #{}", node.id); - DynamicHoneyBadger::builder().build(node.netinfo).expect("cannot build instance") + DynamicHoneyBadger::builder() + .build(node.netinfo) + .expect("cannot build instance") }).build() .expect("could not construct test network"); diff --git a/tests/subset.rs b/tests/subset.rs index 92851883..00fe717c 100644 --- a/tests/subset.rs +++ b/tests/subset.rs @@ -20,7 +20,7 @@ use std::iter::once; use std::sync::Arc; use hbbft::messaging::NetworkInfo; -use hbbft::subset::Subset; +use hbbft::subset::{Subset, SubsetOutput}; use network::{Adversary, MessageScheduler, NodeId, SilentAdversary, TestNetwork, TestNode}; @@ -42,24 +42,33 @@ fn test_subset>>( while !network.nodes.values().all(TestNode::terminated) { network.step(); } + // Verify that all instances output the same set. - let mut expected = None; + let observer: BTreeSet<_> = network.observer.outputs().iter().cloned().collect(); for node in network.nodes.values() { - if let Some(output) = expected.as_ref() { - assert!(once(output).eq(node.outputs())); - continue; + let mut outputs = node.outputs(); + let mut actual = BTreeMap::default(); + + let mut has_seen_done = false; + for i in outputs { + assert!(!has_seen_done); + match i { + SubsetOutput::Contribution(k, v) => { + assert!(actual.insert(k, v).is_none()); + } + SubsetOutput::Done => has_seen_done = true, + } } - assert_eq!(1, node.outputs().len()); - expected = Some(node.outputs()[0].clone()); - } - let output = expected.unwrap(); - assert!(once(&output).eq(network.observer.outputs())); - // The Subset algorithm guarantees that more than two thirds of the proposed elements - // are in the set. - assert!(output.len() * 3 > inputs.len() * 2); - // Verify that the set's elements match the proposed values. - for (id, value) in output { - assert_eq!(inputs[&id], value); + assert_eq!(outputs.len(), actual.len() + 1); + + // The Subset algorithm guarantees that more than two thirds of the proposed elements + // are in the set. + assert!(actual.len() * 3 > inputs.len() * 2); + for (id, value) in actual { + assert_eq!(&inputs[id], value); + } + + assert_eq!(outputs.iter().cloned().collect::>(), observer); } }