Skip to content

Commit

Permalink
Cleanup: field names should be nouns
Browse files Browse the repository at this point in the history
  • Loading branch information
garious committed May 9, 2018
1 parent ebc458c commit f2d4799
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 33 deletions.
18 changes: 9 additions & 9 deletions src/accounting_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,22 @@ mod tests {
// Entry OR if the verifier tries to parallelize across multiple Entries.
let mint = Mint::new(2);
let acc = Accountant::new(&mint);
let stage = AccountingStage::new(acc, &mint.last_id(), None);
let accounting_stage = AccountingStage::new(acc, &mint.last_id(), None);

// Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
let events = vec![Event::Transaction(tr)];
assert!(stage.process_events(events).is_ok());
assert!(accounting_stage.process_events(events).is_ok());

// Process a second batch that spends one of those tokens.
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
let events = vec![Event::Transaction(tr)];
assert!(stage.process_events(events).is_ok());
assert!(accounting_stage.process_events(events).is_ok());

// Collect the ledger and feed it to a new accountant.
drop(stage.entry_sender);
let entries: Vec<Entry> = stage.output.lock().unwrap().iter().collect();
drop(accounting_stage.entry_sender);
let entries: Vec<Entry> = accounting_stage.output.lock().unwrap().iter().collect();

// Assert the user holds one token, not two. If the server only output one
// entry, then the second transaction will be rejected, because it drives
Expand Down Expand Up @@ -156,17 +156,17 @@ mod bench {
.collect();

let (input, event_receiver) = channel();
let stage = AccountingStage::new(acc, &mint.last_id(), None);
let accounting_stage = AccountingStage::new(acc, &mint.last_id(), None);

let now = Instant::now();
assert!(stage.process_events(events).is_ok());
assert!(accounting_stage.process_events(events).is_ok());
let duration = now.elapsed();
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
let tps = txs as f64 / sec;

// Ensure that all transactions were successfully logged.
drop(stage.historian_input);
let entries: Vec<Entry> = stage.output.lock().unwrap().iter().collect();
drop(accounting_stage.historian_input);
let entries: Vec<Entry> = accounting_stage.output.lock().unwrap().iter().collect();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].events.len(), txs as usize);

Expand Down
4 changes: 2 additions & 2 deletions src/bin/testnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ fn main() {

eprintln!("creating networking stack...");

let accounting = AccountingStage::new(acc, &last_id, Some(1000));
let accounting_stage = AccountingStage::new(acc, &last_id, Some(1000));
let exit = Arc::new(AtomicBool::new(false));
let tpu = Arc::new(Tpu::new(accounting));
let tpu = Arc::new(Tpu::new(accounting_stage));
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
Expand Down
16 changes: 8 additions & 8 deletions src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ mod tests {
let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30));
let acc = Arc::new(Tpu::new(accounting));
let accounting_stage = AccountingStage::new(acc, &alice.last_id(), Some(30));
let acc = Arc::new(Tpu::new(accounting_stage));
let threads = Tpu::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap();
sleep(Duration::from_millis(300));

Expand Down Expand Up @@ -218,8 +218,8 @@ mod tests {
let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30));
let tpu = Arc::new(Tpu::new(accounting));
let accounting_stage = AccountingStage::new(acc, &alice.last_id(), Some(30));
let tpu = Arc::new(Tpu::new(accounting_stage));
let serve_addr = leader_serve.local_addr().unwrap();
let threads = Tpu::serve(
&tpu,
Expand Down Expand Up @@ -287,14 +287,14 @@ mod tests {

let leader_acc = {
let acc = Accountant::new(&alice);
let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30));
Arc::new(Tpu::new(accounting))
let accounting_stage = AccountingStage::new(acc, &alice.last_id(), Some(30));
Arc::new(Tpu::new(accounting_stage))
};

let replicant_acc = {
let acc = Accountant::new(&alice);
let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30));
Arc::new(Tpu::new(accounting))
let accounting_stage = AccountingStage::new(acc, &alice.last_id(), Some(30));
Arc::new(Tpu::new(accounting_stage))
};

