Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outputing subset messages as received (initial work) #233

Merged
merged 15 commits into from
Sep 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 56 additions & 50 deletions src/honey_badger/epoch_state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::marker::PhantomData;
use std::sync::Arc;

Expand All @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
use super::{Batch, ErrorKind, MessageContent, Result, Step};
use fault_log::{Fault, FaultKind, FaultLog};
use messaging::{DistAlgorithm, NetworkInfo};
use subset::{self as cs, Subset};
use subset::{self as cs, Subset, SubsetOutput};
use threshold_decryption::{self as td, ThresholdDecryption};
use traits::{Contribution, NodeIdT};

Expand Down Expand Up @@ -116,6 +116,8 @@ pub struct EpochState<C, N: Rand> {
subset: SubsetState<N>,
/// The status of threshold decryption, by proposer.
decryption: BTreeMap<N, DecryptionState<N>>,
/// Nodes found so far in `Subset` output.
accepted_proposers: BTreeSet<N>,
_phantom: PhantomData<C>,
}

Expand All @@ -132,6 +134,7 @@ where
netinfo,
subset: SubsetState::Ongoing(cs),
decryption: BTreeMap::default(),
accepted_proposers: Default::default(),
_phantom: PhantomData,
})
}
Expand Down Expand Up @@ -219,15 +222,39 @@ where
/// Checks whether the subset has output, and if it does, sends out our decryption shares.
fn process_subset(&mut self, cs_step: cs::Step<N>) -> Result<Step<C, N>> {
let mut step = Step::default();
let mut cs_outputs = step.extend_with(cs_step, |cs_msg| {
let cs_outputs: VecDeque<_> = step.extend_with(cs_step, |cs_msg| {
MessageContent::Subset(cs_msg).with_epoch(self.epoch)
});
if let Some(cs_output) = cs_outputs.pop_front() {
self.subset = SubsetState::Complete(cs_output.keys().cloned().collect());
step.extend(self.send_decryption_shares(cs_output)?);
}
if !cs_outputs.is_empty() {
error!("Multiple outputs from a single Subset instance.");
let mut has_seen_done = false;
for cs_output in cs_outputs {
if has_seen_done {
error!("`SubsetOutput::Done` was not the last `SubsetOutput`");
}
match cs_output {
SubsetOutput::Contribution(k, v) => {
step.extend(self.send_decryption_share(k.clone(), &v)?);
self.accepted_proposers.insert(k);
}
SubsetOutput::Done => {
self.subset = SubsetState::Complete(self.accepted_proposers.clone());

let faulty_shares: Vec<_> = self
.decryption
.keys()
.filter(|id| !self.accepted_proposers.contains(id))
.cloned()
.collect();
for id in faulty_shares {
if let Some(DecryptionState::Ongoing(td)) = self.decryption.remove(&id) {
for id in td.sender_ids() {
let fault_kind = FaultKind::UnexpectedDecryptionShare;
step.fault_log.append(id.clone(), fault_kind);
}
}
}
has_seen_done = true
}
}
}
Ok(step)
}
Expand All @@ -250,49 +277,28 @@ where

/// Given the output of the Subset algorithm, inputs the ciphertexts into the Threshold
/// Decryption instances and sends our own decryption shares.
fn send_decryption_shares(&mut self, cs_output: BTreeMap<N, Vec<u8>>) -> Result<Step<C, N>> {
let mut step = Step::default();
let faulty_shares: Vec<_> = self
.decryption
.keys()
.filter(|id| !cs_output.contains_key(id))
.cloned()
.collect();
for id in faulty_shares {
if let Some(DecryptionState::Ongoing(td)) = self.decryption.remove(&id) {
for id in td.sender_ids() {
let fault_kind = FaultKind::UnexpectedDecryptionShare;
step.fault_log.append(id.clone(), fault_kind);
}
fn send_decryption_share(&mut self, proposer_id: N, v: &[u8]) -> Result<Step<C, N>> {
let ciphertext: Ciphertext = match bincode::deserialize(v) {
Ok(ciphertext) => ciphertext,
Err(err) => {
warn!(
"Cannot deserialize ciphertext from {:?}: {:?}",
proposer_id, err
);
return Ok(Fault::new(proposer_id, FaultKind::InvalidCiphertext).into());
}
}
for (proposer_id, v) in cs_output {
let ciphertext: Ciphertext = match bincode::deserialize(&v) {
Ok(ciphertext) => ciphertext,
Err(err) => {
warn!(
"Cannot deserialize ciphertext from {:?}: {:?}",
proposer_id, err
);
let fault_kind = FaultKind::InvalidCiphertext;
step.fault_log.append(proposer_id, fault_kind);
continue;
}
};
let td_result = match self.decryption.entry(proposer_id.clone()) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => entry.insert(DecryptionState::new(self.netinfo.clone())),
}.set_ciphertext(ciphertext);
match td_result {
Ok(td_step) => step.extend(self.process_decryption(proposer_id, td_step)?),
Err(td::Error::InvalidCiphertext(_)) => {
warn!("Invalid ciphertext from {:?}", proposer_id);
let fault_kind = FaultKind::ShareDecryptionFailed;
step.fault_log.append(proposer_id.clone(), fault_kind);
}
Err(err) => return Err(ErrorKind::ThresholdDecryption(err).into()),
};
let td_result = match self.decryption.entry(proposer_id.clone()) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => entry.insert(DecryptionState::new(self.netinfo.clone())),
}.set_ciphertext(ciphertext);
match td_result {
Ok(td_step) => self.process_decryption(proposer_id, td_step),
Err(td::Error::InvalidCiphertext(_)) => {
warn!("Invalid ciphertext from {:?}", proposer_id);
Ok(Fault::new(proposer_id.clone(), FaultKind::ShareDecryptionFailed).into())
}
Err(err) => Err(ErrorKind::ThresholdDecryption(err).into()),
}
Ok(step)
}
}
90 changes: 62 additions & 28 deletions src/subset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ pub struct Subset<N: Rand> {
netinfo: Arc<NetworkInfo<N>>,
broadcast_instances: BTreeMap<N, Broadcast<N>>,
ba_instances: BTreeMap<N, BinaryAgreement<N>>,
broadcast_results: BTreeMap<N, ProposedValue>,
/// `None` means that that item has already been output.
broadcast_results: BTreeMap<N, Option<ProposedValue>>,
ba_results: BTreeMap<N, bool>,
/// Whether the instance has decided on a value.
decided: bool,
Expand All @@ -89,7 +90,7 @@ pub type Step<N> = messaging::Step<Subset<N>>;
impl<N: NodeIdT + Rand> DistAlgorithm for Subset<N> {
type NodeId = N;
type Input = ProposedValue;
type Output = BTreeMap<N, ProposedValue>;
type Output = SubsetOutput<N>;
type Message = Message<N>;
type Error = Error;

Expand Down Expand Up @@ -124,6 +125,12 @@ impl<N: NodeIdT + Rand> DistAlgorithm for Subset<N> {
}
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SubsetOutput<N> {
Contribution(N, Vec<u8>),
Done,
}

impl<N: NodeIdT + Rand> Subset<N> {
pub fn new(netinfo: Arc<NetworkInfo<N>>, session_id: u64) -> Result<Self> {
// Create all broadcast instances.
Expand Down Expand Up @@ -220,7 +227,22 @@ impl<N: NodeIdT + Rand> Subset<N> {
return Ok(step);
}
};
self.broadcast_results.insert(proposer_id.clone(), value);

let val_to_insert = if let Some(true) = self.ba_results.get(proposer_id) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to pattern-match here: This could just be if Some(true) == self.ba_results.get(proposer_id).

On the other hand, maybe it should be a match instead: I'm wondering whether in the Some(false) case, we should also just insert None, since we won't need the value anyway—we already know it has been rejected?

debug!(" {:?} → {:?}", proposer_id, HexBytes(&value));
step.output
.extend(Some(SubsetOutput::Contribution(proposer_id.clone(), value)));
None
} else {
Some(value)
};

if let Some(inval) = self
.broadcast_results
.insert(proposer_id.clone(), val_to_insert)
{
error!("Duplicate insert in broadcast_results: {:?}", inval)
}
let set_binary_agreement_input = |ba: &mut BinaryAgreement<N>| {
if ba.accepts_input() {
ba.handle_input(true)
Expand All @@ -239,7 +261,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
F: FnOnce(&mut BinaryAgreement<N>) -> binary_agreement::Result<binary_agreement::Step<N>>,
{
let mut step = Step::default();
let value = {
let accepted = {
let binary_agreement = self
.ba_instances
.get_mut(proposer_id)
Expand All @@ -252,40 +274,55 @@ impl<N: NodeIdT + Rand> Subset<N> {
f(binary_agreement).map_err(Error::ProcessBinaryAgreement0)?,
to_msg,
);
if let Some(output) = output.into_iter().next() {
output
if let Some(accepted) = output.into_iter().next() {
accepted
} else {
return Ok(step);
}
};
if self.ba_results.insert(proposer_id.clone(), value).is_some() {

// Binary agreement result accepted.
if self
.ba_results
.insert(proposer_id.clone(), accepted)
.is_some()
{
return Err(Error::MultipleBinaryAgreementResults);
}

debug!(
"{:?} Updated Binary Agreement results: {:?}",
self.netinfo.our_id(),
self.ba_results
);

if value && self.count_true() == self.netinfo.num_correct() {
// Upon delivery of value 1 from at least N − f instances of BA, provide
// input 0 to each instance of BA that has not yet been provided input.
for (id, binary_agreement) in &mut self.ba_instances {
if binary_agreement.accepts_input() {
let to_msg = |a_msg| Message::BinaryAgreement(id.clone(), a_msg);
for output in step.extend_with(
binary_agreement
.handle_input(false)
.map_err(Error::ProcessBinaryAgreement1)?,
to_msg,
) {
if self.ba_results.insert(id.clone(), output).is_some() {
return Err(Error::MultipleBinaryAgreementResults);
if accepted {
if self.count_true() == self.netinfo.num_correct() {
// Upon delivery of value 1 from at least N − f instances of BA, provide
// input 0 to each instance of BA that has not yet been provided input.
for (id, binary_agreement) in &mut self.ba_instances {
if binary_agreement.accepts_input() {
let to_msg = |a_msg| Message::BinaryAgreement(id.clone(), a_msg);
for output in step.extend_with(
binary_agreement
.handle_input(false)
.map_err(Error::ProcessBinaryAgreement1)?,
to_msg,
) {
if self.ba_results.insert(id.clone(), output).is_some() {
return Err(Error::MultipleBinaryAgreementResults);
}
}
}
}
}
if let Some(Some(value)) = self.broadcast_results.insert(proposer_id.clone(), None) {
debug!(" {:?} → {:?}", proposer_id, HexBytes(&value));
step.output
.extend(Some(SubsetOutput::Contribution(proposer_id.clone(), value)));
}
}

step.output.extend(self.try_binary_agreement_completion());
Ok(step)
}
Expand All @@ -295,7 +332,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
self.ba_results.values().filter(|v| **v).count()
}

fn try_binary_agreement_completion(&mut self) -> Option<BTreeMap<N, ProposedValue>> {
fn try_binary_agreement_completion(&mut self) -> Option<SubsetOutput<N>> {
if self.decided || self.count_true() < self.netinfo.num_correct() {
return None;
}
Expand All @@ -322,23 +359,20 @@ impl<N: NodeIdT + Rand> Subset<N> {
);

// Results of Broadcast instances in `delivered_1`
let broadcast_results: BTreeMap<N, ProposedValue> = self
let broadcast_results: BTreeSet<&N> = self
.broadcast_results
.iter()
.filter(|(k, _)| delivered_1.contains(k))
.map(|(k, v)| (k.clone(), v.clone()))
.map(|(k, _)| k)
.collect();

if delivered_1.len() == broadcast_results.len() {
debug!(
"{:?} Binary Agreement instances completed:",
self.netinfo.our_id()
);
for (id, result) in &broadcast_results {
debug!(" {:?} → {:?}", id, HexBytes(&result));
}
self.decided = true;
Some(broadcast_results)
Some(SubsetOutput::Done)
DemiMarie marked this conversation as resolved.
Show resolved Hide resolved
} else {
None
}
Expand Down
6 changes: 4 additions & 2 deletions tests/net_dynamic_hb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,12 @@ fn do_drop_and_readd(cfg: TestConfig) {
// First, we create a new test network with Honey Badger instances.
let mut net = NetBuilder::new(0..cfg.dimension.size)
.num_faulty(cfg.dimension.faulty)
.message_limit(200_000) // Limited to 200k messages for now.
.message_limit(200_000) // Limited to 200k messages for now.
.using_step(move |node| {
println!("Constructing new dynamic honey badger node #{}", node.id);
DynamicHoneyBadger::builder().build(node.netinfo).expect("cannot build instance")
DynamicHoneyBadger::builder()
.build(node.netinfo)
.expect("cannot build instance")
}).build()
.expect("could not construct test network");

Expand Down
41 changes: 25 additions & 16 deletions tests/subset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::iter::once;
use std::sync::Arc;

use hbbft::messaging::NetworkInfo;
use hbbft::subset::Subset;
use hbbft::subset::{Subset, SubsetOutput};

use network::{Adversary, MessageScheduler, NodeId, SilentAdversary, TestNetwork, TestNode};

Expand All @@ -42,24 +42,33 @@ fn test_subset<A: Adversary<Subset<NodeId>>>(
while !network.nodes.values().all(TestNode::terminated) {
network.step();
}

// Verify that all instances output the same set.
let mut expected = None;
let observer: BTreeSet<_> = network.observer.outputs().iter().cloned().collect();
afck marked this conversation as resolved.
Show resolved Hide resolved
for node in network.nodes.values() {
if let Some(output) = expected.as_ref() {
assert!(once(output).eq(node.outputs()));
continue;
let mut outputs = node.outputs();
let mut actual = BTreeMap::default();

let mut has_seen_done = false;
for i in outputs {
assert!(!has_seen_done);
match i {
SubsetOutput::Contribution(k, v) => {
assert!(actual.insert(k, v).is_none());
}
SubsetOutput::Done => has_seen_done = true,
}
}
assert_eq!(1, node.outputs().len());
expected = Some(node.outputs()[0].clone());
}
let output = expected.unwrap();
assert!(once(&output).eq(network.observer.outputs()));
DemiMarie marked this conversation as resolved.
Show resolved Hide resolved
// The Subset algorithm guarantees that more than two thirds of the proposed elements
// are in the set.
assert!(output.len() * 3 > inputs.len() * 2);
// Verify that the set's elements match the proposed values.
for (id, value) in output {
assert_eq!(inputs[&id], value);
assert_eq!(outputs.len(), actual.len() + 1);

// The Subset algorithm guarantees that more than two thirds of the proposed elements
// are in the set.
assert!(actual.len() * 3 > inputs.len() * 2);
for (id, value) in actual {
assert_eq!(&inputs[id], value);
}

assert_eq!(outputs.iter().cloned().collect::<BTreeSet<_>>(), observer);
afck marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down