Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Moved deserialization of blobs to entries from replicate_stage to window_service #1287

Merged
merged 1 commit into from
Sep 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ use rayon::prelude::*;
use signature::Pubkey;
use std::io::Cursor;
use std::net::SocketAddr;
use std::sync::mpsc::{Receiver, Sender};
use transaction::Transaction;

pub type EntrySender = Sender<Vec<Entry>>;
pub type EntryReceiver = Receiver<Vec<Entry>>;

/// Each Entry contains three pieces of data. The `num_hashes` field is the number
/// of hashes performed since the previous entry. The `id` field is the result
/// of hashing `id` from the previous entry `num_hashes` times. The `transactions`
Expand Down
16 changes: 8 additions & 8 deletions src/replicate_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
use bank::Bank;
use counter::Counter;
use crdt::Crdt;
use ledger::{reconstruct_entries_from_blobs, Block, LedgerWriter};
use entry::EntryReceiver;
use ledger::{Block, LedgerWriter};
use log::Level;
use result::{Error, Result};
use service::Service;
Expand All @@ -16,7 +17,7 @@ use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::{responder, BlobReceiver};
use streamer::responder;
use vote_stage::VoteStage;

pub struct ReplicateStage {
Expand All @@ -29,16 +30,15 @@ impl ReplicateStage {
fn replicate_requests(
bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>,
window_receiver: &BlobReceiver,
window_receiver: &EntryReceiver,
ledger_writer: Option<&mut LedgerWriter>,
) -> Result<()> {
let timer = Duration::new(1, 0);
//coalesce all the available blobs into a single vote
let mut blobs = window_receiver.recv_timeout(timer)?;
//coalesce all the available entries into a single vote
let mut entries = window_receiver.recv_timeout(timer)?;
while let Ok(mut more) = window_receiver.try_recv() {
blobs.append(&mut more);
entries.append(&mut more);
}
let entries = reconstruct_entries_from_blobs(blobs)?;

let res = bank.process_entries(entries.clone());

Expand All @@ -64,7 +64,7 @@ impl ReplicateStage {
keypair: Arc<Keypair>,
bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>,
window_receiver: BlobReceiver,
window_receiver: EntryReceiver,
ledger_path: Option<&str>,
exit: Arc<AtomicBool>,
) -> Self {
Expand Down
11 changes: 6 additions & 5 deletions src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
use counter::Counter;
use crdt::Crdt;
use entry::Entry;
use log::Level;
use result::{Error, Result};
use service::Service;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
Expand Down Expand Up @@ -68,23 +69,23 @@ impl RetransmitStage {
retransmit_socket: Arc<UdpSocket>,
repair_socket: Arc<UdpSocket>,
fetch_stage_receiver: BlobReceiver,
) -> (Self, BlobReceiver) {
) -> (Self, Receiver<Vec<Entry>>) {
let (retransmit_sender, retransmit_receiver) = channel();

let t_retransmit = retransmitter(retransmit_socket, crdt.clone(), retransmit_receiver);
let (blob_sender, blob_receiver) = channel();
let (entry_sender, entry_receiver) = channel();
let t_window = window_service(
crdt.clone(),
window,
entry_height,
fetch_stage_receiver,
blob_sender,
entry_sender,
retransmit_sender,
repair_socket,
);
let thread_hdls = vec![t_retransmit, t_window];

(RetransmitStage { thread_hdls }, blob_receiver)
(RetransmitStage { thread_hdls }, entry_receiver)
}
}

Expand Down
30 changes: 22 additions & 8 deletions src/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use crdt::{Crdt, NodeInfo};
use entry::Entry;
#[cfg(feature = "erasure")]
use erasure;
use ledger::Block;
use ledger::{reconstruct_entries_from_blobs, Block};
use log::Level;
use packet::{BlobRecycler, SharedBlob, SharedBlobs};
use packet::{BlobRecycler, SharedBlob};
use result::Result;
use signature::Pubkey;
use std::cmp;
Expand Down Expand Up @@ -67,7 +67,7 @@ pub trait WindowUtil {
id: &Pubkey,
blob: SharedBlob,
pix: u64,
consume_queue: &mut SharedBlobs,
consume_queue: &mut Vec<Entry>,
recycler: &BlobRecycler,
consumed: &mut u64,
leader_unknown: bool,
Expand Down Expand Up @@ -180,7 +180,7 @@ impl WindowUtil for Window {
id: &Pubkey,
blob: SharedBlob,
pix: u64,
consume_queue: &mut SharedBlobs,
consume_queue: &mut Vec<Entry>,
recycler: &BlobRecycler,
consumed: &mut u64,
leader_unknown: bool,
Expand Down Expand Up @@ -254,19 +254,33 @@ impl WindowUtil for Window {
let k = (*consumed % WINDOW_SIZE) as usize;
trace!("{}: k: {} consumed: {}", id, k, *consumed,);

if let Some(blob) = &self[k].data {
let k_data_blob;
let k_data_slot = &mut self[k].data;
if let Some(blob) = k_data_slot {
if blob.read().get_index().unwrap() < *consumed {
// window wrap-around, end of received
break;
}
k_data_blob = (*blob).clone();
} else {
// self[k].data is None, end of received
break;
}
let slot = self[k].clone();
if let Some(r) = slot.data {
consume_queue.push(r)

// Check that we can get the entries from this blob
match reconstruct_entries_from_blobs(vec![k_data_blob]) {
Ok(entries) => {
consume_queue.extend(entries);
}
Err(_) => {
// If the blob can't be deserialized, then remove it from the
// window and exit. *k_data_slot cannot be None at this point,
// so it's safe to unwrap.
k_data_slot.take();
break;
}
}

*consumed += 1;
}
}
Expand Down
75 changes: 50 additions & 25 deletions src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//!
use counter::Counter;
use crdt::{Crdt, NodeInfo};
use entry::EntrySender;
use log::Level;
use packet::{BlobRecycler, SharedBlob};
use rand::{thread_rng, Rng};
Expand Down Expand Up @@ -144,7 +145,7 @@ fn recv_window(
consumed: &mut u64,
received: &mut u64,
r: &BlobReceiver,
s: &BlobSender,
s: &EntrySender,
retransmit: &BlobSender,
pending_retransmits: &mut bool,
) -> Result<()> {
Expand Down Expand Up @@ -232,7 +233,7 @@ pub fn window_service(
window: SharedWindow,
entry_height: u64,
r: BlobReceiver,
s: BlobSender,
s: EntrySender,
retransmit: BlobSender,
repair_socket: Arc<UdpSocket>,
) -> JoinHandle<()> {
Expand Down Expand Up @@ -295,25 +296,54 @@ pub fn window_service(
#[cfg(test)]
mod test {
use crdt::{Crdt, Node};
use entry::Entry;
use hash::Hash;
use logger;
use packet::{BlobRecycler, PACKET_DATA_SIZE};
use std::net::UdpSocket;
use packet::{BlobRecycler, SharedBlobs, PACKET_DATA_SIZE};
use recorder::Recorder;
use signature::Pubkey;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer::{blob_receiver, responder, BlobReceiver};
use streamer::{blob_receiver, responder};
use window::default_window;
use window_service::{repair_backoff, window_service};

fn get_blobs(r: BlobReceiver, num: &mut usize) {
fn make_consecutive_blobs(
me_id: Pubkey,
mut num_blobs_to_make: u64,
start_hash: Hash,
addr: &SocketAddr,
resp_recycler: &BlobRecycler,
) -> SharedBlobs {
let mut msgs = Vec::new();
let mut recorder = Recorder::new(start_hash);
while num_blobs_to_make != 0 {
let new_entries = recorder.record(vec![]);
let mut new_blobs: SharedBlobs = new_entries
.iter()
.enumerate()
.map(|(i, e)| {
let blob_index = num_blobs_to_make - i as u64 - 1;
let new_blob =
e.to_blob(&resp_recycler, Some(blob_index), Some(me_id), Some(addr));
assert_eq!(blob_index, new_blob.read().get_index().unwrap());
new_blob
}).collect();
new_blobs.truncate(num_blobs_to_make as usize);
num_blobs_to_make -= new_blobs.len() as u64;
msgs.extend(new_blobs);
}
msgs
}

fn get_entries(r: Receiver<Vec<Entry>>, num: &mut usize) {
for _t in 0..5 {
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(m) => {
for (i, v) in m.iter().enumerate() {
assert_eq!(v.read().get_index().unwrap() as usize, *num + i);
}
*num += m.len();
}
e => info!("error {:?}", e),
Expand Down Expand Up @@ -355,26 +385,21 @@ mod test {
tn.sockets.replicate.into_iter().map(Arc::new).collect();

let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
let mut msgs = Vec::new();
for v in 0..10 {
let i = 9 - v;
let b = resp_recycler.allocate();
{
let mut w = b.write();
w.set_index(i).unwrap();
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.contact_info.ncp);
}
msgs.push(b);
}
let mut num_blobs_to_make = 10;
let gossip_address = &tn.info.contact_info.ncp;
let msgs = make_consecutive_blobs(
me_id,
num_blobs_to_make,
Hash::default(),
&gossip_address,
&resp_recycler,
);
s_responder.send(msgs).expect("send");
t_responder
};

let mut num = 0;
get_blobs(r_window, &mut num);
get_entries(r_window, &mut num);
assert_eq!(num, 10);
let mut q = r_retransmit.recv().unwrap();
while let Ok(mut nq) = r_retransmit.try_recv() {
Expand Down