From a59184b4a0ef821c955182af61ceb575983a57d9 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 1 Jul 2018 14:45:45 -0700 Subject: [PATCH] rebase --- src/banking_stage.rs | 132 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 131 insertions(+), 1 deletion(-) diff --git a/src/banking_stage.rs b/src/banking_stage.rs index fddcf184f80ad0..6c38d1b7c36f13 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -263,6 +263,137 @@ impl BankingStage { // println!("{} tps", tps); // } //} +#[cfg(test)] +mod test { + use bank::Bank; + use banking_stage::BankingStage; + use entry::Entry; + use mint::Mint; + use packet::{to_packets_chunked, BlobRecycler, PacketRecycler, SharedPackets}; + use record_stage::RecordStage; + use serde_json; + use signature::{KeyPair, KeyPairUtil}; + use std; + use std::fs::File; + use std::io::BufRead; + use std::io::BufReader; + use std::iter; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::mpsc::channel; + use std::sync::{Arc, Mutex}; + use std::thread::sleep; + use std::time::Duration; + use transaction::Transaction; + use write_stage::WriteStage; + use std::io::Write; + + fn poll_file_for_entries(path: &str, num: usize) -> Option> { + for _ in 0..20 { + let entries: Vec = BufReader::new(File::open(path).unwrap()) + .lines() + .filter_map(|line| { + if let Ok(entry) = serde_json::from_str(&line.unwrap()) { + Some(entry) + } else { + println!("failed to parse"); + None + } + }) + .collect(); + if entries.len() >= num { + return Some(entries); + } + sleep(Duration::from_millis(100)); + } + None + } + + fn encode_transaction_to_send( + packet_recycler: &PacketRecycler, + tx: Transaction, + ) -> Vec<(SharedPackets, Vec)> { + let transactions = vec![tx]; + let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions.clone(), 1) + .into_iter() + .map(|x| { + let len = (*x).read().unwrap().packets.len(); + (x, iter::repeat(1).take(len).collect()) + }) + .collect(); + assert_eq!(verified.len(), 1); + verified + } + + #[test] + fn test_new_bank_vs_file_bank() { + let exit = Arc::new(AtomicBool::new(false)); + let alice = Mint::new(10_000); + let bank = Arc::new(Bank::new(&alice)); + let packet_recycler = PacketRecycler::default(); + let blob_recycler = BlobRecycler::default(); + let (transaction_sender, transaction_receiver) = channel(); + let id = { + let ids: Vec<_> = alice.pubkey().iter().map(|id| format!("{}", id)).collect(); + ids.join("") + }; + let path = format!("target/test_new_bank_vs_file-{}.log", id); + let bob = KeyPair::new(); + let banking_stage = BankingStage::new( + bank.clone(), + exit.clone(), + transaction_receiver, + packet_recycler.clone(), + ); + let record_stage = RecordStage::new(banking_stage.signal_receiver, &bank.last_id()); + //genensis + let mut file = File::create(&path).unwrap(); + + for x in alice.create_entries() { + let serialized = serde_json::to_string(&x).unwrap(); + write!(&mut file, "{}\n", serialized).unwrap(); + } + file.sync_all().unwrap(); + + let write_stage = WriteStage::new( + bank.clone(), + exit.clone(), + blob_recycler.clone(), + Mutex::new(file), + record_stage.entry_receiver, + ); + + //get two entries into the ledger + let tx1 = Transaction::new(&alice.keypair(), bob.pubkey(), 2, alice.last_id()); + let v1 = encode_transaction_to_send(&packet_recycler, tx1); + transaction_sender.send(v1).unwrap(); + let es = poll_file_for_entries(&path, 1).unwrap(); + assert!(es[0].id != alice.last_id()); + let tx2 = Transaction::new(&alice.keypair(), bob.pubkey(), 2, es[0].id); + let v2 = encode_transaction_to_send(&packet_recycler, tx2); + transaction_sender.send(v2).unwrap(); + let entries = poll_file_for_entries(&path, 4).unwrap(); + assert_eq!(entries.len(), 4); + assert_eq!(Some(4), bank.get_balance(&bob.pubkey())); + + let new_bank = Bank::default(); + let height = new_bank.process_ledger(entries.into_iter()).unwrap(); + assert_eq!( + new_bank.get_balance(&bob.pubkey()), + bank.get_balance(&bob.pubkey()) + ); + assert_eq!(height, 4); + let thread_hdls = vec![ + banking_stage.thread_hdl, + record_stage.thread_hdl, + write_stage.thread_hdl, + ]; + exit.store(true, Ordering::Relaxed); + for t in thread_hdls { + t.join().unwrap(); + } + std::fs::remove_file(&path).unwrap(); + } +} #[cfg(all(feature = "unstable", test))] mod bench { @@ -430,5 +561,4 @@ mod bench { check_txs(verified_len, &signal_receiver, tx); }); } - }