Skip to content

Commit

Permalink
Free up the name 'accounting_stage'
Browse files Browse the repository at this point in the history
  • Loading branch information
garious committed May 12, 2018
1 parent 1511dc4 commit a3d2831
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 66 deletions.
6 changes: 3 additions & 3 deletions src/bin/testnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ extern crate solana;
use getopts::Options;
use isatty::stdin_isatty;
use solana::accountant::Accountant;
use solana::accounting_stage::AccountingStage;
use solana::crdt::ReplicatedData;
use solana::entry::Entry;
use solana::event::Event;
use solana::event_processor::EventProcessor;
use solana::rpu::Rpu;
use solana::signature::{KeyPair, KeyPairUtil};
use std::env;
Expand Down Expand Up @@ -115,9 +115,9 @@ fn main() {

eprintln!("creating networking stack...");

let accounting_stage = AccountingStage::new(accountant, &last_id, Some(1000));
let event_processor = EventProcessor::new(accountant, &last_id, Some(1000));
let exit = Arc::new(AtomicBool::new(false));
let rpu = Arc::new(Rpu::new(accounting_stage));
let rpu = Rpu::new(event_processor);
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
Expand Down
16 changes: 7 additions & 9 deletions src/entry_writer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! The `entry_writer` module helps implement the TPU's write stage.
use accounting_stage::AccountingStage;
use entry::Entry;
use event_processor::EventProcessor;
use ledger;
use packet;
use request_stage::RequestProcessor;
Expand All @@ -15,27 +15,25 @@ use std::time::Duration;
use streamer;

pub struct EntryWriter<'a> {
accounting_stage: &'a AccountingStage,
event_processor: &'a EventProcessor,
request_processor: &'a RequestProcessor,
}

