Skip to content

Commit

Permalink
Consistent naming of senders and receivers
Browse files Browse the repository at this point in the history
  • Loading branch information
garious committed May 12, 2018
1 parent a3869dd commit 6264508
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/event_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl EventProcessor {
sender.send(Signal::Events(events))?;

// Wait for the historian to tag our Events with an ID and then register it.
let entry = historian.output.lock().unwrap().recv()?;
let entry = historian.entry_receiver.lock().unwrap().recv()?;
self.accountant.register_entry_id(&entry.id);
Ok(entry)
}
Expand Down
20 changes: 10 additions & 10 deletions src/historian.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::thread::{spawn, JoinHandle};
use std::time::Instant;

pub struct Historian {
pub output: Mutex<Receiver<Entry>>,
pub entry_receiver: Mutex<Receiver<Entry>>,
pub thread_hdl: JoinHandle<ExitReason>,
}

Expand All @@ -20,11 +20,11 @@ impl Historian {
start_hash: &Hash,
ms_per_tick: Option<u64>,
) -> Self {
let (entry_sender, output) = channel();
let (entry_sender, entry_receiver) = channel();
let thread_hdl =
Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender);
Historian {
output: Mutex::new(output),
entry_receiver: Mutex::new(entry_receiver),
thread_hdl,
}
}
Expand Down Expand Up @@ -52,9 +52,9 @@ impl Historian {
}

pub fn receive(self: &Self) -> Result<Entry, TryRecvError> {
self.output
self.entry_receiver
.lock()
.expect("'output' lock in pub fn receive")
.expect("'entry_receiver' lock in pub fn receive")
.try_recv()
}
}
Expand All @@ -78,9 +78,9 @@ mod tests {
sleep(Duration::new(0, 1_000_000));
input.send(Signal::Tick).unwrap();

let entry0 = hist.output.lock().unwrap().recv().unwrap();
let entry1 = hist.output.lock().unwrap().recv().unwrap();
let entry2 = hist.output.lock().unwrap().recv().unwrap();
let entry0 = hist.entry_receiver.lock().unwrap().recv().unwrap();
let entry1 = hist.entry_receiver.lock().unwrap().recv().unwrap();
let entry2 = hist.entry_receiver.lock().unwrap().recv().unwrap();

assert_eq!(entry0.num_hashes, 0);
assert_eq!(entry1.num_hashes, 0);
Expand All @@ -100,7 +100,7 @@ mod tests {
let (input, event_receiver) = channel();
let zero = Hash::default();
let hist = Historian::new(event_receiver, &zero, None);
drop(hist.output);
drop(hist.entry_receiver);
input.send(Signal::Tick).unwrap();
assert_eq!(
hist.thread_hdl.join().unwrap(),
Expand All @@ -116,7 +116,7 @@ mod tests {
sleep(Duration::from_millis(300));
input.send(Signal::Tick).unwrap();
drop(input);
let entries: Vec<Entry> = hist.output.lock().unwrap().iter().collect();
let entries: Vec<Entry> = hist.entry_receiver.lock().unwrap().iter().collect();
assert!(entries.len() > 1);

// Ensure the ID is not the seed.
Expand Down
12 changes: 6 additions & 6 deletions src/request_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl RequestProcessor {
event_processor: &EventProcessor,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
entry_sender: &Sender<Entry>,
responder_sender: &streamer::BlobSender,
blob_sender: &streamer::BlobSender,
packet_recycler: &packet::PacketRecycler,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
Expand Down Expand Up @@ -253,7 +253,7 @@ impl RequestProcessor {
if !blobs.is_empty() {
info!("process: sending blobs: {}", blobs.len());
//don't wake up the other side if there is nothing
responder_sender.send(blobs)?;
blob_sender.send(blobs)?;
}
packet_recycler.recycle(msgs);
}
Expand All @@ -274,7 +274,7 @@ impl RequestProcessor {
pub struct RequestStage {
pub thread_hdl: JoinHandle<()>,
pub entry_receiver: Receiver<Entry>,
pub output: streamer::BlobReceiver,
pub blob_receiver: streamer::BlobReceiver,
pub request_processor: Arc<RequestProcessor>,
}

Expand All @@ -290,13 +290,13 @@ impl RequestStage {
let request_processor = Arc::new(request_processor);
let request_processor_ = request_processor.clone();
let (entry_sender, entry_receiver) = channel();
let (responder_sender, output) = channel();
let (blob_sender, blob_receiver) = channel();
let thread_hdl = spawn(move || loop {
let e = request_processor_.process_request_packets(
&event_processor,
&verified_receiver,
&entry_sender,
&responder_sender,
&blob_sender,
&packet_recycler,
&blob_recycler,
);
Expand All @@ -309,7 +309,7 @@ impl RequestStage {
RequestStage {
thread_hdl,
entry_receiver,
output,
blob_receiver,
request_processor,
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/rpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl Rpu {
request_processor,
self.event_processor.clone(),
exit.clone(),
sig_verify_stage.output,
sig_verify_stage.verified_receiver,
packet_recycler.clone(),
blob_recycler.clone(),
);
Expand Down Expand Up @@ -119,7 +119,7 @@ impl Rpu {
respond_socket,
exit.clone(),
blob_recycler.clone(),
request_stage.output,
request_stage.blob_receiver,
);

let mut threads = vec![
Expand Down
6 changes: 3 additions & 3 deletions src/sig_verify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ use streamer;
use timing;

pub struct SigVerifyStage {
pub output: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
pub verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
pub thread_hdls: Vec<JoinHandle<()>>,
}

impl SigVerifyStage {
pub fn new(exit: Arc<AtomicBool>, packets_receiver: Receiver<SharedPackets>) -> Self {
let (verified_sender, output) = channel();
let (verified_sender, verified_receiver) = channel();
let thread_hdls = Self::verifier_services(exit, packets_receiver, verified_sender);
SigVerifyStage {
thread_hdls,
output,
verified_receiver,
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl Tvu {
request_processor,
obj.event_processor.clone(),
exit.clone(),
sig_verify_stage.output,
sig_verify_stage.verified_receiver,
packet_recycler.clone(),
blob_recycler.clone(),
);
Expand All @@ -190,7 +190,7 @@ impl Tvu {
respond_socket,
exit.clone(),
blob_recycler.clone(),
request_stage.output,
request_stage.blob_receiver,
);

let mut threads = vec![
Expand Down

0 comments on commit 6264508

Please sign in to comment.