Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tvu cleanup #238

Merged
merged 4 commits into from
May 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod packet;
pub mod plan;
pub mod record_stage;
pub mod recorder;
pub mod replicate_stage;
pub mod request;
pub mod request_processor;
pub mod request_stage;
Expand Down
52 changes: 52 additions & 0 deletions src/replicate_stage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//! The `replicate_stage` replicates transactions broadcast by the leader.

use bank::Bank;
use ledger;
use packet;
use result::Result;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer;

pub struct ReplicateStage {
pub thread_hdl: JoinHandle<()>,
}

impl ReplicateStage {
/// Process verified blobs, already in order
fn replicate_requests(
bank: &Arc<Bank>,
verified_receiver: &streamer::BlobReceiver,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
let entries = ledger::reconstruct_entries_from_blobs(&blobs);
let res = bank.process_verified_entries(entries);
if res.is_err() {
error!("process_verified_entries {} {:?}", blobs.len(), res);
}
res?;
for blob in blobs {
blob_recycler.recycle(blob);
}
Ok(())
}

pub fn new(
bank: Arc<Bank>,
exit: Arc<AtomicBool>,
window_receiver: streamer::BlobReceiver,
blob_recycler: packet::BlobRecycler,
) -> Self {
let thread_hdl = spawn(move || loop {
let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler);
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
});
ReplicateStage { thread_hdl }
}
}
44 changes: 10 additions & 34 deletions src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ use bank::Bank;
use banking_stage::BankingStage;
use crdt::{Crdt, ReplicatedData};
use hash::Hash;
use ledger;
use packet;
use record_stage::RecordStage;
use replicate_stage::ReplicateStage;
use result::Result;
use sig_verify_stage::SigVerifyStage;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::{spawn, JoinHandle};
use std::thread::JoinHandle;
use std::time::Duration;
use streamer;
use write_stage::WriteStage;
Expand All @@ -35,27 +35,6 @@ impl Tvu {
}
}

/// Process verified blobs, already in order
/// Respond with a signed hash of the state
fn replicate_state(
obj: &Tvu,
verified_receiver: &streamer::BlobReceiver,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
let entries = ledger::reconstruct_entries_from_blobs(&blobs);
let res = obj.bank.process_verified_entries(entries);
if res.is_err() {
error!("process_verified_entries {} {:?}", blobs.len(), res);
}
res?;
for blob in blobs {
blob_recycler.recycle(blob);
}
Ok(())
}

/// This service receives messages from a leader in the network and processes the transactions
/// on the bank state.
/// # Arguments
Expand Down Expand Up @@ -132,14 +111,12 @@ impl Tvu {
retransmit_sender,
);

let tvu = obj.clone();
let s_exit = exit.clone();
let t_replicator = spawn(move || loop {
let e = Self::replicate_state(&tvu, &window_receiver, &blob_recycler);
if e.is_err() && s_exit.load(Ordering::Relaxed) {
break;
}
});
let replicate_stage = ReplicateStage::new(
obj.bank.clone(),
exit.clone(),
window_receiver,
blob_recycler.clone(),
);

//serve pipeline
// make sure we are on the same interface
Expand Down Expand Up @@ -178,7 +155,7 @@ impl Tvu {
t_blob_receiver,
t_retransmit,
t_window,
t_replicator,
replicate_stage.thread_hdl,
t_gossip,
t_listen,
//serve threads
Expand Down Expand Up @@ -375,5 +352,4 @@ mod tests {
t_l_gossip.join().expect("join");
t_l_listen.join().expect("join");
}

}