Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

[wip] Broadcast #149

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 149 additions & 3 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ use std::io::Write;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer;
use transaction::Transaction;
use subscribers::Subscribers;

pub struct AccountantSkel<W: Write + Send + 'static> {
acc: Accountant,
Expand Down Expand Up @@ -245,8 +246,32 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
}
Ok(())
}
/// Process verified blobs, already in order
/// Respond with a signed hash of the state
fn replicate_state(
obj: &Arc<Mutex<AccountantSkel<W>>>,
verified_receiver: &streamer::BlobReceiver,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
for msgs in blobs {
let entries:Vec<Entry> = {
let m = msgs.read().unwrap();
let r = deserialize(&m.data[..m.meta.size])?;
r
};
for e in entries {
obj.lock().unwrap().acc.process_verified_events(e.events)?;
}
blob_recycler.recycle(msgs);
}
Ok(())
}


/// Create a UDP microservice that forwards messages the given AccountantSkel.
/// This service is the network leader
/// Set `exit` to shutdown its threads.
pub fn serve(
obj: &Arc<Mutex<AccountantSkel<W>>>,
Expand Down Expand Up @@ -279,7 +304,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {

let skel = obj.clone();
let t_server = spawn(move || loop {
let e = AccountantSkel::process(
let e = Self::process(
&skel,
&verified_receiver,
&blob_sender,
Expand All @@ -292,6 +317,71 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
});
Ok(vec![t_receiver, t_responder, t_server, t_verifier])
}

/// This service receives messages from a leader in the network and processes the transactions
/// on the accountant state.
/// # Arguments
/// * `obj` - The accountant state.
/// * `rsubs` - The subscribers.
/// * `exit` - The exit signal.
/// # Remarks
/// The pipeline is constructed as follows:
/// 1. receive blobs from the network, these are out of order
/// 2. verify blobs, PoH, signatures (TODO)
/// 3. reconstruct contiguous window
/// a. order the blobs
/// b. use erasure coding to reconstruct missing blobs
/// c. ask the network for missing blobs, if erasure coding is insufficient
/// d. make sure that the blobs PoH sequences connect (TODO)
/// 4. process the transaction state machine
/// 5. respond with the hash of the state back to the leader
pub fn replicate(
obj: &Arc<Mutex<AccountantSkel<W>>>,
rsubs: Subscribers,
exit: Arc<AtomicBool>,
) -> Result<Vec<JoinHandle<()>>> {
let read = UdpSocket::bind(rsubs.me.addr)?;
// make sure we are on the same interface
let mut local = read.local_addr()?;
local.set_port(0);
let write = UdpSocket::bind(local)?;

let blob_recycler = packet::BlobRecycler::default();
let (blob_sender, blob_receiver) = channel();
let t_blob_receiver =
streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?;
let (window_sender, window_receiver) = channel();
let (retransmit_sender, retransmit_receiver) = channel();

let subs = Arc::new(RwLock::new(rsubs));
let t_retransmit = streamer::retransmitter(
write,
exit.clone(),
subs.clone(),
blob_recycler.clone(),
retransmit_receiver,
);
//TODO
//the packets comming out of blob_receiver need to be sent to the GPU and verified
//then sent to the window, which does the erasure coding reconstruction
let t_window = streamer::window(
exit.clone(),
subs,
blob_recycler.clone(),
blob_receiver,
window_sender,
retransmit_sender,
);

let skel = obj.clone();
let t_server = spawn(move || loop {
let e = Self::replicate_state(&skel, &window_receiver, &blob_recycler);
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
});
Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server])
}
}

#[cfg(test)]
Expand Down Expand Up @@ -319,7 +409,7 @@ mod tests {
use accountant_skel::{to_packets, Request};
use bincode::serialize;
use ecdsa;
use packet::{PacketRecycler, NUM_PACKETS};
use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS};
use transaction::{memfind, test_tx};

