Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract SBV broadcast from agreement. #182

Merged
merged 1 commit into from
Aug 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions src/agreement/bool_multimap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::collections::{btree_set, BTreeSet};
use std::ops::{Index, IndexMut};

/// A map from `bool` to `BTreeSet<N>`.
#[derive(Debug, Clone)]
pub struct BoolMultimap<N>([BTreeSet<N>; 2]);

impl<N: Ord> Default for BoolMultimap<N> {
fn default() -> Self {
BoolMultimap([BTreeSet::default(), BTreeSet::default()])
}
}

impl<N: Ord> Index<bool> for BoolMultimap<N> {
type Output = BTreeSet<N>;

fn index(&self, index: bool) -> &BTreeSet<N> {
&self.0[if index { 1 } else { 0 }]
}
}

impl<N: Ord> IndexMut<bool> for BoolMultimap<N> {
fn index_mut(&mut self, index: bool) -> &mut BTreeSet<N> {
&mut self.0[if index { 1 } else { 0 }]
}
}

impl<'a, N: Ord> IntoIterator for &'a BoolMultimap<N> {
type Item = (bool, &'a N);
type IntoIter = Iter<'a, N>;

fn into_iter(self) -> Iter<'a, N> {
Iter::new(self)
}
}

pub struct Iter<'a, N: 'a> {
key: bool,
set_iter: btree_set::Iter<'a, N>,
map: &'a BoolMultimap<N>,
}

impl<'a, N: 'a + Ord> Iter<'a, N> {
fn new(map: &'a BoolMultimap<N>) -> Self {
Iter {
key: false,
set_iter: map[false].iter(),
map,
}
}
}

impl<'a, N: 'a + Ord> Iterator for Iter<'a, N> {
type Item = (bool, &'a N);

fn next(&mut self) -> Option<(bool, &'a N)> {
if let Some(n) = self.set_iter.next() {
Some((self.key, n))
} else if self.key {
None
} else {
self.key = true;
self.set_iter = self.map[true].iter();
self.next()
}
}
}
202 changes: 61 additions & 141 deletions src/agreement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,21 @@
//! * After _f + 1_ nodes have sent us their coin shares, we receive the coin output and assign it
//! to `s`.

mod bool_multimap;
pub mod bool_set;
mod sbv_broadcast;

use rand;
use std::collections::{BTreeMap, BTreeSet};
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;

use itertools::Itertools;

use self::bool_multimap::BoolMultimap;
use self::sbv_broadcast::SbvBroadcast;
use agreement::bool_set::BoolSet;
use common_coin::{self, CommonCoin, CommonCoinMessage};
use fault_log::{Fault, FaultKind};
use messaging::{self, DistAlgorithm, NetworkInfo, Target};

