diff --git a/src/bank.rs b/src/bank.rs index 26fd7d6466ff04..7358b69a1dadfb 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -87,6 +87,11 @@ pub struct Bank { /// The number of transactions the bank has processed without error since the /// start of the ledger. transaction_count: AtomicUsize, + + /// The number of Entries the bank has processed without error since start + /// of the ledger, i.e. poor-man's network synchronization + /// TODO: upgrade to U64 when stable? + entry_count: AtomicUsize, } impl Bank { @@ -100,6 +105,7 @@ impl Bank { time_sources: RwLock::new(HashSet::new()), last_time: RwLock::new(Utc.timestamp(0, 0)), transaction_count: AtomicUsize::new(0), + entry_count: AtomicUsize::new(0), }; bank.apply_payment(deposit, &mut bank.balances.write().unwrap()); bank @@ -296,11 +302,13 @@ impl Bank { } /// Process an ordered list of entries. - pub fn process_entries(&self, entries: I) -> Result<()> + pub fn process_entries(&self, entries: I) -> Result where I: IntoIterator, { for entry in entries { + self.entry_count.fetch_add(1, Ordering::Relaxed); + if !entry.transactions.is_empty() { for result in self.process_transactions(entry.transactions) { result?; @@ -308,7 +316,7 @@ impl Bank { } self.register_entry_id(&entry.id); } - Ok(()) + Ok(self.entry_count()) } /// Process a Witness Signature. Any payment plans waiting on this signature @@ -422,6 +430,9 @@ impl Bank { pub fn transaction_count(&self) -> usize { self.transaction_count.load(Ordering::Relaxed) } + pub fn entry_count(&self) -> usize { + self.entry_count.load(Ordering::Relaxed) + } } #[cfg(test)] diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index b1acf1bdd4f6f6..b4f4dbe351724f 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -102,7 +102,8 @@ fn main() { bank.register_entry_id(&entry1.id); eprintln!("processing entries..."); - bank.process_entries(entries).expect("process_entries"); + let num_entries = bank.process_entries(entries).expect("process_entries"); + eprintln!("processed {} entries...", num_entries); eprintln!("creating networking stack..."); diff --git a/src/streamer.rs b/src/streamer.rs index 072fc39d903a0d..9c4e6cb0eb962a 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -395,13 +395,14 @@ pub fn window( r: BlobReceiver, s: BlobSender, retransmit: BlobSender, + entry_count: usize, ) -> JoinHandle<()> { Builder::new() .name("solana-window".to_string()) .spawn(move || { - let mut consumed = 0; - let mut received = 0; - let mut last = 0; + let mut consumed = entry_count; + let mut received = entry_count; + let mut last = entry_count; let mut times = 0; loop { if exit.load(Ordering::Relaxed) { @@ -816,6 +817,7 @@ mod test { r_reader, s_window, s_retransmit, + 0, ); let (s_responder, r_responder) = channel(); let t_responder = responder( diff --git a/src/tvu.rs b/src/tvu.rs index f338d0b1e7bf87..014beb06ec4009 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -86,6 +86,7 @@ impl Tvu { exit.clone(), blob_recycler.clone(), fetch_stage.blob_receiver, + bank.entry_count(), ); let replicate_stage = diff --git a/src/window_stage.rs b/src/window_stage.rs index 36264a88d26411..4cc7eceb6853fc 100644 --- a/src/window_stage.rs +++ b/src/window_stage.rs @@ -22,6 +22,7 @@ impl WindowStage { exit: Arc, blob_recycler: packet::BlobRecycler, fetch_stage_receiver: streamer::BlobReceiver, + entry_count: usize, ) -> Self { let (retransmit_sender, retransmit_receiver) = channel(); @@ -41,6 +42,7 @@ impl WindowStage { fetch_stage_receiver, blob_sender, retransmit_sender, + entry_count, ); let thread_hdls = vec![t_retransmit, t_window];