Skip to content

Commit

Permalink
Add historian to pipeline
Browse files Browse the repository at this point in the history
No longer intercept entries to register_entry_id(). Intead,
register the ID in the Write stage.

EventProcessor is now just being used as a place to store data.

Fixes #216
  • Loading branch information
garious authored and aeyakovenko committed May 14, 2018
1 parent 916fda7 commit a8e4c63
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 30 deletions.
6 changes: 5 additions & 1 deletion src/event_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub struct EventProcessor {
pub accountant: Arc<Accountant>,
historian_input: Mutex<Sender<Signal>>,
historian: Mutex<Historian>,
pub start_hash: Hash,
pub ms_per_tick: Option<u64>,
}

impl EventProcessor {
Expand All @@ -25,6 +27,8 @@ impl EventProcessor {
accountant: Arc::new(accountant),
historian_input: Mutex::new(historian_input),
historian: Mutex::new(historian),
start_hash: *start_hash,
ms_per_tick,
}
}

Expand All @@ -37,7 +41,7 @@ impl EventProcessor {
sender.send(Signal::Events(events))?;

// Wait for the historian to tag our Events with an ID and then register it.
let entry = historian.entry_receiver.lock().unwrap().recv()?;
let entry = historian.entry_receiver.recv()?;
self.accountant.register_entry_id(&entry.id);
Ok(entry)
}
Expand Down
18 changes: 7 additions & 11 deletions src/historian.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
use entry::Entry;
use hash::Hash;
use recorder::{ExitReason, Recorder, Signal};
use std::sync::Mutex;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread::{spawn, JoinHandle};
use std::time::Instant;

pub struct Historian {
pub entry_receiver: Mutex<Receiver<Entry>>,
pub entry_receiver: Receiver<Entry>,
pub thread_hdl: JoinHandle<ExitReason>,
}

Expand All @@ -24,7 +23,7 @@ impl Historian {
let thread_hdl =
Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender);
Historian {
entry_receiver: Mutex::new(entry_receiver),
entry_receiver,
thread_hdl,
}
}
Expand Down Expand Up @@ -52,10 +51,7 @@ impl Historian {
}

pub fn receive(self: &Self) -> Result<Entry, TryRecvError> {
self.entry_receiver
.lock()
.expect("'entry_receiver' lock in pub fn receive")
.try_recv()
self.entry_receiver.try_recv()
}
}

