
Moved deserialization of blobs to entries from replicate_stage to window_service (#1287)
carllin authored Sep 21, 2018
1 parent a9355c3 commit c50ac96
Showing 5 changed files with 90 additions and 46 deletions.
4 changes: 4 additions & 0 deletions src/entry.rs
@@ -9,8 +9,12 @@ use rayon::prelude::*;
use signature::Pubkey;
use std::io::Cursor;
use std::net::SocketAddr;
use std::sync::mpsc::{Receiver, Sender};
use transaction::Transaction;

pub type EntrySender = Sender<Vec<Entry>>;
pub type EntryReceiver = Receiver<Vec<Entry>>;

/// Each Entry contains three pieces of data. The `num_hashes` field is the number
/// of hashes performed since the previous entry. The `id` field is the result
/// of hashing `id` from the previous entry `num_hashes` times. The `transactions`
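The new EntrySender and EntryReceiver aliases simply name the two ends of a std::sync::mpsc channel whose messages are batches of entries. Below is a minimal, self-contained sketch of how the aliases pair up; the Entry struct here is a stand-in for the real entry::Entry, not the repository's type.

use std::sync::mpsc::{channel, Receiver, Sender};

// Stand-in for entry::Entry; the real type holds num_hashes, id, and transactions.
#[derive(Debug, Clone)]
struct Entry {
    num_hashes: u64,
}

// Mirrors the aliases added above: both ends of the channel carry Vec<Entry> batches.
type EntrySender = Sender<Vec<Entry>>;
type EntryReceiver = Receiver<Vec<Entry>>;

fn main() {
    let (entry_sender, entry_receiver): (EntrySender, EntryReceiver) = channel();
    entry_sender
        .send(vec![Entry { num_hashes: 1 }, Entry { num_hashes: 2 }])
        .unwrap();
    let batch = entry_receiver.recv().unwrap();
    assert_eq!(batch.len(), 2);
}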
16 changes: 8 additions & 8 deletions src/replicate_stage.rs
@@ -3,7 +3,8 @@
use bank::Bank;
use counter::Counter;
use crdt::Crdt;
use ledger::{reconstruct_entries_from_blobs, Block, LedgerWriter};
use entry::EntryReceiver;
use ledger::{Block, LedgerWriter};
use log::Level;
use result::{Error, Result};
use service::Service;
@@ -16,7 +17,7 @@ use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::{responder, BlobReceiver};
use streamer::responder;
use vote_stage::VoteStage;

pub struct ReplicateStage {
@@ -29,16 +30,15 @@ impl ReplicateStage {
fn replicate_requests(
bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>,
window_receiver: &BlobReceiver,
window_receiver: &EntryReceiver,
ledger_writer: Option<&mut LedgerWriter>,
) -> Result<()> {
let timer = Duration::new(1, 0);
//coalesce all the available blobs into a single vote
let mut blobs = window_receiver.recv_timeout(timer)?;
//coalesce all the available entries into a single vote
let mut entries = window_receiver.recv_timeout(timer)?;
while let Ok(mut more) = window_receiver.try_recv() {
blobs.append(&mut more);
entries.append(&mut more);
}
let entries = reconstruct_entries_from_blobs(blobs)?;

let res = bank.process_entries(entries.clone());

@@ -64,7 +64,7 @@ impl ReplicateStage {
keypair: Arc<Keypair>,
bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>,
window_receiver: BlobReceiver,
window_receiver: EntryReceiver,
ledger_path: Option<&str>,
exit: Arc<AtomicBool>,
) -> Self {
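replicate_requests keeps its coalescing pattern, it just operates on Vec<Entry> batches instead of blobs: block up to one second for the first batch, then drain whatever else is already queued with try_recv. A self-contained sketch of that pattern follows, with a placeholder Entry type standing in for the real one.

use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::time::Duration;

#[derive(Debug)]
struct Entry; // placeholder for entry::Entry

// Wait up to `timeout` for one batch, then drain any batches already queued,
// mirroring the recv_timeout / try_recv loop in replicate_requests.
fn coalesce_entries(
    receiver: &Receiver<Vec<Entry>>,
    timeout: Duration,
) -> Result<Vec<Entry>, RecvTimeoutError> {
    let mut entries = receiver.recv_timeout(timeout)?;
    while let Ok(mut more) = receiver.try_recv() {
        entries.append(&mut more);
    }
    Ok(entries)
}

fn main() {
    let (sender, receiver) = channel();
    sender.send(vec![Entry, Entry]).unwrap();
    sender.send(vec![Entry]).unwrap();
    let batch = coalesce_entries(&receiver, Duration::new(1, 0)).unwrap();
    assert_eq!(batch.len(), 3);
}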
11 changes: 6 additions & 5 deletions src/retransmit_stage.rs
@@ -2,13 +2,14 @@
use counter::Counter;
use crdt::Crdt;
use entry::Entry;
use log::Level;
use result::{Error, Result};
use service::Service;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
@@ -68,23 +69,23 @@ impl RetransmitStage {
retransmit_socket: Arc<UdpSocket>,
repair_socket: Arc<UdpSocket>,
fetch_stage_receiver: BlobReceiver,
) -> (Self, BlobReceiver) {
) -> (Self, Receiver<Vec<Entry>>) {
let (retransmit_sender, retransmit_receiver) = channel();

let t_retransmit = retransmitter(retransmit_socket, crdt.clone(), retransmit_receiver);
let (blob_sender, blob_receiver) = channel();
let (entry_sender, entry_receiver) = channel();
let t_window = window_service(
crdt.clone(),
window,
entry_height,
fetch_stage_receiver,
blob_sender,
entry_sender,
retransmit_sender,
repair_socket,
);
let thread_hdls = vec![t_retransmit, t_window];

(RetransmitStage { thread_hdls }, blob_receiver)
(RetransmitStage { thread_hdls }, entry_receiver)
}
}

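RetransmitStage::new now hands its caller the receiving end of the entry channel rather than a blob receiver. A toy sketch of that constructor shape is below: an internal channel is wired up, the worker thread is kept by the stage, and the receiver is returned to the caller. ToyStage and its single worker are illustrative stand-ins, not the real stage.

use std::sync::mpsc::{channel, Receiver};
use std::thread::{self, JoinHandle};

#[derive(Debug)]
struct Entry; // placeholder for entry::Entry

struct ToyStage {
    thread_hdl: JoinHandle<()>,
}

impl ToyStage {
    fn new() -> (Self, Receiver<Vec<Entry>>) {
        let (entry_sender, entry_receiver) = channel();
        let thread_hdl = thread::spawn(move || {
            // Stand-in for window_service: emit one batch of entries and exit.
            let _ = entry_sender.send(vec![Entry]);
        });
        (ToyStage { thread_hdl }, entry_receiver)
    }

    fn join(self) {
        self.thread_hdl.join().unwrap();
    }
}

fn main() {
    let (stage, entry_receiver) = ToyStage::new();
    assert_eq!(entry_receiver.recv().unwrap().len(), 1);
    stage.join();
}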
30 changes: 22 additions & 8 deletions src/window.rs
@@ -5,9 +5,9 @@ use crdt::{Crdt, NodeInfo};
use entry::Entry;
#[cfg(feature = "erasure")]
use erasure;
use ledger::Block;
use ledger::{reconstruct_entries_from_blobs, Block};
use log::Level;
use packet::{BlobRecycler, SharedBlob, SharedBlobs};
use packet::{BlobRecycler, SharedBlob};
use result::Result;
use signature::Pubkey;
use std::cmp;
@@ -67,7 +67,7 @@ pub trait WindowUtil {
id: &Pubkey,
blob: SharedBlob,
pix: u64,
consume_queue: &mut SharedBlobs,
consume_queue: &mut Vec<Entry>,
recycler: &BlobRecycler,
consumed: &mut u64,
leader_unknown: bool,
@@ -180,7 +180,7 @@ impl WindowUtil for Window {
id: &Pubkey,
blob: SharedBlob,
pix: u64,
consume_queue: &mut SharedBlobs,
consume_queue: &mut Vec<Entry>,
recycler: &BlobRecycler,
consumed: &mut u64,
leader_unknown: bool,
@@ -254,19 +254,33 @@ impl WindowUtil for Window {
let k = (*consumed % WINDOW_SIZE) as usize;
trace!("{}: k: {} consumed: {}", id, k, *consumed,);

if let Some(blob) = &self[k].data {
let k_data_blob;
let k_data_slot = &mut self[k].data;
if let Some(blob) = k_data_slot {
if blob.read().get_index().unwrap() < *consumed {
// window wrap-around, end of received
break;
}
k_data_blob = (*blob).clone();
} else {
// self[k].data is None, end of received
break;
}
let slot = self[k].clone();
if let Some(r) = slot.data {
consume_queue.push(r)

// Check that we can get the entries from this blob
match reconstruct_entries_from_blobs(vec![k_data_blob]) {
Ok(entries) => {
consume_queue.extend(entries);
}
Err(_) => {
// If the blob can't be deserialized, then remove it from the
// window and exit. *k_data_slot cannot be None at this point,
// so it's safe to unwrap.
k_data_slot.take();
break;
}
}

*consumed += 1;
}
}
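The new consume loop in window.rs tries to deserialize each blob as it is consumed; if deserialization fails, the blob is evicted from its window slot so it can be re-requested, and consumption stops at that point. A minimal sketch of that behavior follows, assuming a window of optional byte buffers and a stub parser in place of ledger::reconstruct_entries_from_blobs.

#[derive(Debug, PartialEq)]
struct Entry(u8); // placeholder for entry::Entry

// Stand-in for reconstruct_entries_from_blobs: fails on an empty buffer.
fn reconstruct_entries(blob: &[u8]) -> Result<Vec<Entry>, ()> {
    if blob.is_empty() {
        Err(())
    } else {
        Ok(blob.iter().map(|&b| Entry(b)).collect())
    }
}

fn consume(window: &mut Vec<Option<Vec<u8>>>, consumed: &mut usize) -> Vec<Entry> {
    let mut consume_queue = Vec::new();
    while *consumed < window.len() {
        let slot = &mut window[*consumed];
        let blob = match slot {
            Some(blob) => blob.clone(),
            None => break, // empty slot: end of received
        };
        match reconstruct_entries(&blob) {
            Ok(entries) => consume_queue.extend(entries),
            Err(()) => {
                // Undeserializable blob: drop it from the window and stop.
                slot.take();
                break;
            }
        }
        *consumed += 1;
    }
    consume_queue
}

fn main() {
    let mut window = vec![Some(vec![1, 2]), Some(vec![]), Some(vec![3])];
    let mut consumed = 0;
    let entries = consume(&mut window, &mut consumed);
    assert_eq!(entries, vec![Entry(1), Entry(2)]);
    assert_eq!(consumed, 1);
    assert!(window[1].is_none()); // the bad blob was evicted
}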
75 changes: 50 additions & 25 deletions src/window_service.rs
@@ -2,6 +2,7 @@
//!
use counter::Counter;
use crdt::{Crdt, NodeInfo};
use entry::EntrySender;
use log::Level;
use packet::{BlobRecycler, SharedBlob};
use rand::{thread_rng, Rng};
@@ -144,7 +145,7 @@ fn recv_window(
consumed: &mut u64,
received: &mut u64,
r: &BlobReceiver,
s: &BlobSender,
s: &EntrySender,
retransmit: &BlobSender,
pending_retransmits: &mut bool,
) -> Result<()> {
@@ -232,7 +233,7 @@ pub fn window_service(
window: SharedWindow,
entry_height: u64,
r: BlobReceiver,
s: BlobSender,
s: EntrySender,
retransmit: BlobSender,
repair_socket: Arc<UdpSocket>,
) -> JoinHandle<()> {
@@ -295,25 +296,54 @@
#[cfg(test)]
mod test {
use crdt::{Crdt, Node};
use entry::Entry;
use hash::Hash;
use logger;
use packet::{BlobRecycler, PACKET_DATA_SIZE};
use std::net::UdpSocket;
use packet::{BlobRecycler, SharedBlobs, PACKET_DATA_SIZE};
use recorder::Recorder;
use signature::Pubkey;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer::{blob_receiver, responder, BlobReceiver};
use streamer::{blob_receiver, responder};
use window::default_window;
use window_service::{repair_backoff, window_service};

fn get_blobs(r: BlobReceiver, num: &mut usize) {
fn make_consecutive_blobs(
me_id: Pubkey,
mut num_blobs_to_make: u64,
start_hash: Hash,
addr: &SocketAddr,
resp_recycler: &BlobRecycler,
) -> SharedBlobs {
let mut msgs = Vec::new();
let mut recorder = Recorder::new(start_hash);
while num_blobs_to_make != 0 {
let new_entries = recorder.record(vec![]);
let mut new_blobs: SharedBlobs = new_entries
.iter()
.enumerate()
.map(|(i, e)| {
let blob_index = num_blobs_to_make - i as u64 - 1;
let new_blob =
e.to_blob(&resp_recycler, Some(blob_index), Some(me_id), Some(addr));
assert_eq!(blob_index, new_blob.read().get_index().unwrap());
new_blob
}).collect();
new_blobs.truncate(num_blobs_to_make as usize);
num_blobs_to_make -= new_blobs.len() as u64;
msgs.extend(new_blobs);
}
msgs
}

fn get_entries(r: Receiver<Vec<Entry>>, num: &mut usize) {
for _t in 0..5 {
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(m) => {
for (i, v) in m.iter().enumerate() {
assert_eq!(v.read().get_index().unwrap() as usize, *num + i);
}
*num += m.len();
}
e => info!("error {:?}", e),
Expand Down Expand Up @@ -355,26 +385,21 @@ mod test {
tn.sockets.replicate.into_iter().map(Arc::new).collect();

let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
let mut msgs = Vec::new();
for v in 0..10 {
let i = 9 - v;
let b = resp_recycler.allocate();
{
let mut w = b.write();
w.set_index(i).unwrap();
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.contact_info.ncp);
}
msgs.push(b);
}
let mut num_blobs_to_make = 10;
let gossip_address = &tn.info.contact_info.ncp;
let msgs = make_consecutive_blobs(
me_id,
num_blobs_to_make,
Hash::default(),
&gossip_address,
&resp_recycler,
);
s_responder.send(msgs).expect("send");
t_responder
};

let mut num = 0;
get_blobs(r_window, &mut num);
get_entries(r_window, &mut num);
assert_eq!(num, 10);
let mut q = r_retransmit.recv().unwrap();
while let Ok(mut nq) = r_retransmit.try_recv() {
Expand Down
