diff --git a/core/src/queue.rs b/core/src/queue.rs index efae479d60e..85310578b63 100644 --- a/core/src/queue.rs +++ b/core/src/queue.rs @@ -8,6 +8,7 @@ )] use core::time::Duration; +use std::collections::HashSet; use crossbeam_queue::ArrayQueue; use dashmap::{mapref::entry::Entry, DashMap}; @@ -29,12 +30,9 @@ pub struct Queue { queue: ArrayQueue>, /// [`VersionedAcceptedTransaction`]s addressed by `Hash`. txs: DashMap, VersionedAcceptedTransaction>, - /// Length of [`DashMap`]. - /// - /// [`DashMap`] right now just iterates over itself and calculates its length like this: - /// self.txs.iter().len() + /// The maximum number of transactions in the block pub txs_in_block: usize, - /// The maximum number of transactions + /// The maximum number of transactions in the queue max_txs: usize, /// Length of time after which transactions are dropped. pub tx_time_to_live: Duration, @@ -110,6 +108,7 @@ impl Queue { ) } + // FIXME: Currently it is impossible to distinguish if signature check condition failed or if there is just not enough signatures (#2595). fn check_tx( &self, tx: &VersionedAcceptedTransaction, @@ -169,17 +168,16 @@ impl Queue { Entry::Vacant(entry) => entry, }; + // Reason for such insertion order is to avoid situation + // when poped from the `queue` hash does not yet has corresponding (hash, tx) record in `txs` entry.insert(tx); - - if let Err(err_hash) = self.queue.push(hash) { + self.queue.push(hash).map_err(|err_hash| { let (_, err_tx) = self .txs .remove(&err_hash) .expect("Inserted just before match"); - Err((err_tx, Error::Full)) - } else { - Ok(()) - } + (err_tx, Error::Full) + }) } } @@ -209,6 +207,7 @@ impl Queue { continue; } + // Transactions are not removed from the queue until expired or committed seen.push(hash); if *entry .get() @@ -220,39 +219,7 @@ impl Queue { } } - /// Pop a single transaction. - /// - /// Unlike [`Self::pop`], unsigned transactions are not recorded. - #[allow( - clippy::expect_used, - clippy::unwrap_in_result, - clippy::cognitive_complexity - )] - pub fn pop_without_seen(&self, wsv: &WorldStateView) -> Option { - loop { - let hash = self.queue.pop()?; - let entry = match self.txs.entry(hash) { - Entry::Occupied(entry) => entry, - // As practice shows this code is not `unreachable!()`. - // When transactions are submitted quickly it can be reached. - Entry::Vacant(_) => continue, - }; - if self.check_tx(entry.get(), wsv).is_err() { - entry.remove_entry(); - continue; - } - - if *entry - .get() - .check_signature_condition(wsv) - .expect("Checked in `check_tx` just above") - { - return Some(entry.get().clone()); - } - } - } - - /// Return the number of transactions in the queue + /// Return the number of transactions in the queue. pub fn tx_len(&self) -> usize { self.txs.len() } @@ -260,24 +227,43 @@ impl Queue { /// Gets transactions till they fill whole block or till the end of queue. /// /// BEWARE: Shouldn't be called in parallel with itself. - #[allow(clippy::missing_panics_doc, clippy::unwrap_in_result)] - pub fn get_transactions_for_block( + #[cfg(test)] + fn collect_transactions_for_block( &self, wsv: &WorldStateView, ) -> Vec { + let mut transactions = Vec::with_capacity(self.txs_in_block); + self.get_transactions_for_block(wsv, &mut transactions); + transactions + } + + /// Put transactions into provided vector until they fill the whole block or there are no more transactions in the queue. + /// + /// BEWARE: Shouldn't be called in parallel with itself. + pub fn get_transactions_for_block( + &self, + wsv: &WorldStateView, + transactions: &mut Vec, + ) { + if transactions.len() >= self.txs_in_block { + return; + } + let mut seen = Vec::new(); - let out = std::iter::repeat_with(|| self.pop(&mut seen, wsv)) - .take_while(Option::is_some) - .map(Option::unwrap) - .take(self.txs_in_block) - .collect::>(); + let transactions_hashes: HashSet> = transactions + .iter() + .map(VersionedAcceptedTransaction::hash) + .collect(); + let out = std::iter::from_fn(|| self.pop(&mut seen, wsv)) + .filter(|tx| !transactions_hashes.contains(&tx.hash())) + .take(self.txs_in_block - transactions.len()); + transactions.extend(out); #[allow(clippy::expect_used)] seen.into_iter() .try_for_each(|hash| self.queue.push(hash)) .expect("As we never exceed the number of transactions pending"); - out } } @@ -288,7 +274,11 @@ mod tests { use std::{str::FromStr, sync::Arc, thread, time::Duration}; use iroha_config::{base::proxy::Builder, queue::ConfigurationProxy}; - use iroha_data_model::prelude::*; + use iroha_data_model::{ + account::{ACCOUNT_SIGNATORIES_VALUE, TRANSACTION_SIGNATORIES_VALUE}, + prelude::*, + }; + use iroha_primitives::must_use::MustUse; use rand::Rng as _; use super::*; @@ -424,18 +414,44 @@ mod tests { } #[test] + #[ignore = "Multisignature is not working for now. See #2595"] fn push_multisignature_tx() { let key_pairs = [KeyPair::generate().unwrap(), KeyPair::generate().unwrap()]; let kura = Kura::blank_kura_for_testing(); - let wsv = Arc::new(WorldStateView::new( - world_with_test_domains( - key_pairs - .iter() - .map(|key_pair| key_pair.public_key()) - .cloned(), - ), - kura.clone(), - )); + let wsv = { + let domain_id = DomainId::from_str("wonderland").expect("Valid"); + let mut domain = Domain::new(domain_id.clone()).build(); + let account_id = AccountId::from_str("alice@wonderland").expect("Valid"); + let mut account = Account::new( + account_id, + key_pairs.iter().map(KeyPair::public_key).cloned(), + ) + .build(); + account.set_signature_check_condition(SignatureCheckCondition( + ContainsAll::new( + EvaluatesTo::new_unchecked( + ContextValue::new( + Name::from_str(TRANSACTION_SIGNATORIES_VALUE) + .expect("TRANSACTION_SIGNATORIES_VALUE should be valid."), + ) + .into(), + ), + EvaluatesTo::new_unchecked( + ContextValue::new( + Name::from_str(ACCOUNT_SIGNATORIES_VALUE) + .expect("ACCOUNT_SIGNATORIES_VALUE should be valid."), + ) + .into(), + ), + ) + .into(), + )); + assert!(domain.add_account(account).is_none()); + Arc::new(WorldStateView::new( + World::with([domain], PeersIds::new()), + kura.clone(), + )) + }; let queue = Queue::from_configuration(&Configuration { maximum_transactions_in_block: 2, @@ -450,31 +466,57 @@ mod tests { Vec::::new().into(), 100_000, ); + let tx_limits = TransactionLimits { + max_instruction_number: 4096, + max_wasm_size_bytes: 0, + }; + let fully_signed_tx = { + let mut signed_tx = tx + .clone() + .sign((&key_pairs[0]).clone()) + .expect("Failed to sign."); + for key_pair in &key_pairs[1..] { + signed_tx = signed_tx.sign(key_pair.clone()).expect("Failed to sign"); + } + VersionedAcceptedTransaction::from_transaction(signed_tx, &tx_limits) + .expect("Failed to accept Transaction.") + }; + // Check that fully signed transaction pass signature check + assert!(matches!( + fully_signed_tx.check_signature_condition(&wsv), + Ok(MustUse(true)) + )); + let get_tx = |key_pair| { - let tx_limits = TransactionLimits { - max_instruction_number: 4096, - max_wasm_size_bytes: 0, - }; VersionedAcceptedTransaction::from_transaction( tx.clone().sign(key_pair).expect("Failed to sign."), &tx_limits, ) .expect("Failed to accept Transaction.") }; - for key_pair in key_pairs { - queue.push(get_tx(key_pair), &wsv).unwrap(); + let partially_signed_tx = get_tx(key_pair); + // Check that non of partially signed pass signature check + assert!(matches!( + partially_signed_tx.check_signature_condition(&wsv), + Ok(MustUse(false)) + )); + queue + .push(partially_signed_tx, &wsv) + .expect("Should be possible to put partially signed transaction into the queue"); } - assert_eq!(queue.queue.len(), 1); - let signature_count = queue - .txs - .get(&queue.queue.pop().unwrap()) - .unwrap() - .as_v1() - .signatures - .len(); - assert_eq!(signature_count, 2); + // Check that transactions combined into one instead of duplicating + assert_eq!(queue.tx_len(), 1); + + let mut available = queue.collect_transactions_for_block(&wsv); + assert_eq!(available.len(), 1); + let tx_from_queue = available.pop().expect("Checked that have one transactions"); + // Check that transaction from queue pass signature check + assert!(matches!( + tx_from_queue.check_signature_condition(&wsv), + Ok(MustUse(true)) + )); } #[test] @@ -504,7 +546,7 @@ mod tests { thread::sleep(Duration::from_millis(10)); } - let available = queue.get_transactions_for_block(&wsv); + let available = queue.collect_transactions_for_block(&wsv); assert_eq!(available.len(), max_block_tx as usize); } @@ -554,7 +596,7 @@ mod tests { }); queue.push(tx.clone(), &wsv).unwrap(); wsv.transactions.insert(tx.hash()); - assert_eq!(queue.get_transactions_for_block(&wsv).len(), 0); + assert_eq!(queue.collect_transactions_for_block(&wsv).len(), 0); assert_eq!(queue.txs.len(), 0); } @@ -592,13 +634,13 @@ mod tests { ) .expect("Failed to push tx into queue"); std::thread::sleep(Duration::from_millis(101)); - assert_eq!(queue.get_transactions_for_block(&wsv).len(), 1); + assert_eq!(queue.collect_transactions_for_block(&wsv).len(), 1); queue .push(accepted_tx("alice@wonderland", 300, alice_key), &wsv) .expect("Failed to push tx into queue"); std::thread::sleep(Duration::from_millis(210)); - assert_eq!(queue.get_transactions_for_block(&wsv).len(), 0); + assert_eq!(queue.collect_transactions_for_block(&wsv).len(), 0); } // Queue should only drop transactions which are already committed or ttl expired. @@ -624,12 +666,12 @@ mod tests { .expect("Failed to push tx into queue"); let a = queue - .get_transactions_for_block(&wsv) + .collect_transactions_for_block(&wsv) .into_iter() .map(|tx| tx.hash()) .collect::>(); let b = queue - .get_transactions_for_block(&wsv) + .collect_transactions_for_block(&wsv) .into_iter() .map(|tx| tx.hash()) .collect::>(); @@ -683,7 +725,7 @@ mod tests { thread::spawn(move || { while start_time.elapsed() < run_for { - for tx in queue_arc_clone.get_transactions_for_block(&wsv_clone) { + for tx in queue_arc_clone.collect_transactions_for_block(&wsv_clone) { wsv_clone.transactions.insert(tx.hash()); } // Simulate random small delays @@ -695,14 +737,8 @@ mod tests { push_txs_handle.join().unwrap(); get_txs_handle.join().unwrap(); - // Last update for queue to drop invalid txs. - let _unused = queue.get_transactions_for_block(&wsv); - // Validate the queue state. - let array_queue: Vec<_> = core::iter::repeat_with(|| queue.queue.pop()) - .take_while(Option::is_some) - .map(Option::unwrap) - .collect(); + let array_queue: Vec<_> = core::iter::from_fn(|| queue.queue.pop()).collect(); assert_eq!(array_queue.len(), queue.txs.len()); for tx in array_queue { diff --git a/core/src/sumeragi/main_loop.rs b/core/src/sumeragi/main_loop.rs index 1dae47db25c..938d08f2b45 100644 --- a/core/src/sumeragi/main_loop.rs +++ b/core/src/sumeragi/main_loop.rs @@ -643,12 +643,9 @@ pub fn run( .retain(|tx| !tx.is_expired(sumeragi.queue.tx_time_to_live)); // Pull in new transactions into the cache. - while state.transaction_cache.len() < sumeragi.queue.txs_in_block { - match sumeragi.queue.pop_without_seen(&state.wsv) { - Some(tx) => state.transaction_cache.push(tx), - None => break, - } - } + sumeragi + .queue + .get_transactions_for_block(&state.wsv, &mut state.transaction_cache); }; gossip_transactions( diff --git a/core/src/sumeragi/mod.rs b/core/src/sumeragi/mod.rs index d8cbc8d134b..3d6fd69e490 100644 --- a/core/src/sumeragi/mod.rs +++ b/core/src/sumeragi/mod.rs @@ -211,6 +211,15 @@ impl Sumeragi { .wrap_err("Failed to compose domains")? .set(domain.accounts().len() as u64); } + + metrics_guard + .view_changes + .set(wsv_guard.latest_block_view_change_index()); + + metrics_guard + .queue_size + .set(self.internal.queue.tx_len() as u64); + Ok(()) } diff --git a/telemetry/src/metrics.rs b/telemetry/src/metrics.rs index fa4ddca37f4..1732c61a6b8 100644 --- a/telemetry/src/metrics.rs +++ b/telemetry/src/metrics.rs @@ -37,6 +37,8 @@ pub struct Status { pub uptime: Uptime, /// Number of view changes in the current round pub view_changes: u64, + /// Number of the transactions in the queue + pub queue_size: u64, } impl> From<&T> for Status { @@ -49,6 +51,7 @@ impl> From<&T> for Status { txs_rejected: val.txs.with_label_values(&["rejected"]).get(), uptime: Uptime(Duration::from_millis(val.uptime_since_genesis_ms.get())), view_changes: val.view_changes.get(), + queue_size: val.queue_size.get(), } } } @@ -76,6 +79,8 @@ pub struct Metrics { pub isi_times: HistogramVec, /// Number of view changes in the current round pub view_changes: GenericGauge, + /// Number of transactions in the queue + pub queue_size: GenericGauge, // Internal use only. registry: Registry, } @@ -124,6 +129,8 @@ impl Default for Metrics { "Number of view changes in the current round", ) .expect("Infallible"); + let queue_size = GenericGauge::new("queue_size", "Number of the transactions in the queue") + .expect("Infallible"); let registry = Registry::new(); macro_rules! register { @@ -146,7 +153,8 @@ impl Default for Metrics { accounts, isi, isi_times, - view_changes + view_changes, + queue_size ); Self { @@ -161,6 +169,7 @@ impl Default for Metrics { isi, isi_times, view_changes, + queue_size, } } }