/// An agreement error.
Expand All @@ -95,10 +98,8 @@ pub type Result<T> = ::std::result::Result<T, Error>;

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum AgreementContent {
/// `BVal` message.
BVal(bool),
/// `Aux` message.
Aux(bool),
/// Synchronized Binary Value Broadcast message.
SbvBroadcast(sbv_broadcast::Message),
/// `Conf` message.
Conf(BoolSet),
/// `Term` message.
Expand Down Expand Up @@ -137,13 +138,10 @@ pub struct AgreementMessage {
// with no replacement in sight.
impl rand::Rand for AgreementContent {
fn rand<R: rand::Rng>(rng: &mut R) -> Self {
let message_type = *rng
.choose(&["bval", "aux", "conf", "term", "coin"])
.unwrap();
let message_type = *rng.choose(&["sbvb", "conf", "term", "coin"]).unwrap();

match message_type {
"bval" => AgreementContent::BVal(rand::random()),
"aux" => AgreementContent::Aux(rand::random()),
"sbvb" => AgreementContent::SbvBroadcast(rand::random()),
"conf" => AgreementContent::Conf(rand::random()),
"term" => AgreementContent::Term(rand::random()),
"coin" => AgreementContent::Coin(Box::new(rand::random())),
Expand Down Expand Up @@ -189,19 +187,13 @@ pub struct Agreement<NodeUid> {
proposer_id: NodeUid,
/// Agreement algorithm epoch.
epoch: u32,
/// Bin values. Reset on every epoch update.
bin_values: BoolSet,
/// Values received in `BVal` messages. Reset on every epoch update.
received_bval: BTreeMap<bool, BTreeSet<NodeUid>>,
/// Sent `BVal` values. Reset on every epoch update.
sent_bval: BoolSet,
/// Values received in `Aux` messages. Reset on every epoch update.
received_aux: BTreeMap<bool, BTreeSet<NodeUid>>,
/// This epoch's Synchronized Binary Value Broadcast instance.
sbv_broadcast: SbvBroadcast<NodeUid>,
/// Received `Conf` messages. Reset on every epoch update.
received_conf: BTreeMap<NodeUid, BoolSet>,
/// Received `Term` messages. Kept throughout epoch updates. These count as `BVal`, `Aux` and
/// `Conf` messages for all future epochs.
received_term: BTreeMap<bool, BTreeSet<NodeUid>>,
received_term: BoolMultimap<NodeUid>,
/// The estimate of the decision value in the current epoch.
estimated: Option<bool>,
/// A permanent, latching copy of the output value. This copy is required because `output` can
Expand Down Expand Up @@ -271,16 +263,13 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
return Err(Error::UnknownProposer);
}
Ok(Agreement {
netinfo,
netinfo: netinfo.clone(),
session_id,
proposer_id,
epoch: 0,
bin_values: bool_set::NONE,
received_bval: BTreeMap::new(),
sent_bval: bool_set::NONE,
received_aux: BTreeMap::new(),
sbv_broadcast: SbvBroadcast::new(netinfo),
received_conf: BTreeMap::new(),
received_term: BTreeMap::new(),
received_term: BoolMultimap::default(),
estimated: None,
decision: None,
incoming_queue: BTreeMap::new(),
Expand All @@ -297,8 +286,8 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
// Set the initial estimated value to the input value.
self.estimated = Some(input);
debug!("{:?}/{:?} Input {}", self.our_id(), self.proposer_id, input);
// Record the input value as sent.
self.send_bval(input)
let sbvb_step = self.sbv_broadcast.input(input)?;
self.handle_sbvb_step(sbvb_step)
}

/// Acceptance check to be performed before setting the input value.
Expand All @@ -313,65 +302,50 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
content: AgreementContent,
) -> Result<Step<NodeUid>> {
match content {
AgreementContent::BVal(b) => self.handle_bval(sender_id, b),
AgreementContent::Aux(b) => self.handle_aux(sender_id, b),
AgreementContent::SbvBroadcast(msg) => self.handle_sbv_broadcast(sender_id, msg),
AgreementContent::Conf(v) => self.handle_conf(sender_id, v),
AgreementContent::Term(v) => self.handle_term(sender_id, v),
AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg),
}
}

/// Handles a `BVal(b)` message.
///
/// Upon receiving _f + 1_ `BVal(b)`, multicast `BVal(b)`. Upon receiving _2 f + 1_ `BVal(b)`,
/// update `bin_values`. When `bin_values` gets its first entry, multicast `Aux(b)`. If the
/// condition is met, starts the `Conf` round or decides. (See `on_bval_or_aux`.)
fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
let count_bval = {
let entry = self.received_bval.entry(b).or_insert_with(BTreeSet::new);
if !entry.insert(sender_id.clone()) {
return Ok(Fault::new(sender_id.clone(), FaultKind::DuplicateBVal).into());
}
entry.len()
};

let mut step = Step::default();

if count_bval == 2 * self.netinfo.num_faulty() + 1 {
self.bin_values.insert(b);

if self.bin_values != bool_set::BOTH {
step.extend(self.send(AgreementContent::Aux(b))?) // First entry: send `Aux(b)`.
} else {
step.extend(self.on_bval_or_aux()?); // Otherwise just check for `Conf` condition.
}
}

if count_bval == self.netinfo.num_faulty() + 1 {
step.extend(self.send_bval(b)?);
}

Ok(step)
/// Handles a Synchroniced Binary Value Broadcast message.
fn handle_sbv_broadcast(
&mut self,
sender_id: &NodeUid,
msg: sbv_broadcast::Message,
) -> Result<Step<NodeUid>> {
let sbvb_step = self.sbv_broadcast.handle_message(sender_id, msg)?;
self.handle_sbvb_step(sbvb_step)
}

/// Handles an `Aux` message.
///
/// If the condition is met, starts the `Conf` round or decides. (See `on_bval_or_aux`.)
fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
// Perform the `Aux` message round only if a `Conf` round hasn't started yet.
/// Handles a Synchronized Binary Value Broadcast step. On output, starts the `Conf` round or
/// decides.
fn handle_sbvb_step(
&mut self,
sbvb_step: sbv_broadcast::Step<NodeUid>,
) -> Result<Step<NodeUid>> {
let mut step = Step::default();
let output = step.extend_with(sbvb_step, |msg| {
AgreementContent::SbvBroadcast(msg).with_epoch(self.epoch)
});
if self.conf_values.is_some() {
return Ok(Step::default());
return Ok(step); // The `Conf` round has already started.
}
// TODO: Detect duplicate `Aux` messages and report faults.
if !self
.received_aux
.entry(b)
.or_insert_with(BTreeSet::new)
.insert(sender_id.clone())
{
return Ok(Fault::new(sender_id.clone(), FaultKind::DuplicateAux).into());
if let Some(aux_vals) = output.into_iter().next() {
// Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...`
match self.coin_state {
CoinState::Decided(_) => {
self.conf_values = Some(aux_vals);
step.extend(self.try_update_epoch()?)
}
CoinState::InProgress(_) => {
// Start the `Conf` message round.
step.extend(self.send_conf(aux_vals)?)
}
}
}
self.on_bval_or_aux()
Ok(step)
}

/// Handles a `Conf` message. When _N - f_ `Conf` messages with values in `bin_values` have
Expand All @@ -385,19 +359,17 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
/// _f_ such messages with the same value from different nodes, performs expedite termination:
/// decides on `v`, broadcasts `Term(v)` and terminates the instance.
fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
self.received_term
.entry(b)
.or_insert_with(BTreeSet::new)
.insert(sender_id.clone());
self.received_term[b].insert(sender_id.clone());
// Check for the expedite termination condition.
if self.decision.is_some() {
Ok(Step::default())
} else if self.received_term[&b].len() > self.netinfo.num_faulty() {
} else if self.received_term[b].len() > self.netinfo.num_faulty() {
Ok(self.decide(b))
} else {
// Otherwise handle the `Term` as a `BVal`, `Aux` and `Conf`.
let mut step = self.handle_bval(sender_id, b)?;
step.extend(self.handle_aux(sender_id, b)?);
let mut sbvb_step = self.sbv_broadcast.handle_bval(sender_id, b)?;
sbvb_step.extend(self.sbv_broadcast.handle_aux(sender_id, b)?);
let mut step = self.handle_sbvb_step(sbvb_step)?;
step.extend(self.handle_conf(sender_id, BoolSet::from(b))?);
Ok(step)
}
Expand All @@ -419,35 +391,6 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
self.on_coin_step(coin_step)
}

/// Checks whether there are _N - f_ `Aux` messages with values in `bin_values`. If so, starts
/// the `Conf` round or decides.
fn on_bval_or_aux(&mut self) -> Result<Step<NodeUid>> {
if self.bin_values == bool_set::NONE || self.conf_values.is_some() {
return Ok(Step::default());
}
let (aux_count, aux_vals) = self.count_aux();
if aux_count < self.netinfo.num_correct() {
return Ok(Step::default());
}
// Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...`
match self.coin_state {
CoinState::Decided(_) => {
self.conf_values = Some(aux_vals);
self.try_update_epoch()
}
CoinState::InProgress(_) => self.send_conf(aux_vals), // Start the `Conf` message round.
}
}

/// Multicasts a `BVal(b)` message, and handles it.
fn send_bval(&mut self, b: bool) -> Result<Step<NodeUid>> {
// Record the value `b` as sent. If it was already there, don't send it again.
if !self.sent_bval.insert(b) {
return Ok(Step::default());
}
self.send(AgreementContent::BVal(b))
}

/// Multicasts a `Conf(values)` message, and handles it.
fn send_conf(&mut self, values: BoolSet) -> Result<Step<NodeUid>> {
if self.conf_values.is_some() {
Expand Down Expand Up @@ -583,40 +526,16 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {

/// Counts the number of received `Conf` messages with values in `bin_values`.
fn count_conf(&self) -> usize {
let is_bin_val = |conf: &&BoolSet| conf.is_subset(self.bin_values);
let is_bin_val = |conf: &&BoolSet| conf.is_subset(self.sbv_broadcast.bin_values());
self.received_conf.values().filter(is_bin_val).count()
}

/// The count of `Aux` messages such that the set of values carried by those messages is a
/// subset of `bin_values`.
///
/// In general, we can't expect every good node to send the same `Aux` value, so waiting for
/// _N - f_ agreeing messages would not always terminate. We can, however, expect every good
/// node to send an `Aux` value that will eventually end up in our `bin_values`.
fn count_aux(&self) -> (usize, BoolSet) {
let mut values = bool_set::NONE;
let mut count = 0;
for b in self.bin_values {
let b_count = self.received_aux.get(&b).map_or(0, BTreeSet::len);
if b_count > 0 {
values.insert(b);
count += b_count;
}
}
(count, values)
}

/// Increments the epoch, sets the new estimate and handles queued messages.
fn update_epoch(&mut self, b: bool) -> Result<Step<NodeUid>> {
self.bin_values = bool_set::NONE;
self.received_bval = self.received_term.clone();
self.sent_bval = bool_set::NONE;
self.received_aux = self.received_term.clone();
self.sbv_broadcast.clear(&self.received_term);
self.received_conf.clear();
for (v, ids) in &self.received_term {
for id in ids {
self.received_conf.insert(id.clone(), BoolSet::from(*v));
}
for (v, id) in &self.received_term {
self.received_conf.insert(id.clone(), BoolSet::from(v));
}
self.conf_values = None;
self.epoch += 1;
Expand All @@ -630,7 +549,8 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
);

self.estimated = Some(b);
let mut step = self.send_bval(b)?;
let sbvb_step = self.sbv_broadcast.input(b)?;
let mut step = self.handle_sbvb_step(sbvb_step)?;
let queued_msgs = Itertools::flatten(self.incoming_queue.remove(&self.epoch).into_iter());
for (sender_id, content) in queued_msgs {
step.extend(self.handle_message_content(&sender_id, content)?);
Expand Down
Loading