use accountant::Accountant;
Expand All @@ -339,6 +429,12 @@ mod tests {
use std::time::Duration;
use transaction::Transaction;

use subscribers::{Node, Subscribers};
use streamer;
use std::sync::mpsc::channel;
use std::collections::VecDeque;
use packet::{PACKET_DATA_SIZE};

#[test]
fn test_layout() {
let tr = test_tx();
Expand Down Expand Up @@ -443,6 +539,56 @@ mod tests {
exit.store(true, Ordering::Relaxed);
}

#[test]
fn test_replicate() {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));

let node_me = Node::default();
let node_leader = Node::new([0; 8], 0, send.local_addr().unwrap());
let subs = Subscribers::new(node_me, node_leader, &[]);

let recv_recycler = PacketRecycler::default();
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap();
let (s_responder, r_responder) = channel();
let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder);

let alice = Mint::new(10_000);
let acc = Accountant::new(&alice);
let historian = Historian::new(&alice.last_id(), Some(30));
let acc = Arc::new(Mutex::new(AccountantSkel::new(
acc,
alice.last_id(),
sink(),
historian,
)));

let threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap();

let mut msgs = VecDeque::new();
for i in 0..10 {
let b = resp_recycler.allocate();
let b_ = b.clone();
let mut w = b.write().unwrap();
w.data[0] = i as u8;
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
msgs.push_back(b_);
}
s_responder.send(msgs).expect("send");

exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
for t in threads {
t.join().expect("join");
}
}

}

#[cfg(all(feature = "unstable", test))]
Expand Down
2 changes: 1 addition & 1 deletion src/ecdsa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ mod tests {
use ecdsa;
use packet::{Packet, Packets, SharedPackets};
use std::sync::RwLock;
use transaction::test_tx;
use transaction::Transaction;
use transaction::test_tx;

fn make_packet_from_transaction(tr: Transaction) -> Packet {
let tx = serialize(&Request::Transaction(tr)).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/hash.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! The `hash` module provides functions for creating SHA-256 hashes.

use generic_array::typenum::U32;
use generic_array::GenericArray;
use generic_array::typenum::U32;
use sha2::{Digest, Sha256};

pub type Hash = GenericArray<u8, U32>;
Expand Down
2 changes: 1 addition & 1 deletion src/mint.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! The `mint` module is a library for generating the chain's genesis block.

use entry::create_entry;
use entry::Entry;
use entry::create_entry;
use event::Event;
use hash::{hash, Hash};
use ring::rand::SystemRandom;
Expand Down
9 changes: 8 additions & 1 deletion src/result.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! The `result` module exposes a Result type that propagates one of many different Error types.

use accountant;
use bincode;
use serde_json;
use std;
Expand All @@ -14,6 +15,7 @@ pub enum Error {
RecvError(std::sync::mpsc::RecvError),
RecvTimeoutError(std::sync::mpsc::RecvTimeoutError),
Serialize(std::boxed::Box<bincode::ErrorKind>),
AccountingError(accountant::AccountingError),
SendError,
Services,
}
Expand All @@ -30,6 +32,11 @@ impl std::convert::From<std::sync::mpsc::RecvTimeoutError> for Error {
Error::RecvTimeoutError(e)
}
}
impl std::convert::From<accountant::AccountingError> for Error {
fn from(e: accountant::AccountingError) -> Error {
Error::AccountingError(e)
}
}
impl<T> std::convert::From<std::sync::mpsc::SendError<T>> for Error {
fn from(_e: std::sync::mpsc::SendError<T>) -> Error {
Error::SendError
Expand Down Expand Up @@ -70,9 +77,9 @@ mod tests {
use std::io;
use std::io::Write;
use std::net::SocketAddr;
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvError;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::channel;
use std::thread;

fn addr_parse_error() -> Result<SocketAddr> {
Expand Down
2 changes: 1 addition & 1 deletion src/signature.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! The `signature` module provides functionality for public, and private keys.

use generic_array::typenum::{U32, U64};
use generic_array::GenericArray;
use generic_array::typenum::{U32, U64};
use ring::signature::Ed25519KeyPair;
use ring::{rand, signature};
use untrusted;
Expand Down
Loading