let leader_threads = Tpu::serve(
Expand Down
29 changes: 15 additions & 14 deletions src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,25 @@ use thin_client_service::{Request, Response, ThinClientService};
use timing;

pub struct Tpu {
accounting: AccountingStage,
accounting_stage: AccountingStage,
thin_client_service: ThinClientService,
}

type SharedTpu = Arc<Tpu>;

impl Tpu {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(accounting: AccountingStage) -> Self {
let thin_client_service = ThinClientService::new(accounting.acc.clone());
pub fn new(accounting_stage: AccountingStage) -> Self {
let thin_client_service = ThinClientService::new(accounting_stage.acc.clone());
Tpu {
accounting,
accounting_stage,
thin_client_service,
}
}

fn update_entry<W: Write>(obj: &Tpu, writer: &Mutex<W>, entry: &Entry) {
trace!("update_entry entry");
obj.accounting.acc.register_entry_id(&entry.id);
obj.accounting_stage.acc.register_entry_id(&entry.id);
writeln!(
writer.lock().unwrap(),
"{}",
Expand All @@ -61,14 +61,14 @@ impl Tpu {
fn receive_all<W: Write>(obj: &Tpu, writer: &Mutex<W>) -> Result<Vec<Entry>> {
//TODO implement a serialize for channel that does this without allocations
let mut l = vec![];
let entry = obj.accounting
let entry = obj.accounting_stage
.output
.lock()
.unwrap()
.recv_timeout(Duration::new(1, 0))?;
Self::update_entry(obj, writer, &entry);
l.push(entry);
while let Ok(entry) = obj.accounting.output.lock().unwrap().try_recv() {
while let Ok(entry) = obj.accounting_stage.output.lock().unwrap().try_recv() {
Self::update_entry(obj, writer, &entry);
l.push(entry);
}
Expand Down Expand Up @@ -338,7 +338,7 @@ impl Tpu {
debug!("events: {} reqs: {}", events.len(), reqs.len());

debug!("process_events");
obj.accounting.process_events(events)?;
obj.accounting_stage.process_events(events)?;
debug!("done process_events");

debug!("process_requests");
Expand Down Expand Up @@ -378,7 +378,7 @@ impl Tpu {
for msgs in &blobs {
let blob = msgs.read().unwrap();
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
let acc = &obj.accounting.acc;
let acc = &obj.accounting_stage.acc;
for entry in entries {
acc.register_entry_id(&entry.id);
for result in acc.process_verified_events(entry.events) {
Expand Down Expand Up @@ -463,7 +463,8 @@ impl Tpu {
Mutex::new(writer),
);

let t_skinny = Self::thin_client_service(obj.accounting.acc.clone(), exit.clone(), skinny);
let t_skinny =
Self::thin_client_service(obj.accounting_stage.acc.clone(), exit.clone(), skinny);

let tpu = obj.clone();
let t_server = spawn(move || loop {
Expand Down Expand Up @@ -787,8 +788,8 @@ mod tests {
let starting_balance = 10_000;
let alice = Mint::new(starting_balance);
let acc = Accountant::new(&alice);
let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30));
let tpu = Arc::new(Tpu::new(accounting));
let accounting_stage = AccountingStage::new(acc, &alice.last_id(), Some(30));
let tpu = Arc::new(Tpu::new(accounting_stage));
let replicate_addr = target1_data.replicate_addr;
let threads = Tpu::replicate(
&tpu,
Expand All @@ -813,7 +814,7 @@ mod tests {
w.set_index(i).unwrap();
w.set_id(leader_id).unwrap();

let acc = &tpu.accounting.acc;
let acc = &tpu.accounting_stage.acc;

let tr0 = Event::new_timestamp(&bob_keypair, Utc::now());
let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]);
Expand Down Expand Up @@ -855,7 +856,7 @@ mod tests {
msgs.push(msg);
}

let acc = &tpu.accounting.acc;
let acc = &tpu.accounting_stage.acc;
let alice_balance = acc.get_balance(&alice.keypair().pubkey()).unwrap();
assert_eq!(alice_balance, alice_ref_balance);

Expand Down

0 comments on commit f2d4799

Please sign in to comment.