Expand All @@ -78,9 +74,9 @@ mod tests {
sleep(Duration::new(0, 1_000_000));
input.send(Signal::Tick).unwrap();

let entry0 = hist.entry_receiver.lock().unwrap().recv().unwrap();
let entry1 = hist.entry_receiver.lock().unwrap().recv().unwrap();
let entry2 = hist.entry_receiver.lock().unwrap().recv().unwrap();
let entry0 = hist.entry_receiver.recv().unwrap();
let entry1 = hist.entry_receiver.recv().unwrap();
let entry2 = hist.entry_receiver.recv().unwrap();

assert_eq!(entry0.num_hashes, 0);
assert_eq!(entry1.num_hashes, 0);
Expand Down Expand Up @@ -117,7 +113,7 @@ mod tests {
sleep(Duration::from_millis(900));
input.send(Signal::Tick).unwrap();
drop(input);
let entries: Vec<Entry> = hist.entry_receiver.lock().unwrap().iter().collect();
let entries: Vec<Entry> = hist.entry_receiver.iter().collect();
assert!(entries.len() > 1);

// Ensure the ID is not the seed.
Expand Down
11 changes: 5 additions & 6 deletions src/request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
use accountant::Accountant;
use bincode::{deserialize, serialize};
use entry::Entry;
use event::Event;
use event_processor::EventProcessor;
use packet;
use packet::SharedPackets;
use rayon::prelude::*;
use recorder::Signal;
use request::{Request, Response};
use result::Result;
use std::collections::VecDeque;
Expand Down Expand Up @@ -140,9 +139,8 @@ impl RequestProcessor {

pub fn process_request_packets(
&self,
event_processor: &EventProcessor,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
entry_sender: &Sender<Entry>,
signal_sender: &Sender<Signal>,
blob_sender: &streamer::BlobSender,
packet_recycler: &packet::PacketRecycler,
blob_recycler: &packet::BlobRecycler,
Expand Down Expand Up @@ -176,8 +174,9 @@ impl RequestProcessor {
debug!("events: {} reqs: {}", events.len(), reqs.len());

debug!("process_events");
let entry = event_processor.process_events(events)?;
entry_sender.send(entry)?;
let results = self.accountant.process_verified_events(events);
let events = results.into_iter().filter_map(|x| x.ok()).collect();
signal_sender.send(Signal::Events(events))?;
debug!("done process_events");

debug!("process_requests");
Expand Down
13 changes: 5 additions & 8 deletions src/request_stage.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
//! The `request_stage` processes thin client Request messages.
use entry::Entry;
use event_processor::EventProcessor;
use packet;
use packet::SharedPackets;
use recorder::Signal;
use request_processor::RequestProcessor;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
Expand All @@ -13,29 +12,27 @@ use streamer;

pub struct RequestStage {
pub thread_hdl: JoinHandle<()>,
pub entry_receiver: Receiver<Entry>,
pub signal_receiver: Receiver<Signal>,
pub blob_receiver: streamer::BlobReceiver,
pub request_processor: Arc<RequestProcessor>,
}

impl RequestStage {
pub fn new(
request_processor: RequestProcessor,
event_processor: Arc<EventProcessor>,
exit: Arc<AtomicBool>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
packet_recycler: packet::PacketRecycler,
blob_recycler: packet::BlobRecycler,
) -> Self {
let request_processor = Arc::new(request_processor);
let request_processor_ = request_processor.clone();
let (entry_sender, entry_receiver) = channel();
let (signal_sender, signal_receiver) = channel();
let (blob_sender, blob_receiver) = channel();
let thread_hdl = spawn(move || loop {
let e = request_processor_.process_request_packets(
&event_processor,
&verified_receiver,
&entry_sender,
&signal_sender,
&blob_sender,
&packet_recycler,
&blob_recycler,
Expand All @@ -48,7 +45,7 @@ impl RequestStage {
});
RequestStage {
thread_hdl,
entry_receiver,
signal_receiver,
blob_receiver,
request_processor,
}
Expand Down
10 changes: 8 additions & 2 deletions src/rpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crdt::{Crdt, ReplicatedData};
use entry::Entry;
use entry_writer::EntryWriter;
use event_processor::EventProcessor;
use historian::Historian;
use packet;
use request_processor::RequestProcessor;
use request_stage::RequestStage;
Expand Down Expand Up @@ -88,21 +89,26 @@ impl Rpu {
let request_processor = RequestProcessor::new(self.event_processor.accountant.clone());
let request_stage = RequestStage::new(
request_processor,
self.event_processor.clone(),
exit.clone(),
sig_verify_stage.verified_receiver,
packet_recycler.clone(),
blob_recycler.clone(),
);

let historian_stage = Historian::new(
request_stage.signal_receiver,
&self.event_processor.start_hash,
self.event_processor.ms_per_tick,
);

let (broadcast_sender, broadcast_receiver) = channel();
let t_write = Self::write_service(
self.event_processor.accountant.clone(),
exit.clone(),
broadcast_sender,
blob_recycler.clone(),
Mutex::new(writer),
request_stage.entry_receiver,
historian_stage.entry_receiver,
);

let broadcast_socket = UdpSocket::bind(local)?;
Expand Down
10 changes: 8 additions & 2 deletions src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crdt::{Crdt, ReplicatedData};
use entry::Entry;
use entry_writer::EntryWriter;
use event_processor::EventProcessor;
use historian::Historian;
use ledger;
use packet;
use request_processor::RequestProcessor;
Expand Down Expand Up @@ -173,17 +174,22 @@ impl Tvu {
let request_processor = RequestProcessor::new(obj.event_processor.accountant.clone());
let request_stage = RequestStage::new(
request_processor,
obj.event_processor.clone(),
exit.clone(),
sig_verify_stage.verified_receiver,
packet_recycler.clone(),
blob_recycler.clone(),
);

let historian_stage = Historian::new(
request_stage.signal_receiver,
&obj.event_processor.start_hash,
obj.event_processor.ms_per_tick,
);

let t_write = Self::drain_service(
obj.event_processor.accountant.clone(),
exit.clone(),
request_stage.entry_receiver,
historian_stage.entry_receiver,
);

let t_responder = streamer::responder(
Expand Down

0 comments on commit a8e4c63

Please sign in to comment.