Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outputing subset messages as received (initial work) #233

Merged
merged 15 commits into from
Sep 20, 2018
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 52 additions & 50 deletions src/honey_badger/epoch_state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::marker::PhantomData;
use std::sync::Arc;

Expand All @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
use super::{Batch, ErrorKind, MessageContent, Result, Step};
use fault_log::{Fault, FaultKind, FaultLog};
use messaging::{DistAlgorithm, NetworkInfo};
use subset::{self as cs, Subset};
use subset::{self as cs, Subset, SubsetOutput};
use threshold_decryption::{self as td, ThresholdDecryption};
use traits::{Contribution, NodeIdT};

Expand Down Expand Up @@ -116,6 +116,8 @@ pub struct EpochState<C, N: Rand> {
subset: SubsetState<N>,
/// The status of threshold decryption, by proposer.
decryption: BTreeMap<N, DecryptionState<N>>,
/// N seen so far
DemiMarie marked this conversation as resolved.
Show resolved Hide resolved
nodes_found_so_far: BTreeSet<N>,
DemiMarie marked this conversation as resolved.
Show resolved Hide resolved
_phantom: PhantomData<C>,
}

Expand All @@ -132,6 +134,7 @@ where
netinfo,
subset: SubsetState::Ongoing(cs),
decryption: BTreeMap::default(),
nodes_found_so_far: Default::default(),
_phantom: PhantomData,
})
}
Expand Down Expand Up @@ -219,15 +222,35 @@ where
/// Checks whether the subset has output, and if it does, sends out our decryption shares.
fn process_subset(&mut self, cs_step: cs::Step<N>) -> Result<Step<C, N>> {
let mut step = Step::default();
let mut cs_outputs = step.extend_with(cs_step, |cs_msg| {
let cs_outputs: VecDeque<_> = step.extend_with(cs_step, |cs_msg| {
MessageContent::Subset(cs_msg).with_epoch(self.epoch)
});
if let Some(cs_output) = cs_outputs.pop_front() {
self.subset = SubsetState::Complete(cs_output.keys().cloned().collect());
step.extend(self.send_decryption_shares(cs_output)?);
}
if !cs_outputs.is_empty() {
error!("Multiple outputs from a single Subset instance.");
for cs_output in cs_outputs {
match cs_output {
DemiMarie marked this conversation as resolved.
Show resolved Hide resolved
SubsetOutput::Contribution(k, v) => {
step.extend(self.send_decryption_share(k.clone(), &v)?);
self.nodes_found_so_far.insert(k);
}
SubsetOutput::Done => {
self.subset = SubsetState::Complete(self.nodes_found_so_far.clone());

let faulty_shares: Vec<_> = self
.decryption
.keys()
.filter(|id| !self.nodes_found_so_far.contains(id))
.cloned()
.collect();
for id in faulty_shares {
if let Some(DecryptionState::Ongoing(td)) = self.decryption.remove(&id) {
for id in td.sender_ids() {
let fault_kind = FaultKind::UnexpectedDecryptionShare;
step.fault_log.append(id.clone(), fault_kind);
}
}
}
break;
DemiMarie marked this conversation as resolved.
Show resolved Hide resolved
}
DemiMarie marked this conversation as resolved.
Show resolved Hide resolved
}
}
Ok(step)
}
Expand All @@ -250,49 +273,28 @@ where

/// Given the output of the Subset algorithm, inputs the ciphertexts into the Threshold
/// Decryption instances and sends our own decryption shares.
fn send_decryption_shares(&mut self, cs_output: BTreeMap<N, Vec<u8>>) -> Result<Step<C, N>> {
let mut step = Step::default();
let faulty_shares: Vec<_> = self
.decryption
.keys()
.filter(|id| !cs_output.contains_key(id))
.cloned()
.collect();
for id in faulty_shares {
if let Some(DecryptionState::Ongoing(td)) = self.decryption.remove(&id) {
for id in td.sender_ids() {
let fault_kind = FaultKind::UnexpectedDecryptionShare;
step.fault_log.append(id.clone(), fault_kind);
}
fn send_decryption_share(&mut self, proposer_id: N, v: &[u8]) -> Result<Step<C, N>> {
let ciphertext: Ciphertext = match bincode::deserialize(v) {
Ok(ciphertext) => ciphertext,
Err(err) => {
warn!(
"Cannot deserialize ciphertext from {:?}: {:?}",
proposer_id, err
);
return Ok(Fault::new(proposer_id, FaultKind::InvalidCiphertext).into());
}
}
for (proposer_id, v) in cs_output {
let ciphertext: Ciphertext = match bincode::deserialize(&v) {
Ok(ciphertext) => ciphertext,
Err(err) => {
warn!(
"Cannot deserialize ciphertext from {:?}: {:?}",
proposer_id, err
);
let fault_kind = FaultKind::InvalidCiphertext;
step.fault_log.append(proposer_id, fault_kind);
continue;
}
};
let td_result = match self.decryption.entry(proposer_id.clone()) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => entry.insert(DecryptionState::new(self.netinfo.clone())),
}.set_ciphertext(ciphertext);
match td_result {
Ok(td_step) => step.extend(self.process_decryption(proposer_id, td_step)?),
Err(td::Error::InvalidCiphertext(_)) => {
warn!("Invalid ciphertext from {:?}", proposer_id);
let fault_kind = FaultKind::ShareDecryptionFailed;
step.fault_log.append(proposer_id.clone(), fault_kind);
}
Err(err) => return Err(ErrorKind::ThresholdDecryption(err).into()),
};
let td_result = match self.decryption.entry(proposer_id.clone()) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => entry.insert(DecryptionState::new(self.netinfo.clone())),
}.set_ciphertext(ciphertext);
match td_result {
Ok(td_step) => self.process_decryption(proposer_id, td_step),
Err(td::Error::InvalidCiphertext(_)) => {
warn!("Invalid ciphertext from {:?}", proposer_id);
Ok(Fault::new(proposer_id.clone(), FaultKind::ShareDecryptionFailed).into())
}
Err(err) => Err(ErrorKind::ThresholdDecryption(err).into()),
}
Ok(step)
}
}
93 changes: 65 additions & 28 deletions src/subset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ pub struct Subset<N: Rand> {
netinfo: Arc<NetworkInfo<N>>,
broadcast_instances: BTreeMap<N, Broadcast<N>>,
ba_instances: BTreeMap<N, BinaryAgreement<N>>,
broadcast_results: BTreeMap<N, ProposedValue>,
/// `None` means that that item has already been output.
broadcast_results: BTreeMap<N, Option<ProposedValue>>,
ba_results: BTreeMap<N, bool>,
/// Whether the instance has decided on a value.
decided: bool,
Expand All @@ -89,7 +90,7 @@ pub type Step<N> = messaging::Step<Subset<N>>;
impl<N: NodeIdT + Rand> DistAlgorithm for Subset<N> {
type NodeId = N;
type Input = ProposedValue;
type Output = BTreeMap<N, ProposedValue>;
type Output = SubsetOutput<N>;
type Message = Message<N>;
type Error = Error;

Expand Down Expand Up @@ -124,6 +125,12 @@ impl<N: NodeIdT + Rand> DistAlgorithm for Subset<N> {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubsetOutput<N> {
DemiMarie marked this conversation as resolved.
Show resolved Hide resolved
Contribution(N, Vec<u8>),
Done,
}

impl<N: NodeIdT + Rand> Subset<N> {
pub fn new(netinfo: Arc<NetworkInfo<N>>, session_id: u64) -> Result<Self> {
// Create all broadcast instances.
Expand Down Expand Up @@ -220,7 +227,25 @@ impl<N: NodeIdT + Rand> Subset<N> {
return Ok(step);
}
};
self.broadcast_results.insert(proposer_id.clone(), value);
{
let ba_results = &self.ba_results;
DemiMarie marked this conversation as resolved.
Show resolved Hide resolved

let val_to_insert = if let Some(true) = ba_results.get(proposer_id) {
debug!(" {:?} → {:?}", proposer_id, HexBytes(&value));
step.output
.extend(Some(SubsetOutput::Contribution(proposer_id.clone(), value)));
None
} else {
Some(value)
};

if let Some(inval) = self
.broadcast_results
.insert(proposer_id.clone(), val_to_insert)
{
error!("Duplicate insert in broadcast_results: {:?}", inval)
}
}
let set_binary_agreement_input = |ba: &mut BinaryAgreement<N>| {
if ba.accepts_input() {
ba.handle_input(true)
Expand All @@ -239,7 +264,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
F: FnOnce(&mut BinaryAgreement<N>) -> binary_agreement::Result<binary_agreement::Step<N>>,
{
let mut step = Step::default();
let value = {
let accepted = {
let binary_agreement = self
.ba_instances
.get_mut(proposer_id)
Expand All @@ -252,40 +277,55 @@ impl<N: NodeIdT + Rand> Subset<N> {
f(binary_agreement).map_err(Error::ProcessBinaryAgreement0)?,
to_msg,
);
if let Some(output) = output.into_iter().next() {
output
if let Some(accepted) = output.into_iter().next() {
accepted
} else {
return Ok(step);
}
};
if self.ba_results.insert(proposer_id.clone(), value).is_some() {

// Binary agreement result accepted.
if self
.ba_results
.insert(proposer_id.clone(), accepted)
.is_some()
{
return Err(Error::MultipleBinaryAgreementResults);
}

debug!(
"{:?} Updated Binary Agreement results: {:?}",
self.netinfo.our_id(),
self.ba_results
);

if value && self.count_true() == self.netinfo.num_correct() {
// Upon delivery of value 1 from at least N − f instances of BA, provide
// input 0 to each instance of BA that has not yet been provided input.
for (id, binary_agreement) in &mut self.ba_instances {
if binary_agreement.accepts_input() {
let to_msg = |a_msg| Message::BinaryAgreement(id.clone(), a_msg);
for output in step.extend_with(
binary_agreement
.handle_input(false)
.map_err(Error::ProcessBinaryAgreement1)?,
to_msg,
) {
if self.ba_results.insert(id.clone(), output).is_some() {
return Err(Error::MultipleBinaryAgreementResults);
if accepted {
if self.count_true() == self.netinfo.num_correct() {
// Upon delivery of value 1 from at least N − f instances of BA, provide
// input 0 to each instance of BA that has not yet been provided input.
for (id, binary_agreement) in &mut self.ba_instances {
if binary_agreement.accepts_input() {
let to_msg = |a_msg| Message::BinaryAgreement(id.clone(), a_msg);
for output in step.extend_with(
binary_agreement
.handle_input(false)
.map_err(Error::ProcessBinaryAgreement1)?,
to_msg,
) {
if self.ba_results.insert(id.clone(), output).is_some() {
return Err(Error::MultipleBinaryAgreementResults);
}
}
}
}
}
if let Some(Some(value)) = self.broadcast_results.insert(proposer_id.clone(), None) {
debug!(" {:?} → {:?}", proposer_id, HexBytes(&value));
step.output
.extend(Some(SubsetOutput::Contribution(proposer_id.clone(), value)));
}
}

step.output.extend(self.try_binary_agreement_completion());
Ok(step)
}
Expand All @@ -295,7 +335,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
self.ba_results.values().filter(|v| **v).count()
}

fn try_binary_agreement_completion(&mut self) -> Option<BTreeMap<N, ProposedValue>> {
fn try_binary_agreement_completion(&mut self) -> Option<SubsetOutput<N>> {
if self.decided || self.count_true() < self.netinfo.num_correct() {
return None;
}
Expand All @@ -322,23 +362,20 @@ impl<N: NodeIdT + Rand> Subset<N> {
);

// Results of Broadcast instances in `delivered_1`
let broadcast_results: BTreeMap<N, ProposedValue> = self
let broadcast_results: BTreeSet<&N> = self
.broadcast_results
.iter()
.filter(|(k, _)| delivered_1.contains(k))
.map(|(k, v)| (k.clone(), v.clone()))
.map(|(k, _)| k)
.collect();

if delivered_1.len() == broadcast_results.len() {
debug!(
"{:?} Binary Agreement instances completed:",
self.netinfo.our_id()
);
for (id, result) in &broadcast_results {
debug!(" {:?} → {:?}", id, HexBytes(&result));
}
self.decided = true;
Some(broadcast_results)
Some(SubsetOutput::Done)
DemiMarie marked this conversation as resolved.
Show resolved Hide resolved
} else {
None
}
Expand Down
35 changes: 19 additions & 16 deletions tests/subset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::iter::once;
use std::sync::Arc;

use hbbft::messaging::NetworkInfo;
use hbbft::subset::Subset;
use hbbft::subset::{Subset, SubsetOutput};

use network::{Adversary, MessageScheduler, NodeId, SilentAdversary, TestNetwork, TestNode};

Expand All @@ -43,23 +43,26 @@ fn test_subset<A: Adversary<Subset<NodeId>>>(
network.step();
}
// Verify that all instances output the same set.
let mut expected = None;
for node in network.nodes.values() {
if let Some(output) = expected.as_ref() {
assert!(once(output).eq(node.outputs()));
continue;
let outputs = node.outputs();
let mut actual = BTreeMap::default();

for i in outputs {
match i {
SubsetOutput::Contribution(k, v) => {
assert!(actual.insert(k, v).is_none());
}
SubsetOutput::Done => break,
}
}
assert_eq!(outputs.len(), actual.len() + 1);

// The Subset algorithm guarantees that more than two thirds of the proposed elements
// are in the set.
assert!(actual.len() * 3 > inputs.len() * 2);
for (id, value) in actual {
assert_eq!(&inputs[&id], value);
}
assert_eq!(1, node.outputs().len());
expected = Some(node.outputs()[0].clone());
}
let output = expected.unwrap();
assert!(once(&output).eq(network.observer.outputs()));
DemiMarie marked this conversation as resolved.
Show resolved Hide resolved
// The Subset algorithm guarantees that more than two thirds of the proposed elements
// are in the set.
assert!(output.len() * 3 > inputs.len() * 2);
// Verify that the set's elements match the proposed values.
for (id, value) in output {
assert_eq!(inputs[&id], value);
}
}

Expand Down