impl<'a> EntryWriter<'a> {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(
accounting_stage: &'a AccountingStage,
event_processor: &'a EventProcessor,
request_processor: &'a RequestProcessor,
) -> Self {
EntryWriter {
accounting_stage,
event_processor,
request_processor,
}
}

fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
trace!("write_entry entry");
self.accounting_stage
.accountant
.register_entry_id(&entry.id);
self.event_processor.accountant.register_entry_id(&entry.id);
writeln!(
writer.lock().expect("'writer' lock in fn fn write_entry"),
"{}",
Expand All @@ -47,14 +45,14 @@ impl<'a> EntryWriter<'a> {
fn write_entries<W: Write>(&self, writer: &Mutex<W>) -> Result<Vec<Entry>> {
//TODO implement a serialize for channel that does this without allocations
let mut l = vec![];
let entry = self.accounting_stage
let entry = self.event_processor
.output
.lock()
.expect("'ouput' lock in fn receive_all")
.recv_timeout(Duration::new(1, 0))?;
self.write_entry(writer, &entry);
l.push(entry);
while let Ok(entry) = self.accounting_stage
while let Ok(entry) = self.event_processor
.output
.lock()
.expect("'output' lock in fn write_entries")
Expand Down
30 changes: 15 additions & 15 deletions src/accounting_stage.rs → src/event_processor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! The `accounting_stage` module implements the accounting stage of the TPU.
//! The `event_processor` module implements the accounting stage of the TPU.
use accountant::Accountant;
use entry::Entry;
Expand All @@ -10,21 +10,21 @@ use result::Result;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

pub struct AccountingStage {
pub struct EventProcessor {
pub output: Mutex<Receiver<Entry>>,
entry_sender: Mutex<Sender<Entry>>,
pub accountant: Arc<Accountant>,
historian_input: Mutex<Sender<Signal>>,
historian: Mutex<Historian>,
}

impl AccountingStage {
impl EventProcessor {
/// Create a new stage of the TPU for event and transaction processing
pub fn new(accountant: Accountant, start_hash: &Hash, ms_per_tick: Option<u64>) -> Self {
let (historian_input, event_receiver) = channel();
let historian = Historian::new(event_receiver, start_hash, ms_per_tick);
let (entry_sender, output) = channel();
AccountingStage {
EventProcessor {
output: Mutex::new(output),
entry_sender: Mutex::new(entry_sender),
accountant: Arc::new(accountant),
Expand Down Expand Up @@ -52,9 +52,9 @@ impl AccountingStage {
#[cfg(test)]
mod tests {
use accountant::Accountant;
use accounting_stage::AccountingStage;
use entry::Entry;
use event::Event;
use event_processor::EventProcessor;
use mint::Mint;
use signature::{KeyPair, KeyPairUtil};
use transaction::Transaction;
Expand All @@ -66,22 +66,22 @@ mod tests {
// Entry OR if the verifier tries to parallelize across multiple Entries.
let mint = Mint::new(2);
let accountant = Accountant::new(&mint);
let accounting_stage = AccountingStage::new(accountant, &mint.last_id(), None);
let event_processor = EventProcessor::new(accountant, &mint.last_id(), None);

// Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
let events = vec![Event::Transaction(tr)];
assert!(accounting_stage.process_events(events).is_ok());
assert!(event_processor.process_events(events).is_ok());

// Process a second batch that spends one of those tokens.
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
let events = vec![Event::Transaction(tr)];
assert!(accounting_stage.process_events(events).is_ok());
assert!(event_processor.process_events(events).is_ok());

// Collect the ledger and feed it to a new accountant.
drop(accounting_stage.entry_sender);
let entries: Vec<Entry> = accounting_stage.output.lock().unwrap().iter().collect();
drop(event_processor.entry_sender);
let entries: Vec<Entry> = event_processor.output.lock().unwrap().iter().collect();

// Assert the user holds one token, not two. If the server only output one
// entry, then the second transaction will be rejected, because it drives
Expand All @@ -104,8 +104,8 @@ mod bench {
extern crate test;
use self::test::Bencher;
use accountant::{Accountant, MAX_ENTRY_IDS};
use accounting_stage::*;
use bincode::serialize;
use event_processor::*;
use hash::hash;
use mint::Mint;
use rayon::prelude::*;
Expand Down Expand Up @@ -154,17 +154,17 @@ mod bench {
.map(|tr| Event::Transaction(tr))
.collect();

let accounting_stage = AccountingStage::new(accountant, &mint.last_id(), None);
let event_processor = EventProcessor::new(accountant, &mint.last_id(), None);

let now = Instant::now();
assert!(accounting_stage.process_events(events).is_ok());
assert!(event_processor.process_events(events).is_ok());
let duration = now.elapsed();
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
let tps = txs as f64 / sec;

// Ensure that all transactions were successfully logged.
drop(accounting_stage.historian_input);
let entries: Vec<Entry> = accounting_stage.output.lock().unwrap().iter().collect();
drop(event_processor.historian_input);
let entries: Vec<Entry> = event_processor.output.lock().unwrap().iter().collect();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].events.len(), txs as usize);

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#![cfg_attr(feature = "unstable", feature(test))]
pub mod accountant;
pub mod accounting_stage;
pub mod crdt;
pub mod ecdsa;
pub mod entry;
pub mod entry_writer;
#[cfg(feature = "erasure")]
pub mod erasure;
pub mod event;
pub mod event_processor;
pub mod hash;
pub mod historian;
pub mod ledger;
Expand Down
10 changes: 5 additions & 5 deletions src/request_stage.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! The `request_stage` processes thin client Request messages.
use accountant::Accountant;
use accounting_stage::AccountingStage;
use bincode::{deserialize, serialize};
use entry::Entry;
use event::Event;
use event_processor::EventProcessor;
use hash::Hash;
use packet;
use packet::SharedPackets;
Expand Down Expand Up @@ -205,7 +205,7 @@ impl RequestProcessor {

pub fn process_request_packets(
&self,
accounting_stage: &AccountingStage,
event_processor: &EventProcessor,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
responder_sender: &streamer::BlobSender,
packet_recycler: &packet::PacketRecycler,
Expand Down Expand Up @@ -240,7 +240,7 @@ impl RequestProcessor {
debug!("events: {} reqs: {}", events.len(), reqs.len());

debug!("process_events");
accounting_stage.process_events(events)?;
event_processor.process_events(events)?;
debug!("done process_events");

debug!("process_requests");
Expand Down Expand Up @@ -278,7 +278,7 @@ pub struct RequestStage {
impl RequestStage {
pub fn new(
request_processor: RequestProcessor,
accounting_stage: Arc<AccountingStage>,
event_processor: Arc<EventProcessor>,
exit: Arc<AtomicBool>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
packet_recycler: packet::PacketRecycler,
Expand All @@ -289,7 +289,7 @@ impl RequestStage {
let (responder_sender, output) = channel();
let thread_hdl = spawn(move || loop {
let e = request_processor_.process_request_packets(
&accounting_stage,
&event_processor,
&verified_receiver,
&responder_sender,
&packet_recycler,
Expand Down
18 changes: 9 additions & 9 deletions src/rpu.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! The `rpu` module implements the Request Processing Unit, a
//! 5-stage transaction processing pipeline in software.
use accounting_stage::AccountingStage;
use crdt::{Crdt, ReplicatedData};
use entry_writer::EntryWriter;
use event_processor::EventProcessor;
use packet;
use request_stage::{RequestProcessor, RequestStage};
use result::Result;
Expand All @@ -17,27 +17,27 @@ use std::thread::{spawn, JoinHandle};
use streamer;

pub struct Rpu {
accounting_stage: Arc<AccountingStage>,
event_processor: Arc<EventProcessor>,
}

impl Rpu {
/// Create a new Rpu that wraps the given Accountant.
pub fn new(accounting_stage: AccountingStage) -> Self {
pub fn new(event_processor: EventProcessor) -> Self {
Rpu {
accounting_stage: Arc::new(accounting_stage),
event_processor: Arc::new(event_processor),
}
}

fn write_service<W: Write + Send + 'static>(
accounting_stage: Arc<AccountingStage>,
event_processor: Arc<EventProcessor>,
request_processor: Arc<RequestProcessor>,
exit: Arc<AtomicBool>,
broadcast: streamer::BlobSender,
blob_recycler: packet::BlobRecycler,
writer: Mutex<W>,
) -> JoinHandle<()> {
spawn(move || loop {
let entry_writer = EntryWriter::new(&accounting_stage, &request_processor);
let entry_writer = EntryWriter::new(&event_processor, &request_processor);
let _ = entry_writer.write_and_send_entries(&broadcast, &blob_recycler, &writer);
if exit.load(Ordering::Relaxed) {
info!("broadcat_service exiting");
Expand Down Expand Up @@ -77,10 +77,10 @@ impl Rpu {
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);

let blob_recycler = packet::BlobRecycler::default();
let request_processor = RequestProcessor::new(self.accounting_stage.accountant.clone());
let request_processor = RequestProcessor::new(self.event_processor.accountant.clone());
let request_stage = RequestStage::new(
request_processor,
self.accounting_stage.clone(),
self.event_processor.clone(),
exit.clone(),
sig_verify_stage.output,
packet_recycler.clone(),
Expand All @@ -89,7 +89,7 @@ impl Rpu {

let (broadcast_sender, broadcast_receiver) = channel();
let t_write = Self::write_service(
self.accounting_stage.clone(),
self.event_processor.clone(),
request_stage.request_processor.clone(),
exit.clone(),
broadcast_sender,
Expand Down
18 changes: 9 additions & 9 deletions src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ impl ThinClient {
mod tests {
use super::*;
use accountant::Accountant;
use accounting_stage::AccountingStage;
use crdt::{Crdt, ReplicatedData};
use event_processor::EventProcessor;
use futures::Future;
use logger;
use mint::Mint;
Expand Down Expand Up @@ -190,8 +190,8 @@ mod tests {
let accountant = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
let rpu = Arc::new(Rpu::new(accounting_stage));
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30));
let rpu = Arc::new(Rpu::new(event_processor));
let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap();
sleep(Duration::from_millis(300));

Expand Down Expand Up @@ -228,8 +228,8 @@ mod tests {
let accountant = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
let rpu = Arc::new(Rpu::new(accounting_stage));
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30));
let rpu = Arc::new(Rpu::new(event_processor));
let serve_addr = leader_serve.local_addr().unwrap();
let threads = rpu.serve(
leader_data,
Expand Down Expand Up @@ -298,14 +298,14 @@ mod tests {

let leader_acc = {
let accountant = Accountant::new(&alice);
let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
Arc::new(Rpu::new(accounting_stage))
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30));
Arc::new(Rpu::new(event_processor))
};

let replicant_acc = {
let accountant = Accountant::new(&alice);
let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
Arc::new(Tvu::new(accounting_stage))
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30));
Arc::new(Tvu::new(event_processor))
};

let leader_threads = leader_acc
Expand Down
Loading

0 comments on commit a3d2831

Please sign in to comment.