Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Multinode #176

Merged
merged 5 commits into from
May 7, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 112 additions & 31 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use serde_json;
use signature::PublicKey;
use std::cmp::max;
use std::collections::VecDeque;
use std::io::sink;
use std::io::{Cursor, Write};
use std::mem::size_of;
use std::net::{SocketAddr, UdpSocket};
Expand Down Expand Up @@ -100,6 +101,8 @@ impl AccountantSkel {
num_events: entry.events.len() as u64,
};
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
trace!("sending {} to {}", data.len(), addr);
//TODO dont do IO here, this needs to be on a separate channel
let res = socket.send_to(&data, addr);
if res.is_err() {
eprintln!("couldn't send response: {:?}", res);
Expand Down Expand Up @@ -182,16 +185,11 @@ impl AccountantSkel {
broadcast: &streamer::BlobSender,
blob_recycler: &packet::BlobRecycler,
writer: &Arc<Mutex<W>>,
exit: Arc<AtomicBool>,
) -> Result<()> {
let mut q = VecDeque::new();
while let Ok(list) = Self::receive_all(&obj, writer) {
trace!("New blobs? {}", list.len());
Self::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
if exit.load(Ordering::Relaxed) {
break;
}
}
let list = Self::receive_all(&obj, writer)?;
trace!("New blobs? {}", list.len());
Self::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
if !q.is_empty() {
broadcast.send(q)?;
}
Expand All @@ -206,14 +204,26 @@ impl AccountantSkel {
writer: Arc<Mutex<W>>,
) -> JoinHandle<()> {
spawn(move || loop {
let e = Self::run_sync(
obj.clone(),
&broadcast,
&blob_recycler,
&writer,
exit.clone(),
);
if e.is_err() && exit.load(Ordering::Relaxed) {
let _ = Self::run_sync(obj.clone(), &broadcast, &blob_recycler, &writer);
if exit.load(Ordering::Relaxed) {
info!("sync_service exiting");
break;
}
})
}

/// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out
fn run_sync_no_broadcast(obj: SharedSkel) -> Result<()> {
Self::receive_all(&obj, &Arc::new(Mutex::new(sink())))?;
Ok(())
}

pub fn sync_no_broadcast_service(obj: SharedSkel, exit: Arc<AtomicBool>) -> JoinHandle<()> {
spawn(move || loop {
let _ = Self::run_sync_no_broadcast(obj.clone());
if exit.load(Ordering::Relaxed) {
info!("sync_no_broadcast_service exiting");
break;
}
})
Expand All @@ -228,7 +238,9 @@ impl AccountantSkel {
match msg {
Request::GetBalance { key } => {
let val = self.acc.lock().unwrap().get_balance(&key);
Some((Response::Balance { key, val }, rsp_addr))
let rsp = (Response::Balance { key, val }, rsp_addr);
info!("Response::Balance {:?}", rsp);
Some(rsp)
}
Request::Transaction(_) => unreachable!(),
Request::Subscribe { subscriptions } => {
Expand All @@ -247,10 +259,10 @@ impl AccountantSkel {
fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<Vec<SharedPackets>> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
trace!("got msgs");
debug!("got msgs");
let mut batch = vec![msgs];
while let Ok(more) = recvr.try_recv() {
trace!("got more msgs");
debug!("got more msgs");
batch.push(more);
}
info!("batch len {}", batch.len());
Expand All @@ -275,6 +287,7 @@ impl AccountantSkel {
) -> Result<()> {
let batch = Self::recv_batch(recvr)?;
let verified_batches = Self::verify_batch(batch);
debug!("verified batches: {}", verified_batches.len());
for xs in verified_batches {
sendr.send(xs)?;
}
Expand Down Expand Up @@ -315,8 +328,9 @@ impl AccountantSkel {
&self,
req_vers: Vec<(Request, SocketAddr, u8)>,
) -> Result<Vec<(Response, SocketAddr)>> {
trace!("partitioning");
debug!("partitioning");
let (trs, reqs) = Self::partition_requests(req_vers);
debug!("trs: {} reqs: {}", trs.len(), reqs.len());

// Process the transactions in parallel and then log the successful ones.
for result in self.acc.lock().unwrap().process_verified_transactions(trs) {
Expand All @@ -328,15 +342,21 @@ impl AccountantSkel {
}
}

debug!("processing verified");

// Let validators know they should not attempt to process additional
// transactions in parallel.
self.historian_input.lock().unwrap().send(Signal::Tick)?;

debug!("after historian_input");

// Process the remaining requests serially.
let rsps = reqs.into_iter()
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
.collect();

debug!("returning rsps");

Ok(rsps)
}

Expand Down Expand Up @@ -377,7 +397,7 @@ impl AccountantSkel {
) -> Result<()> {
let timer = Duration::new(1, 0);
let mms = verified_receiver.recv_timeout(timer)?;
trace!("got some messages: {}", mms.len());
debug!("got some messages: {}", mms.len());
for (msgs, vers) in mms {
let reqs = Self::deserialize_packets(&msgs.read().unwrap());
let req_vers = reqs.into_iter()
Expand All @@ -389,18 +409,18 @@ impl AccountantSkel {
v
})
.collect();
trace!("process_packets");
debug!("process_packets");
let rsps = obj.process_packets(req_vers)?;
trace!("done process_packets");
debug!("done process_packets");
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
trace!("sending blobs: {}", blobs.len());
if !blobs.is_empty() {
info!("process: sending blobs: {}", blobs.len());
//don't wake up the other side if there is nothing
responder_sender.send(blobs)?;
}
packet_recycler.recycle(msgs);
}
trace!("done responding");
debug!("done responding");
Ok(())
}
/// Process verified blobs, already in order
Expand All @@ -412,6 +432,7 @@ impl AccountantSkel {
) -> Result<()> {
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
trace!("replicating blobs {}", blobs.len());
for msgs in &blobs {
let blob = msgs.read().unwrap();
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
Expand Down Expand Up @@ -541,10 +562,12 @@ impl AccountantSkel {
obj: &SharedSkel,
me: ReplicatedData,
gossip: UdpSocket,
serve: UdpSocket,
replicate: UdpSocket,
leader: ReplicatedData,
exit: Arc<AtomicBool>,
) -> Result<Vec<JoinHandle<()>>> {
//replicate pipeline
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
crdt.write().unwrap().set_leader(leader.id);
crdt.write().unwrap().insert(leader);
Expand Down Expand Up @@ -580,27 +603,84 @@ impl AccountantSkel {
//then sent to the window, which does the erasure coding reconstruction
let t_window = streamer::window(
exit.clone(),
crdt,
crdt.clone(),
blob_recycler.clone(),
blob_receiver,
window_sender,
retransmit_sender,
);

let skel = obj.clone();
let t_server = spawn(move || loop {
let s_exit = exit.clone();
let t_replicator = spawn(move || loop {
let e = Self::replicate_state(&skel, &window_receiver, &blob_recycler);
if e.is_err() && exit.load(Ordering::Relaxed) {
if e.is_err() && s_exit.load(Ordering::Relaxed) {
break;
}
});

//serve pipeline
// make sure we are on the same interface
let mut local = serve.local_addr()?;
local.set_port(0);
let respond_socket = UdpSocket::bind(local.clone())?;

let packet_recycler = packet::PacketRecycler::default();
let blob_recycler = packet::BlobRecycler::default();
let (packet_sender, packet_receiver) = channel();
let t_packet_receiver =
streamer::receiver(serve, exit.clone(), packet_recycler.clone(), packet_sender)?;
let (responder_sender, responder_receiver) = channel();
let t_responder = streamer::responder(
respond_socket,
exit.clone(),
blob_recycler.clone(),
responder_receiver,
);
let (verified_sender, verified_receiver) = channel();

let exit_ = exit.clone();
let t_verifier = spawn(move || loop {
let e = Self::verifier(&packet_receiver, &verified_sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
trace!("verifier exiting");
break;
}
});

let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone());

let skel = obj.clone();
let s_exit = exit.clone();
let t_server = spawn(move || loop {
let e = Self::process(
&mut skel.clone(),
&verified_receiver,
&responder_sender,
&packet_recycler,
&blob_recycler,
);
if e.is_err() {
if s_exit.load(Ordering::Relaxed) {
break;
}
}
});

Ok(vec![
//replicate threads
t_blob_receiver,
t_retransmit,
t_window,
t_server,
t_replicator,
t_gossip,
t_listen,
//serve threads
t_packet_receiver,
t_responder,
t_server,
t_verifier,
t_sync,
])
}
}
Expand Down Expand Up @@ -769,7 +849,7 @@ mod tests {
tr2.data.plan = Plan::new_payment(502, bob_pubkey);
let _sig = acc_stub.transfer_signed(tr2).unwrap();

assert_eq!(acc_stub.get_balance(&bob_pubkey).wait().unwrap(), 500);
assert_eq!(acc_stub.get_balance(&bob_pubkey).unwrap(), 500);
trace!("exiting");
exit.store(true, Ordering::Relaxed);
trace!("joining threads");
Expand Down Expand Up @@ -797,7 +877,7 @@ mod tests {
fn test_replicate() {
logger::setup();
let (leader_data, leader_gossip, _, leader_serve) = test_node();
let (target1_data, target1_gossip, target1_replicate, _) = test_node();
let (target1_data, target1_gossip, target1_replicate, target1_serve) = test_node();
let (target2_data, target2_gossip, target2_replicate, _) = test_node();
let exit = Arc::new(AtomicBool::new(false));

Expand Down Expand Up @@ -851,6 +931,7 @@ mod tests {
&acc,
target1_data,
target1_gossip,
target1_serve,
target1_replicate,
leader_data,
exit.clone(),
Expand Down
Loading