diff --git a/src/bank.rs b/src/bank.rs index 153aec2c286fa2..3645c953d3c362 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -478,9 +478,7 @@ impl Bank { result?; } } - if !entry.has_more { - self.register_entry_id(&entry.id); - } + self.register_entry_id(&entry.id); Ok(()) } @@ -680,11 +678,9 @@ mod tests { use hash::hash; use ledger; use logger; - use packet::BLOB_DATA_SIZE; use signature::{GenKeys, KeypairUtil}; use std; use std::io::{BufReader, Cursor, Seek, SeekFrom}; - use std::mem::size_of; #[test] fn test_bank_new() { @@ -901,25 +897,6 @@ mod tests { assert_eq!(bank.get_balance(&mint.pubkey()), 1); } - fn create_sample_block_with_next_entries( - mint: &Mint, - length: usize, - ) -> impl Iterator { - let keypair = Keypair::new(); - let hash = mint.last_id(); - let mut txs = Vec::with_capacity(length); - for i in 0..length { - txs.push(Transaction::system_new( - &mint.keypair(), - keypair.pubkey(), - i as i64, - hash, - )); - } - let entries = ledger::next_entries(&hash, 0, txs); - entries.into_iter() - } - fn create_sample_block_with_next_entries_using_keypairs( mint: &Mint, keypairs: &[Keypair], @@ -940,21 +917,12 @@ mod tests { for _ in 0..length { let keypair = Keypair::new(); let tx = Transaction::system_new(&mint.keypair(), keypair.pubkey(), 1, hash); - let entry = Entry::new_mut(&mut hash, &mut num_hashes, vec![tx], false); + let entry = Entry::new_mut(&mut hash, &mut num_hashes, vec![tx]); entries.push(entry); } entries.into_iter() } - fn create_sample_ledger_with_next_entries( - length: usize, - ) -> (impl Iterator, Pubkey) { - let mint = Mint::new((length * length) as i64); - let genesis = mint.create_entries(); - let block = create_sample_block_with_next_entries(&mint, length); - (genesis.into_iter().chain(block), mint.pubkey()) - } - fn create_sample_ledger(length: usize) -> (impl Iterator, Pubkey) { let mint = Mint::new(1 + length as i64); let genesis = mint.create_entries(); @@ -1038,16 +1006,6 @@ mod tests { assert_eq!(bank.get_balance(&mint.pubkey()), 1); } - #[test] - fn test_process_ledger_has_more_cross_block() { - // size_of is quite large for serialized size, so - // use 2 * verify_block_size to ensure we get enough txes to cross that - // block boundary with has_more set - let num_txs = (2 * VERIFY_BLOCK_SIZE) * BLOB_DATA_SIZE / size_of::(); - let (ledger, _pubkey) = create_sample_ledger_with_next_entries(num_txs); - let bank = Bank::default(); - assert!(bank.process_ledger(ledger).is_ok()); - } #[test] fn test_new_default() { let def_bank = Bank::default(); diff --git a/src/banking_stage.rs b/src/banking_stage.rs index ea7f2675467f25..c6a0dc941ededa 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -5,10 +5,13 @@ use bank::Bank; use bincode::deserialize; use counter::Counter; +use entry::Entry; +use hash::{Hash, Hasher}; use log::Level; use packet::{Packets, SharedPackets}; +use poh::PohEntry; +use poh_service::PohService; use rayon::prelude::*; -use record_stage::Signal; use result::{Error, Result}; use service::Service; use std::net::SocketAddr; @@ -34,21 +37,35 @@ impl BankingStage { pub fn new( bank: Arc, verified_receiver: Receiver)>>, - ) -> (Self, Receiver) { - let (signal_sender, signal_receiver) = channel(); + tick_duration: Option, + ) -> (Self, Receiver>) { + let (entry_sender, entry_receiver) = channel(); let thread_hdl = Builder::new() .name("solana-banking-stage".to_string()) - .spawn(move || loop { - if let Err(e) = Self::process_packets(&bank, &verified_receiver, &signal_sender) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - Error::SendError => break, - _ => error!("{:?}", e), + .spawn(move || { + let (hash_sender, hash_receiver) = channel(); + let (poh_service, poh_receiver) = + PohService::new(bank.last_id(), hash_receiver, tick_duration); + loop { + if let Err(e) = Self::process_packets( + &bank, + &hash_sender, + &poh_receiver, + &verified_receiver, + &entry_sender, + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::SendError => break, + _ => error!("{:?}", e), + } } } + drop(hash_sender); + poh_service.join().unwrap(); }).unwrap(); - (BankingStage { thread_hdl }, signal_receiver) + (BankingStage { thread_hdl }, entry_receiver) } /// Convert the transactions from a blob of binary data to a vector of transactions and @@ -63,16 +80,97 @@ impl BankingStage { }).collect() } + fn process_transactions( + bank: &Arc, + transactions: Vec, + hash_sender: &Sender, + poh_receiver: &Receiver, + entry_sender: &Sender>, + ) -> Result<()> { + let mut entries = Vec::new(); + + debug!("transactions: {}", transactions.len()); + + let mut chunk_start = 0; + while chunk_start != transactions.len() { + let chunk_end = chunk_start + Entry::num_will_fit(transactions[chunk_start..].to_vec()); + + let results = bank.process_transactions(transactions[chunk_start..chunk_end].to_vec()); + + debug!("results: {}", results.len()); + + chunk_start = chunk_end; + + let mut hasher = Hasher::default(); + + let processed_transactions: Vec<_> = results + .into_iter() + .filter_map(|x| match x { + Ok(x) => { + hasher.hash(&x.signature.as_ref()); + Some(x) + } + Err(e) => { + debug!("process transaction failed {:?}", e); + None + } + }).collect(); + + debug!("processed ok: {}", processed_transactions.len()); + + let hash = hasher.result(); + + if processed_transactions.len() != 0 { + hash_sender.send(hash)?; + + let mut answered = false; + while !answered { + entries.extend(poh_receiver.try_iter().map(|poh| { + if let Some(mixin) = poh.mixin { + answered = true; + assert_eq!(mixin, hash); + bank.register_entry_id(&poh.id); + Entry { + num_hashes: poh.num_hashes, + id: poh.id, + transactions: processed_transactions.clone(), + } + } else { + Entry { + num_hashes: poh.num_hashes, + id: poh.id, + transactions: vec![], + } + } + })); + } + } else { + entries.extend(poh_receiver.try_iter().map(|poh| Entry { + num_hashes: poh.num_hashes, + id: poh.id, + transactions: vec![], + })); + } + } + + debug!("done process_transactions, {} entries", entries.len()); + + Ok(entry_sender.send(entries)?) + } + /// Process the incoming packets and send output `Signal` messages to `signal_sender`. /// Discard packets via `packet_recycler`. pub fn process_packets( bank: &Arc, + hash_sender: &Sender, + poh_receiver: &Receiver, verified_receiver: &Receiver)>>, - signal_sender: &Sender, + entry_sender: &Sender>, ) -> Result<()> { let timer = Duration::new(1, 0); let recv_start = Instant::now(); let mms = verified_receiver.recv_timeout(timer)?; + debug!("verified_recevier {:?}", verified_receiver); let mut reqs_len = 0; let mms_len = mms.len(); info!( @@ -87,7 +185,8 @@ impl BankingStage { for (msgs, vers) in mms { let transactions = Self::deserialize_transactions(&msgs.read()); reqs_len += transactions.len(); - let transactions = transactions + + let transactions: Vec<_> = transactions .into_iter() .zip(vers) .filter_map(|(tx, ver)| match tx { @@ -99,14 +198,15 @@ impl BankingStage { }, }).collect(); - debug!("process_transactions"); - let results = bank.process_transactions(transactions); - let transactions = results.into_iter().filter_map(|x| x.ok()).collect(); - if let Err(_) = signal_sender.send(Signal::Transactions(transactions)) { - return Err(Error::SendError); - } - debug!("done process_transactions"); + Self::process_transactions( + bank, + transactions, + hash_sender, + poh_receiver, + entry_sender, + )?; } + let total_time_s = timing::duration_as_s(&proc_start.elapsed()); let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); info!( diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index 8801a04268cb51..438e913d7b0a88 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -269,8 +269,8 @@ mod tests { use broadcast_stage::{BroadcastStage, BroadcastStageReturnType}; use crdt::{Crdt, Node}; use entry::Entry; + use ledger::next_entries_mut; use mint::Mint; - use recorder::Recorder; use service::Service; use signature::{Keypair, KeypairUtil, Pubkey}; use std::cmp; @@ -361,19 +361,19 @@ mod tests { } let genesis_len = broadcast_info.entries.len() as u64; - let last_entry_hash = broadcast_info + let mut last_id = broadcast_info .entries .last() .expect("Ledger should not be empty") .id; + let mut num_hashes = 0; // Input enough entries to make exactly leader_rotation_interval entries, which will // trigger a check for leader rotation. Because the next scheduled leader // is ourselves, we won't exit - let mut recorder = Recorder::new(last_entry_hash); - for _ in genesis_len..leader_rotation_interval { - let new_entry = recorder.record(vec![]); + let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]); + broadcast_info.entry_sender.send(new_entry).unwrap(); } @@ -388,7 +388,8 @@ mod tests { // past the point of the leader rotation. The write_stage will see that // it's no longer the leader after checking the crdt, and exit for _ in 0..leader_rotation_interval { - let new_entry = recorder.record(vec![]); + let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]); + match broadcast_info.entry_sender.send(new_entry) { // We disconnected, break out of loop and check the results Err(_) => break, diff --git a/src/entry.rs b/src/entry.rs index 365c382499b384..057dbb0895b8ab 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -3,8 +3,9 @@ //! transactions within it. Entries cannot be reordered, and its field `num_hashes` //! represents an approximate amount of time since the last Entry was created. use bincode::{serialize_into, serialized_size}; -use hash::{extend_and_hash, hash, Hash}; +use hash::Hash; use packet::{BlobRecycler, SharedBlob, BLOB_DATA_SIZE}; +use poh::Poh; use rayon::prelude::*; use signature::Pubkey; use std::io::Cursor; @@ -42,30 +43,17 @@ pub struct Entry { /// generated. They may have been observed before a previous Entry ID but were /// pushed back into this list to ensure deterministic interpretation of the ledger. pub transactions: Vec, - - /// Indication that: - /// 1. the next Entry in the ledger has transactions that can potentially - /// be verified in parallel with these transactions - /// 2. this Entry can be left out of the bank's entry_id cache for - /// purposes of duplicate rejection - pub has_more: bool, } impl Entry { /// Creates the next Entry `num_hashes` after `start_hash`. - pub fn new( - start_hash: &Hash, - num_hashes: u64, - transactions: Vec, - has_more: bool, - ) -> Self { + pub fn new(start_hash: &Hash, num_hashes: u64, transactions: Vec) -> Self { let num_hashes = num_hashes + if transactions.is_empty() { 0 } else { 1 }; let id = next_hash(start_hash, 0, &transactions); let entry = Entry { num_hashes, id, transactions, - has_more, }; let size = serialized_size(&entry).unwrap(); @@ -115,19 +103,53 @@ impl Entry { num_hashes: 0, id: Hash::default(), transactions, - has_more: false, }).unwrap() <= BLOB_DATA_SIZE as u64 } + pub fn num_will_fit(transactions: Vec) -> usize { + if transactions.len() == 0 { + return 0; + } + let mut num = transactions.len(); + let mut upper = transactions.len(); + let mut lower = 1; // if one won't fit, we have a lot of TODOs + let mut next = transactions.len(); // optimistic + loop { + debug!( + "num {}, upper {} lower {} next {} transactions.len() {}", + num, + upper, + lower, + next, + transactions.len() + ); + if Entry::will_fit(transactions[..num].to_vec()) { + next = (upper + num) / 2; + lower = num; + debug!("num {} fits, maybe too well? trying {}", num, next); + } else { + next = (lower + num) / 2; + upper = num; + debug!("num {} doesn't fit! trying {}", num, next); + } + // same as last time + if next == num { + debug!("converged on num {}", num); + break; + } + num = next; + } + num + } + /// Creates the next Tick Entry `num_hashes` after `start_hash`. pub fn new_mut( start_hash: &mut Hash, num_hashes: &mut u64, transactions: Vec, - has_more: bool, ) -> Self { - let entry = Self::new(start_hash, *num_hashes, transactions, has_more); + let entry = Self::new(start_hash, *num_hashes, transactions); *start_hash = entry.id; *num_hashes = 0; assert!(serialized_size(&entry).unwrap() <= BLOB_DATA_SIZE as u64); @@ -141,7 +163,6 @@ impl Entry { num_hashes, id: *id, transactions: vec![], - has_more: false, } } @@ -170,34 +191,22 @@ impl Entry { } } -fn add_transaction_data(hash_data: &mut Vec, tx: &Transaction) { - hash_data.push(0u8); - hash_data.extend_from_slice(&tx.signature.as_ref()); -} - /// Creates the hash `num_hashes` after `start_hash`. If the transaction contains /// a signature, the final hash will be a hash of both the previous ID and /// the signature. If num_hashes is zero and there's no transaction data, /// start_hash is returned. fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) -> Hash { - let mut id = *start_hash; - for _ in 1..num_hashes { - id = hash(&id.as_ref()); + if num_hashes == 0 && transactions.len() == 0 { + return *start_hash; } - // Hash all the transaction data - let mut hash_data = vec![]; - for tx in transactions { - add_transaction_data(&mut hash_data, tx); - } + let mut poh = Poh::new(*start_hash, None); - if !hash_data.is_empty() { - extend_and_hash(&id, &hash_data) - } else if num_hashes != 0 { - hash(&id.as_ref()) - } else { - id + for _ in 1..num_hashes { + poh.hash(); } + + poh.record(Transaction::hash(transactions)).id } /// Creates the next Tick or Transaction Entry `num_hashes` after `start_hash`. @@ -207,7 +216,6 @@ pub fn next_entry(start_hash: &Hash, num_hashes: u64, transactions: Vec EntryWriter<'a, W> { fn write_and_register_entry(&mut self, entry: &Entry) -> io::Result<()> { trace!("write_and_register_entry entry"); - if !entry.has_more { - self.bank.register_entry_id(&entry.id); - } + self.bank.register_entry_id(&entry.id); + Self::write_entry(&mut self.writer, entry) } @@ -101,46 +100,8 @@ pub fn read_entries(reader: R) -> impl Iterator= PACKET_DATA_SIZE); - let threshold = (BLOB_DATA_SIZE / tx_size) - 1; // PACKET_DATA_SIZE is transaction size - - // Verify large entries are split up and the first sets has_more. - let txs = vec![tx.clone(); threshold * 2]; - let entries = ledger::next_entries(&mint.last_id(), 0, txs); - assert_eq!(entries.len(), 2); - assert!(entries[0].has_more); - assert!(!entries[1].has_more); - - // Verify that write_and_register_entry doesn't register the first entries after a split. - assert_eq!(bank.last_id(), mint.last_id()); - entry_writer.write_and_register_entry(&entries[0]).unwrap(); - assert_eq!(bank.last_id(), mint.last_id()); - - // Verify that write_and_register_entry registers the final entry after a split. - entry_writer.write_and_register_entry(&entries[1]).unwrap(); - assert_eq!(bank.last_id(), entries[1].id); - } /// Same as read_entries() but parsing a buffer and returning a vector. fn read_entries_from_buf(s: &[u8]) -> io::Result> { diff --git a/src/hash.rs b/src/hash.rs index 0a00ba2115f902..b0e8386460c0f6 100644 --- a/src/hash.rs +++ b/src/hash.rs @@ -9,6 +9,29 @@ use std::fmt; #[derive(Serialize, Deserialize, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd, Hash)] pub struct Hash(GenericArray); +#[derive(Clone, Default)] +pub struct Hasher { + hasher: Sha256, +} + +impl Hasher { + pub fn hash(&mut self, val: &[u8]) -> () { + self.hasher.input(val); + } + pub fn hashv(&mut self, vals: &[&[u8]]) -> () { + for val in vals { + self.hash(val); + } + } + pub fn result(self) -> Hash { + // At the time of this writing, the sha2 library is stuck on an old version + // of generic_array (0.9.0). Decouple ourselves with a clone to our version. + Hash(GenericArray::clone_from_slice( + self.hasher.result().as_slice(), + )) + } +} + impl AsRef<[u8]> for Hash { fn as_ref(&self) -> &[u8] { &self.0[..] @@ -34,13 +57,9 @@ impl Hash { } /// Return a Sha256 hash for the given data. pub fn hashv(vals: &[&[u8]]) -> Hash { - let mut hasher = Sha256::default(); - for val in vals { - hasher.input(val); - } - // At the time of this writing, the sha2 library is stuck on an old version - // of generic_array (0.9.0). Decouple ourselves with a clone to our version. - Hash(GenericArray::clone_from_slice(hasher.result().as_slice())) + let mut hasher = Hasher::default(); + hasher.hashv(vals); + hasher.result() } /// Return a Sha256 hash for the given data. diff --git a/src/ledger.rs b/src/ledger.rs index 26ebcb020f3fd4..fa71153c774e75 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -479,10 +479,10 @@ pub fn next_entries_mut( num_hashes: &mut u64, transactions: Vec, ) -> Vec { - // TODO: find a magic number that works better than | ? - // V + // TODO: ?? find a number that works better than |? + // V if transactions.is_empty() || transactions.len() == 1 { - vec![Entry::new_mut(start_hash, num_hashes, transactions, false)] + vec![Entry::new_mut(start_hash, num_hashes, transactions)] } else { let mut chunk_start = 0; let mut entries = Vec::new(); @@ -526,7 +526,6 @@ pub fn next_entries_mut( start_hash, num_hashes, transactions[chunk_start..chunk_end].to_vec(), - transactions.len() - chunk_end > 0, )); chunk_start = chunk_end; } @@ -612,7 +611,6 @@ mod tests { Utc::now(), one, )], - false, ) }).collect() } @@ -698,7 +696,6 @@ mod tests { num_hashes: 0, id: Hash::default(), transactions: vec![], - has_more: false, }).unwrap() as usize; assert!(tx_small_size < tx_large_size); assert!(tx_large_size < PACKET_DATA_SIZE); @@ -715,8 +712,6 @@ mod tests { let transactions = vec![tx_small.clone(); threshold * 2]; let entries0 = next_entries(&id, 0, transactions.clone()); assert_eq!(entries0.len(), 2); - assert!(entries0[0].has_more); - assert!(!entries0[entries0.len() - 1].has_more); assert!(entries0.verify(&id)); // verify the split with small transactions followed by large @@ -728,8 +723,6 @@ mod tests { let entries0 = next_entries(&id, 0, transactions.clone()); assert!(entries0.len() >= 2); - assert!(entries0[0].has_more); - assert!(!entries0[entries0.len() - 1].has_more); assert!(entries0.verify(&id)); } diff --git a/src/lib.rs b/src/lib.rs index d0da09ce1a977a..3c487d3641fdad 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,8 +38,6 @@ pub mod packet; pub mod payment_plan; pub mod poh; pub mod poh_service; -pub mod record_stage; -pub mod recorder; pub mod recvmmsg; pub mod recycler; pub mod replicate_stage; diff --git a/src/mint.rs b/src/mint.rs index adbbe66205471c..e7545142cd2c27 100644 --- a/src/mint.rs +++ b/src/mint.rs @@ -57,8 +57,8 @@ impl Mint { } pub fn create_entries(&self) -> Vec { - let e0 = Entry::new(&self.seed(), 0, vec![], false); - let e1 = Entry::new(&e0.id, 0, self.create_transactions(), false); + let e0 = Entry::new(&self.seed(), 0, vec![]); + let e1 = Entry::new(&e0.id, 0, self.create_transactions()); vec![e0, e1] } } diff --git a/src/poh_service.rs b/src/poh_service.rs index 23b7f143840240..ed07cbac5f0a9a 100644 --- a/src/poh_service.rs +++ b/src/poh_service.rs @@ -22,34 +22,29 @@ pub struct PohService { impl PohService { /// A background thread that will continue tagging received Transaction messages and /// sending back Entry messages until either the receiver or sender channel is closed. - pub fn new(start_hash: Hash, hash_receiver: Receiver) -> (Self, Receiver) { - let (poh_sender, poh_receiver) = channel(); - let thread_hdl = Builder::new() - .name("solana-record-service".to_string()) - .spawn(move || { - let mut poh = Poh::new(start_hash, None); - let _ = Self::process_hashes(&mut poh, &hash_receiver, &poh_sender); - }).unwrap(); - - (PohService { thread_hdl }, poh_receiver) - } - - /// Same as `PohService::new`, but will automatically produce entries every `tick_duration`. - pub fn new_with_clock( + /// if tick_duration is some, service will automatically produce entries every + /// `tick_duration`. + pub fn new( start_hash: Hash, hash_receiver: Receiver, - tick_duration: Duration, + tick_duration: Option, ) -> (Self, Receiver) { let (poh_sender, poh_receiver) = channel(); + let thread_hdl = Builder::new() .name("solana-record-service".to_string()) .spawn(move || { - let mut poh = Poh::new(start_hash, Some(tick_duration)); - loop { - if Self::try_process_hashes(&mut poh, &hash_receiver, &poh_sender).is_err() { - return; + let mut poh = Poh::new(start_hash, tick_duration); + if tick_duration.is_some() { + loop { + if Self::try_process_hashes(&mut poh, &hash_receiver, &poh_sender).is_err() + { + return; + } + poh.hash(); } - poh.hash(); + } else { + let _ = Self::process_hashes(&mut poh, &hash_receiver, &poh_sender); } }).unwrap(); @@ -111,7 +106,7 @@ mod tests { #[test] fn test_poh() { let (hash_sender, hash_receiver) = channel(); - let (poh_service, poh_receiver) = PohService::new(Hash::default(), hash_receiver); + let (poh_service, poh_receiver) = PohService::new(Hash::default(), hash_receiver, None); hash_sender.send(Hash::default()).unwrap(); sleep(Duration::from_millis(1)); @@ -136,7 +131,7 @@ mod tests { #[test] fn test_poh_closed_sender() { let (hash_sender, hash_receiver) = channel(); - let (poh_service, poh_receiver) = PohService::new(Hash::default(), hash_receiver); + let (poh_service, poh_receiver) = PohService::new(Hash::default(), hash_receiver, None); drop(poh_receiver); hash_sender.send(Hash::default()).unwrap(); assert_eq!(poh_service.thread_hdl.join().unwrap(), ()); @@ -145,8 +140,11 @@ mod tests { #[test] fn test_poh_clock() { let (hash_sender, hash_receiver) = channel(); - let (_poh_service, poh_receiver) = - PohService::new_with_clock(Hash::default(), hash_receiver, Duration::from_millis(1)); + let (_poh_service, poh_receiver) = PohService::new( + Hash::default(), + hash_receiver, + Some(Duration::from_millis(1)), + ); sleep(Duration::from_millis(50)); drop(hash_sender); diff --git a/src/record_stage.rs b/src/record_stage.rs deleted file mode 100644 index e2da8665593b00..00000000000000 --- a/src/record_stage.rs +++ /dev/null @@ -1,240 +0,0 @@ -//! The `record_stage` module provides an object for generating a Proof of History. -//! It records Transaction items on behalf of its users. It continuously generates -//! new hashes, only stopping to check if it has been sent an Transaction item. It -//! tags each Transaction with an Entry, and sends it back. The Entry includes the -//! Transaction, the latest hash, and the number of hashes since the last transaction. -//! The resulting stream of entries represents ordered transactions in time. - -use bank::Bank; -use counter::Counter; -use entry::Entry; -use log::Level; -use recorder::Recorder; -use service::Service; -use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError}; -use std::sync::Arc; -use std::thread::{self, Builder, JoinHandle}; -use std::time::{Duration, Instant}; -use transaction::Transaction; - -#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] -pub enum Signal { - Tick, - Transactions(Vec), -} - -pub struct RecordStage { - thread_hdl: JoinHandle<()>, -} - -impl RecordStage { - /// A background thread that will continue tagging received Transaction messages and - /// sending back Entry messages until either the receiver or sender channel is closed. - pub fn new(signal_receiver: Receiver, bank: Arc) -> (Self, Receiver>) { - let (entry_sender, entry_receiver) = channel(); - let start_hash = bank.last_id(); - - let thread_hdl = Builder::new() - .name("solana-record-stage".to_string()) - .spawn(move || { - let mut recorder = Recorder::new(start_hash); - let _ = - Self::process_signals(&mut recorder, &signal_receiver, &bank, &entry_sender); - }).unwrap(); - - (RecordStage { thread_hdl }, entry_receiver) - } - - /// Same as `RecordStage::new`, but will automatically produce entries every `tick_duration`. - pub fn new_with_clock( - signal_receiver: Receiver, - bank: Arc, - tick_duration: Duration, - ) -> (Self, Receiver>) { - let (entry_sender, entry_receiver) = channel(); - let start_hash = bank.last_id(); - - let thread_hdl = Builder::new() - .name("solana-record-stage".to_string()) - .spawn(move || { - let mut recorder = Recorder::new(start_hash); - let start_time = Instant::now(); - loop { - if Self::try_process_signals( - &mut recorder, - start_time, - tick_duration, - &signal_receiver, - &bank, - &entry_sender, - ).is_err() - { - return; - } - recorder.hash(); - } - }).unwrap(); - - (RecordStage { thread_hdl }, entry_receiver) - } - - fn process_signal( - signal: Signal, - bank: &Arc, - recorder: &mut Recorder, - sender: &Sender>, - ) -> Result<(), ()> { - let txs = if let Signal::Transactions(txs) = signal { - txs - } else { - vec![] - }; - let txs_len = txs.len(); - let entries = recorder.record(txs); - - for entry in &entries { - if !entry.has_more { - bank.register_entry_id(&entry.id); - } - } - - let entries_len = entries.len(); - sender.send(entries).or(Err(()))?; - - inc_new_counter_info!("record_stage-txs", txs_len); - inc_new_counter_info!("record_stage-entries", entries_len); - - Ok(()) - } - - fn process_signals( - recorder: &mut Recorder, - receiver: &Receiver, - bank: &Arc, - sender: &Sender>, - ) -> Result<(), ()> { - loop { - match receiver.recv() { - Ok(signal) => Self::process_signal(signal, bank, recorder, sender)?, - Err(RecvError) => return Err(()), - } - } - } - - fn try_process_signals( - recorder: &mut Recorder, - start_time: Instant, - tick_duration: Duration, - receiver: &Receiver, - bank: &Arc, - sender: &Sender>, - ) -> Result<(), ()> { - loop { - if let Some(entry) = recorder.tick(start_time, tick_duration) { - sender.send(vec![entry]).or(Err(()))?; - } - match receiver.try_recv() { - Ok(signal) => Self::process_signal(signal, &bank, recorder, sender)?, - Err(TryRecvError::Empty) => return Ok(()), - Err(TryRecvError::Disconnected) => return Err(()), - }; - } - } -} - -impl Service for RecordStage { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { - self.thread_hdl.join() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use bank::Bank; - use ledger::Block; - use mint::Mint; - use signature::{Keypair, KeypairUtil}; - use std::sync::mpsc::channel; - use std::sync::Arc; - use std::thread::sleep; - - #[test] - fn test_historian() { - let (tx_sender, tx_receiver) = channel(); - let mint = Mint::new(1234); - let bank = Arc::new(Bank::new(&mint)); - let zero = bank.last_id(); - let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank); - - tx_sender.send(Signal::Tick).unwrap(); - sleep(Duration::new(0, 1_000_000)); - tx_sender.send(Signal::Tick).unwrap(); - sleep(Duration::new(0, 1_000_000)); - tx_sender.send(Signal::Tick).unwrap(); - - let entry0 = entry_receiver.recv().unwrap()[0].clone(); - let entry1 = entry_receiver.recv().unwrap()[0].clone(); - let entry2 = entry_receiver.recv().unwrap()[0].clone(); - - assert_eq!(entry0.num_hashes, 0); - assert_eq!(entry1.num_hashes, 0); - assert_eq!(entry2.num_hashes, 0); - - drop(tx_sender); - assert_eq!(record_stage.thread_hdl.join().unwrap(), ()); - - assert!([entry0, entry1, entry2].verify(&zero)); - } - - #[test] - fn test_historian_closed_sender() { - let (tx_sender, tx_receiver) = channel(); - let mint = Mint::new(1234); - let bank = Arc::new(Bank::new(&mint)); - let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank); - drop(entry_receiver); - tx_sender.send(Signal::Tick).unwrap(); - assert_eq!(record_stage.thread_hdl.join().unwrap(), ()); - } - - #[test] - fn test_transactions() { - let (tx_sender, signal_receiver) = channel(); - let mint = Mint::new(1234); - let bank = Arc::new(Bank::new(&mint)); - let zero = bank.last_id(); - let (_record_stage, entry_receiver) = RecordStage::new(signal_receiver, bank); - let alice_keypair = Keypair::new(); - let bob_pubkey = Keypair::new().pubkey(); - let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero); - let tx1 = Transaction::new(&alice_keypair, bob_pubkey, 2, zero); - tx_sender - .send(Signal::Transactions(vec![tx0, tx1])) - .unwrap(); - drop(tx_sender); - let entries: Vec<_> = entry_receiver.iter().collect(); - assert_eq!(entries.len(), 1); - } - - #[test] - fn test_clock() { - let (tx_sender, tx_receiver) = channel(); - let mint = Mint::new(1234); - let bank = Arc::new(Bank::new(&mint)); - let zero = bank.last_id(); - let (_record_stage, entry_receiver) = - RecordStage::new_with_clock(tx_receiver, bank, Duration::from_millis(20)); - sleep(Duration::from_millis(900)); - tx_sender.send(Signal::Tick).unwrap(); - drop(tx_sender); - let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect(); - assert!(entries.len() > 1); - - // Ensure the ID is not the seed. - assert_ne!(entries[0].id, zero); - } -} diff --git a/src/recorder.rs b/src/recorder.rs deleted file mode 100644 index 85135c40867922..00000000000000 --- a/src/recorder.rs +++ /dev/null @@ -1,48 +0,0 @@ -//! The `recorder` module provides an object for generating a Proof of History. -//! It records Transaction items on behalf of its users. - -use entry::Entry; -use hash::{hash, Hash}; -use ledger; -use std::time::{Duration, Instant}; -use transaction::Transaction; - -pub struct Recorder { - last_hash: Hash, - num_hashes: u64, - num_ticks: u32, -} - -impl Recorder { - pub fn new(last_hash: Hash) -> Self { - Recorder { - last_hash, - num_hashes: 0, - num_ticks: 0, - } - } - - pub fn hash(&mut self) { - self.last_hash = hash(&self.last_hash.as_ref()); - self.num_hashes += 1; - } - - pub fn record(&mut self, transactions: Vec) -> Vec { - ledger::next_entries_mut(&mut self.last_hash, &mut self.num_hashes, transactions) - } - - pub fn tick(&mut self, start_time: Instant, tick_duration: Duration) -> Option { - if start_time.elapsed() > tick_duration * (self.num_ticks + 1) { - // TODO: don't let this overflow u32 - self.num_ticks += 1; - Some(Entry::new_mut( - &mut self.last_hash, - &mut self.num_hashes, - vec![], - false, - )) - } else { - None - } - } -} diff --git a/src/thin_client.rs b/src/thin_client.rs index 0d8cf283feeb12..a758a8eda04338 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -192,7 +192,7 @@ impl ThinClient { self.balances .get(pubkey) .map(Bank::read_balance) - .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "nokey")) + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "AccountNotFound")) } /// Request the finality from the leader node @@ -670,7 +670,9 @@ mod tests { let balance = client.poll_get_balance(&bob_keypair.pubkey()); assert!(balance.is_err()); - server.close().unwrap(); + server + .close() + .unwrap_or_else(|e| panic!("close() failed! {:?}", e)); remove_dir_all(ledger_path).unwrap(); } } diff --git a/src/tpu.rs b/src/tpu.rs index 40e10e79e77844..312bd683043bf3 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -2,27 +2,27 @@ //! 5-stage transaction processing pipeline in software. //! //! ```text -//! .---------------------------------------------------------------. -//! | TPU .-----. | -//! | | PoH | | -//! | `--+--` | -//! | | | -//! | v | -//! | .-------. .-----------. .---------. .--------. .-------. | -//! .---------. | | Fetch | | SigVerify | | Banking | | Record | | Write | | .------------. -//! | Clients |--->| Stage |->| Stage |->| Stage |->| Stage |->| Stage +--->| Validators | -//! `---------` | | | | | | | | | | | | `------------` -//! | `-------` `-----------` `----+----` `--------` `---+---` | -//! | | | | -//! | | | | -//! | | | | -//! | | | | -//! `---------------------------------|-----------------------|-----` -//! | | -//! v v -//! .------. .--------. -//! | Bank | | Ledger | -//! `------` `--------` +//! .----------------------------------------------------. +//! | TPU .-------------. | +//! | | PoH Service | | +//! | `-------+-----` | +//! | ^ | | +//! | | v | +//! | .-------. .-----------. .-+-------. .-------. | +//! .---------. | | Fetch | | SigVerify | | Banking | | Write | | .------------. +//! | Clients |--->| Stage |->| Stage |->| Stage |-->| Stage +--->| Validators | +//! `---------` | | | | | | | | | | `------------` +//! | `-------` `-----------` `----+----` `---+---` | +//! | | | | +//! | | | | +//! | | | | +//! | | | | +//! `---------------------------------|------------|-----` +//! | | +//! v v +//! .------. .--------. +//! | Bank | | Ledger | +//! `------` `--------` //! ``` use bank::Bank; @@ -30,7 +30,6 @@ use banking_stage::BankingStage; use crdt::Crdt; use entry::Entry; use fetch_stage::FetchStage; -use record_stage::RecordStage; use service::Service; use signature::Keypair; use sigverify_stage::SigVerifyStage; @@ -50,7 +49,6 @@ pub struct Tpu { fetch_stage: FetchStage, sigverify_stage: SigVerifyStage, banking_stage: BankingStage, - record_stage: RecordStage, write_stage: WriteStage, exit: Arc, } @@ -73,14 +71,8 @@ impl Tpu { let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver, sigverify_disabled); - let (banking_stage, signal_receiver) = BankingStage::new(bank.clone(), verified_receiver); - - let (record_stage, entry_receiver) = match tick_duration { - Some(tick_duration) => { - RecordStage::new_with_clock(signal_receiver, bank.clone(), tick_duration) - } - None => RecordStage::new(signal_receiver, bank.clone()), - }; + let (banking_stage, entry_receiver) = + BankingStage::new(bank.clone(), verified_receiver, tick_duration); let (write_stage, entry_forwarder) = WriteStage::new( keypair, @@ -95,7 +87,6 @@ impl Tpu { fetch_stage, sigverify_stage, banking_stage, - record_stage, write_stage, exit: exit.clone(), }; @@ -119,7 +110,6 @@ impl Service for Tpu { self.fetch_stage.join()?; self.sigverify_stage.join()?; self.banking_stage.join()?; - self.record_stage.join()?; match self.write_stage.join()? { WriteStageReturnType::LeaderRotation => Ok(Some(TpuReturnType::LeaderRotation)), _ => Ok(None), diff --git a/src/transaction.rs b/src/transaction.rs index 0188de588ec5b3..b6d339f873aec5 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -4,7 +4,7 @@ use bincode::{deserialize, serialize}; use budget::{Budget, Condition}; use budget_program::BudgetState; use chrono::prelude::*; -use hash::Hash; +use hash::{Hash, Hasher}; use instruction::{Contract, Instruction, Vote}; use payment_plan::Payment; use signature::{Keypair, KeypairUtil, Pubkey, Signature}; @@ -292,6 +292,14 @@ impl Transaction { true } } + // a hash of a slice of transactions only needs to hash the signatures + pub fn hash(transactions: &[Transaction]) -> Hash { + let mut hasher = Hasher::default(); + transactions + .iter() + .for_each(|tx| hasher.hash(&tx.signature.as_ref())); + hasher.result() + } } pub fn test_tx() -> Transaction { diff --git a/src/tvu.rs b/src/tvu.rs index 9d4d970068c157..f16ab087a206ca 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -245,7 +245,7 @@ pub mod tests { let transfer_amount = 501; let bob_keypair = Keypair::new(); for i in 0..num_transfers { - let entry0 = Entry::new(&cur_hash, i, vec![], false); + let entry0 = Entry::new(&cur_hash, i, vec![]); bank.register_entry_id(&cur_hash); cur_hash = hash(&cur_hash.as_ref()); @@ -257,7 +257,7 @@ pub mod tests { ); bank.register_entry_id(&cur_hash); cur_hash = hash(&cur_hash.as_ref()); - let entry1 = Entry::new(&cur_hash, i + num_transfers, vec![tx0], false); + let entry1 = Entry::new(&cur_hash, i + num_transfers, vec![tx0]); bank.register_entry_id(&cur_hash); cur_hash = hash(&cur_hash.as_ref()); diff --git a/src/window.rs b/src/window.rs index a725f670ae52fb..6e283a7220cfa6 100644 --- a/src/window.rs +++ b/src/window.rs @@ -99,9 +99,9 @@ impl WindowUtil for Window { received: u64, ) -> Vec<(SocketAddr, Vec)> { let num_peers = crdt.read().unwrap().table.len() as u64; - let highest_lost = calculate_highest_lost_blob_index(num_peers, consumed, received); + let max_repair = calculate_max_repair(num_peers, consumed, received, times); - let idxs = self.clear_slots(consumed, highest_lost); + let idxs = self.clear_slots(consumed, max_repair); let reqs: Vec<_> = idxs .into_iter() .filter_map(|pix| crdt.read().unwrap().window_index_request(pix).ok()) @@ -110,14 +110,14 @@ impl WindowUtil for Window { inc_new_counter_info!("streamer-repair_window-repair", reqs.len()); if log_enabled!(Level::Trace) { trace!( - "{}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}", + "{}: repair_window counter times: {} consumed: {} received: {} max_repair: {} missing: {}", id, times, consumed, - highest_lost, + received, + max_repair, reqs.len() ); - for (to, _) in &reqs { trace!("{}: repair_window request to {}", id, to); } @@ -286,17 +286,22 @@ impl WindowUtil for Window { } } -fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u64) -> u64 { +fn calculate_max_repair(num_peers: u64, consumed: u64, received: u64, times: usize) -> u64 { // Calculate the highest blob index that this node should have already received // via avalanche. The avalanche splits data stream into nodes and each node retransmits // the data to their peer nodes. So there's a possibility that a blob (with index lower // than current received index) is being retransmitted by a peer node. - let highest_lost = cmp::max(consumed, received.saturating_sub(num_peers)); + let max_repair = if times >= 8 { + // if repair backoff is getting high, don't wait for avalanche + cmp::max(consumed, received) + } else { + cmp::max(consumed, received.saturating_sub(num_peers)) + }; // This check prevents repairing a blob that will cause window to roll over. Even if // the highes_lost blob is actually missing, asking to repair it might cause our // current window to move past other missing blobs - cmp::min(consumed + WINDOW_SIZE - 1, highest_lost) + cmp::min(consumed + WINDOW_SIZE - 1, max_repair) } pub fn blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool { @@ -415,7 +420,7 @@ mod test { use std::sync::Arc; use std::time::Duration; use streamer::{receiver, responder, PacketReceiver}; - use window::{blob_idx_in_window, calculate_highest_lost_blob_index, WINDOW_SIZE}; + use window::{blob_idx_in_window, calculate_max_repair, WINDOW_SIZE}; fn get_msgs(r: PacketReceiver, num: &mut usize) { for _t in 0..5 { @@ -473,27 +478,28 @@ mod test { } #[test] - pub fn calculate_highest_lost_blob_index_test() { - assert_eq!(calculate_highest_lost_blob_index(0, 10, 90), 90); - assert_eq!(calculate_highest_lost_blob_index(15, 10, 90), 75); - assert_eq!(calculate_highest_lost_blob_index(90, 10, 90), 10); - assert_eq!(calculate_highest_lost_blob_index(90, 10, 50), 10); - assert_eq!(calculate_highest_lost_blob_index(90, 10, 99), 10); - assert_eq!(calculate_highest_lost_blob_index(90, 10, 101), 11); + pub fn calculate_max_repair_test() { + assert_eq!(calculate_max_repair(0, 10, 90, 0), 90); + assert_eq!(calculate_max_repair(15, 10, 90, 32), 90); + assert_eq!(calculate_max_repair(15, 10, 90, 0), 75); + assert_eq!(calculate_max_repair(90, 10, 90, 0), 10); + assert_eq!(calculate_max_repair(90, 10, 50, 0), 10); + assert_eq!(calculate_max_repair(90, 10, 99, 0), 10); + assert_eq!(calculate_max_repair(90, 10, 101, 0), 11); assert_eq!( - calculate_highest_lost_blob_index(90, 10, 95 + WINDOW_SIZE), + calculate_max_repair(90, 10, 95 + WINDOW_SIZE, 0), WINDOW_SIZE + 5 ); assert_eq!( - calculate_highest_lost_blob_index(90, 10, 99 + WINDOW_SIZE), + calculate_max_repair(90, 10, 99 + WINDOW_SIZE, 0), WINDOW_SIZE + 9 ); assert_eq!( - calculate_highest_lost_blob_index(90, 10, 100 + WINDOW_SIZE), + calculate_max_repair(90, 10, 100 + WINDOW_SIZE, 0), WINDOW_SIZE + 9 ); assert_eq!( - calculate_highest_lost_blob_index(90, 10, 120 + WINDOW_SIZE), + calculate_max_repair(90, 10, 120 + WINDOW_SIZE, 0), WINDOW_SIZE + 9 ); } diff --git a/src/window_service.rs b/src/window_service.rs index ea549cc0ec0e4b..5d13e986b0a960 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -272,6 +272,12 @@ pub fn window_service( } if received <= consumed { + trace!( + "{} we have everything received:{} consumed:{}", + id, + received, + consumed + ); continue; } @@ -280,6 +286,7 @@ pub fn window_service( trace!("{} !repair_backoff() times = {}", id, times); continue; } + trace!("{} let's repair! times = {}", id, times); let mut window = window.write().unwrap(); let reqs = window.repair(&crdt, &id, times, consumed, received); diff --git a/src/write_stage.rs b/src/write_stage.rs index 781b1fe17a6906..e423c94142c50d 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -300,8 +300,7 @@ mod tests { use crdt::{Crdt, Node}; use entry::Entry; use hash::Hash; - use ledger::{genesis, read_ledger}; - use recorder::Recorder; + use ledger::{genesis, next_entries_mut, read_ledger}; use service::Service; use signature::{Keypair, KeypairUtil, Pubkey}; use std::fs::remove_dir_all; @@ -384,20 +383,20 @@ mod tests { wcrdt.set_scheduled_leader(leader_rotation_interval, write_stage_info.my_id); } - let last_entry_hash = write_stage_info + let mut last_id = write_stage_info .ledger_tail .last() .expect("Ledger should not be empty") .id; + let mut num_hashes = 0; let genesis_entry_height = write_stage_info.ledger_tail.len() as u64; // Input enough entries to make exactly leader_rotation_interval entries, which will // trigger a check for leader rotation. Because the next scheduled leader // is ourselves, we won't exit - let mut recorder = Recorder::new(last_entry_hash); for _ in genesis_entry_height..leader_rotation_interval { - let new_entry = recorder.record(vec![]); + let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]); write_stage_info.entry_sender.send(new_entry).unwrap(); } @@ -416,7 +415,7 @@ mod tests { // The write_stage will see that it's no longer the leader after // checking the schedule, and exit for _ in 0..leader_rotation_interval { - let new_entry = recorder.record(vec![]); + let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]); write_stage_info.entry_sender.send(new_entry).unwrap(); } @@ -452,7 +451,7 @@ mod tests { } let crdt = Arc::new(RwLock::new(crdt)); - let entry = Entry::new(&Hash::default(), 0, vec![], false); + let entry = Entry::new(&Hash::default(), 0, vec![]); // A vector that is completely within a certain epoch should return that // entire vector diff --git a/tests/multinode.rs b/tests/multinode.rs index 283355f1b17866..28997540bd0ba0 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -111,7 +111,7 @@ fn make_tiny_test_entries(start_hash: Hash, num: usize) -> Vec { let mut id = start_hash; let mut num_hashes = 0; (0..num) - .map(|_| Entry::new_mut(&mut id, &mut num_hashes, vec![], false)) + .map(|_| Entry::new_mut(&mut id, &mut num_hashes, vec![])) .collect() } @@ -136,7 +136,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { // write a bunch more ledger into leader's ledger, this should populate his window // and force him to respond to repair from the ledger window { - let entries = make_tiny_test_entries(alice.last_id(), WINDOW_SIZE as usize * 2); + let entries = make_tiny_test_entries(alice.last_id(), WINDOW_SIZE as usize); let mut writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); writer.write_entries(entries).unwrap(); @@ -159,6 +159,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { // start up another validator from zero, converge and then check // balances let keypair = Keypair::new(); + let validator_pubkey = keypair.pubkey().clone(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.info.clone(); let validator = Fullnode::new( @@ -170,24 +171,32 @@ fn test_multi_node_ledger_window() -> result::Result<()> { None, ); + // Send validator some tokens to vote + let validator_balance = + send_tx_and_retry_get_balance(&leader_data, &alice, &validator_pubkey, 500, None).unwrap(); + info!("leader balance {}", validator_balance); + // contains the leader and new node info!("converging...."); let _servers = converge(&leader_data, 2); + info!("converged."); // another transaction with leader - let leader_balance = - send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, None).unwrap(); - info!("bob balance on leader {}", leader_balance); - assert_eq!(leader_balance, 500); - + let bob_balance = + send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 1, None).unwrap(); + info!("bob balance on leader {}", bob_balance); + let mut checks = 1; loop { let mut client = mk_client(&validator_data); let bal = client.poll_get_balance(&bob_pubkey); - info!("bob balance on validator {:?}...", bal); - if bal.unwrap_or(0) == leader_balance { + info!( + "bob balance on validator {:?} after {} checks...", + bal, checks + ); + if bal.unwrap_or(0) == bob_balance { break; } - sleep(Duration::from_millis(300)); + checks += 1; } info!("done!"); @@ -240,6 +249,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { let mut nodes = vec![server]; for _ in 0..N { let keypair = Keypair::new(); + let validator_pubkey = keypair.pubkey().clone(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let ledger_path = tmp_copy_ledger( &leader_ledger_path, @@ -247,6 +257,12 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { ); ledger_paths.push(ledger_path.clone()); + // Send each validator some tokens to vote + let validator_balance = + send_tx_and_retry_get_balance(&leader_data, &alice, &validator_pubkey, 500, None) + .unwrap(); + info!("validator balance {}", validator_balance); + let mut val = Fullnode::new( validator, &ledger_path, @@ -366,9 +382,17 @@ fn test_multi_node_basic() { let mut nodes = vec![server]; for _ in 0..N { let keypair = Keypair::new(); + let validator_pubkey = keypair.pubkey().clone(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_basic"); ledger_paths.push(ledger_path.clone()); + + // Send each validator some tokens to vote + let validator_balance = + send_tx_and_retry_get_balance(&leader_data, &alice, &validator_pubkey, 500, None) + .unwrap(); + info!("validator balance {}", validator_balance); + let val = Fullnode::new( validator, &ledger_path, @@ -620,7 +644,7 @@ fn test_multi_node_dynamic_network() { .spawn(move || { info!("Spawned thread {}", n); let keypair = Keypair::new(); - //send some tokens to the new validator + //send some tokens to the new validators let bal = retry_send_tx_and_retry_get_balance( &leader_data, &alice_clone.read().unwrap(),