diff --git a/config/iroha_test_config.json b/config/iroha_test_config.json index d109999afc5..e9507503ed3 100644 --- a/config/iroha_test_config.json +++ b/config/iroha_test_config.json @@ -53,7 +53,6 @@ }, "QUEUE": { "MAX_TRANSACTIONS_IN_QUEUE": 65536, - "MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER": 65536, "TRANSACTION_TIME_TO_LIVE_MS": 86400000, "FUTURE_THRESHOLD_MS": 1000 }, diff --git a/config/src/queue.rs b/config/src/queue.rs index a1d925b1ed9..6e5116a4e81 100644 --- a/config/src/queue.rs +++ b/config/src/queue.rs @@ -4,7 +4,6 @@ use iroha_config_base::derive::{Documented, Proxy}; use serde::{Deserialize, Serialize}; const DEFAULT_MAX_TRANSACTIONS_IN_QUEUE: u32 = 2_u32.pow(16); -const DEFAULT_MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER: u32 = 2_u32.pow(16); // 24 hours const DEFAULT_TRANSACTION_TIME_TO_LIVE_MS: u64 = 24 * 60 * 60 * 1000; const DEFAULT_FUTURE_THRESHOLD_MS: u64 = 1000; @@ -16,8 +15,6 @@ const DEFAULT_FUTURE_THRESHOLD_MS: u64 = 1000; pub struct Configuration { /// The upper limit of the number of transactions waiting in the queue. pub max_transactions_in_queue: u32, - /// The upper limit of the number of transactions waiting for more signatures. - pub max_transactions_in_signature_buffer: u32, /// The transaction will be dropped after this time if it is still in the queue. pub transaction_time_to_live_ms: u64, /// The threshold to determine if a transaction has been tampered to have a future timestamp. @@ -28,9 +25,6 @@ impl Default for ConfigurationProxy { fn default() -> Self { Self { max_transactions_in_queue: Some(DEFAULT_MAX_TRANSACTIONS_IN_QUEUE), - max_transactions_in_signature_buffer: Some( - DEFAULT_MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER, - ), transaction_time_to_live_ms: Some(DEFAULT_TRANSACTION_TIME_TO_LIVE_MS), future_threshold_ms: Some(DEFAULT_FUTURE_THRESHOLD_MS), } @@ -47,12 +41,11 @@ pub mod tests { pub fn arb_proxy() ( max_transactions_in_queue in prop::option::of(Just(DEFAULT_MAX_TRANSACTIONS_IN_QUEUE)), - max_transactions_in_signature_buffer in prop::option::of(Just(DEFAULT_MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER)), transaction_time_to_live_ms in prop::option::of(Just(DEFAULT_TRANSACTION_TIME_TO_LIVE_MS)), future_threshold_ms in prop::option::of(Just(DEFAULT_FUTURE_THRESHOLD_MS)), ) -> ConfigurationProxy { - ConfigurationProxy { max_transactions_in_queue, max_transactions_in_signature_buffer, transaction_time_to_live_ms, future_threshold_ms } + ConfigurationProxy { max_transactions_in_queue, transaction_time_to_live_ms, future_threshold_ms } } } } diff --git a/configs/peer/config.json b/configs/peer/config.json index a9d0087b4c3..def54c93a87 100644 --- a/configs/peer/config.json +++ b/configs/peer/config.json @@ -34,7 +34,6 @@ }, "QUEUE": { "MAX_TRANSACTIONS_IN_QUEUE": 65536, - "MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER": 65536, "TRANSACTION_TIME_TO_LIVE_MS": 86400000, "FUTURE_THRESHOLD_MS": 1000 }, diff --git a/core/src/queue.rs b/core/src/queue.rs index 67dea12dbda..1f00900f470 100644 --- a/core/src/queue.rs +++ b/core/src/queue.rs @@ -16,7 +16,7 @@ use iroha_config::queue::Configuration; use iroha_crypto::HashOf; use iroha_data_model::transaction::prelude::*; use iroha_logger::{debug, info, trace, warn}; -use iroha_primitives::{must_use::MustUse, riffle_iter::RiffleIter}; +use iroha_primitives::must_use::MustUse; use rand::seq::IteratorRandom; use thiserror::Error; @@ -27,12 +27,8 @@ use crate::{prelude::*, tx::CheckSignatureCondition as _}; /// Multiple producers, single consumer #[derive(Debug)] pub struct Queue { - /// The queue for transactions that passed signature check + /// The queue for transactions queue: ArrayQueue>, - /// The queue for transactions that didn't pass signature check and are waiting for additional signatures - /// - /// Second queue is needed to prevent situation when multisig transactions prevent ordinary transactions from being added into the queue - signature_buffer: ArrayQueue>, /// [`VersionedAcceptedTransaction`]s addressed by `Hash`. txs: DashMap, VersionedAcceptedTransaction>, /// The maximum number of transactions in the queue @@ -89,10 +85,8 @@ impl Queue { pub fn from_configuration(cfg: &Configuration) -> Self { Self { queue: ArrayQueue::new(cfg.max_transactions_in_queue as usize), - signature_buffer: ArrayQueue::new(cfg.max_transactions_in_signature_buffer as usize), txs: DashMap::new(), - max_txs: (cfg.max_transactions_in_queue + cfg.max_transactions_in_signature_buffer) - as usize, + max_txs: cfg.max_transactions_in_queue as usize, tx_time_to_live: Duration::from_millis(cfg.transaction_time_to_live_ms), future_threshold: Duration::from_millis(cfg.future_threshold_ms), } @@ -159,13 +153,10 @@ impl Queue { wsv: &WorldStateView, ) -> Result<(), Failure> { trace!(?tx, "Pushing to the queue"); - let signature_check_succeed = match self.check_tx(&tx, wsv) { - Err(err) => { - warn!("Failed to evaluate signature check"); - return Err(Failure { tx, err }); - } - Ok(MustUse(signature_check)) => signature_check, - }; + if let Err(err) = self.check_tx(&tx, wsv) { + warn!("Failed to evaluate signature check"); + return Err(Failure { tx, err }); + } // Get `txs_len` before entry to avoid deadlock let txs_len = self.txs.len(); @@ -196,14 +187,8 @@ impl Queue { // Insert entry first so that the `tx` popped from `queue` will always have a `(hash, tx)` record in `txs`. entry.insert(tx); - let queue_to_push = if signature_check_succeed { - &self.queue - } else { - info!("New multisignature transaction detected"); - &self.signature_buffer - }; - let res = queue_to_push.push(hash).map_err(|err_hash| { - warn!("Concrete sub-queue to push is full"); + let res = self.queue.push(hash).map_err(|err_hash| { + warn!("Queue is full"); let (_, err_tx) = self .txs .remove(&err_hash) @@ -213,47 +198,19 @@ impl Queue { err: Error::Full, } }); - trace!( - "Transaction queue length = {}, multisig transaction queue length = {}", - self.queue.len(), - self.signature_buffer.len() - ); + trace!("Transaction queue length = {}", self.queue.len(),); res } - /// Pop single transaction from the signature buffer. Record all visited and not removed transactions in `seen`. - fn pop_from_signature_buffer( - &self, - seen: &mut Vec>, - wsv: &WorldStateView, - expired_transactions: &mut Vec, - ) -> Option { - // NOTE: `SKIP_SIGNATURE_CHECK=false` because `signature_buffer` contains transaction which signature check can be either `true` or `false`. - self.pop_from::(&self.signature_buffer, seen, wsv, expired_transactions) - } - /// Pop single transaction from the queue. Record all visited and not removed transactions in `seen`. fn pop_from_queue( &self, seen: &mut Vec>, wsv: &WorldStateView, expired_transactions: &mut Vec, - ) -> Option { - // NOTE: `SKIP_SIGNATURE_CHECK=true` because `queue` contains only transactions for which signature check is `true`. - self.pop_from::(&self.queue, seen, wsv, expired_transactions) - } - - /// Pop single transaction either from the queue or waiting buffer - #[inline] - fn pop_from( - &self, - queue: &ArrayQueue>, - seen: &mut Vec>, - wsv: &WorldStateView, - expired_transactions: &mut Vec, ) -> Option { loop { - let Some(hash) = queue.pop() else { + let Some(hash) = self.queue.pop() else { return None; }; let entry = match self.txs.entry(hash) { @@ -280,8 +237,7 @@ impl Queue { continue; } seen.push(hash); - if SKIP_SIGNATURE_CHECK || *tx.check_signature_condition(wsv).unwrap_or(MustUse(false)) - { + if *tx.check_signature_condition(wsv).unwrap_or(MustUse(false)) { // Transactions are not removed from the queue until expired or committed return Some(entry.get().clone()); } @@ -322,46 +278,26 @@ impl Queue { } let mut seen_queue = Vec::new(); - let mut seen_waiting_buffer = Vec::new(); let mut expired_transactions_queue = Vec::new(); - let mut expired_transactions_waiting_buffer = Vec::new(); let txs_from_queue = core::iter::from_fn(|| { self.pop_from_queue(&mut seen_queue, wsv, &mut expired_transactions_queue) }); - let txs_from_waiting_buffer = core::iter::from_fn(|| { - self.pop_from_signature_buffer( - &mut seen_waiting_buffer, - wsv, - &mut expired_transactions_waiting_buffer, - ) - }); let transactions_hashes: HashSet> = transactions .iter() .map(VersionedAcceptedTransaction::hash) .collect(); let txs = txs_from_queue - .riffle(txs_from_waiting_buffer) .filter(|tx| !transactions_hashes.contains(&tx.hash())) .take(max_txs_in_block - transactions.len()); transactions.extend(txs); - [ - (seen_queue, &self.queue, expired_transactions_queue), - ( - seen_waiting_buffer, - &self.signature_buffer, - expired_transactions_waiting_buffer, - ), - ] - .into_iter() - .for_each(|(seen, queue, expired_txs)| { - seen.into_iter() - .try_for_each(|hash| queue.push(hash)) - .expect("Exceeded the number of transactions pending"); - expired_transactions.extend(expired_txs); - }) + seen_queue + .into_iter() + .try_for_each(|hash| self.queue.push(hash)) + .expect("Exceeded the number of transactions pending"); + expired_transactions.extend(expired_transactions_queue); } } @@ -479,166 +415,6 @@ mod tests { )); } - #[test] - fn push_tx_when_signature_buffer_is_full() { - let max_txs_in_waiting_buffer = 10; - - let alice_key_pairs = [KeyPair::generate().unwrap(), KeyPair::generate().unwrap()]; - let bob_key_pair = KeyPair::generate().unwrap(); - let kura = Kura::blank_kura_for_testing(); - let wsv = { - let domain_id = DomainId::from_str("wonderland").expect("Valid"); - let alice_id = AccountId::from_str("alice@wonderland").expect("Valid"); - let mut domain = Domain::new(domain_id.clone()).build(&alice_id); - let bob_id = AccountId::from_str("bob@wonderland").expect("Valid"); - let mut alice = Account::new( - alice_id.clone(), - alice_key_pairs.iter().map(KeyPair::public_key).cloned(), - ) - .build(&alice_id); - alice.signature_check_condition = SignatureCheckCondition( - ContainsAll::new( - EvaluatesTo::new_unchecked(ContextValue::new( - Name::from_str(TRANSACTION_SIGNATORIES_VALUE) - .expect("TRANSACTION_SIGNATORIES_VALUE should be valid."), - )), - EvaluatesTo::new_unchecked(ContextValue::new( - Name::from_str(ACCOUNT_SIGNATORIES_VALUE) - .expect("ACCOUNT_SIGNATORIES_VALUE should be valid."), - )), - ) - .into(), - ); - let bob = - Account::new(bob_id.clone(), [bob_key_pair.public_key().clone()]).build(&bob_id); - assert!(domain.add_account(alice).is_none()); - assert!(domain.add_account(bob).is_none()); - Arc::new(WorldStateView::new( - World::with([domain], PeersIds::new()), - kura.clone(), - )) - }; - - let queue = Queue::from_configuration(&Configuration { - transaction_time_to_live_ms: 100_000, - max_transactions_in_signature_buffer: max_txs_in_waiting_buffer, - ..ConfigurationProxy::default() - .build() - .expect("Default queue config should always build") - }); - - // Fill waiting buffer with multisig transactions - for _ in 0..max_txs_in_waiting_buffer { - queue - .push( - accepted_tx("alice@wonderland", 100_000, alice_key_pairs[0].clone()), - &wsv, - ) - .expect("Failed to push tx into queue"); - thread::sleep(Duration::from_millis(10)); - } - - // Check that signature buffer is full - assert!(matches!( - queue.push( - accepted_tx("alice@wonderland", 100_000, alice_key_pairs[0].clone()), - &wsv - ), - Err(Failure { - err: Error::Full, - .. - }) - )); - - // Check that ordinary transactions can still be pushed into the queue - assert!(queue - .push( - accepted_tx("bob@wonderland", 100_000, bob_key_pair.clone()), - &wsv, - ) - .is_ok()) - } - - #[test] - fn push_multisig_tx_when_queue_is_full() { - let max_txs_in_queue = 10; - - let alice_key_pairs = [KeyPair::generate().unwrap(), KeyPair::generate().unwrap()]; - let bob_key_pair = KeyPair::generate().unwrap(); - let kura = Kura::blank_kura_for_testing(); - let wsv = { - let domain_id = DomainId::from_str("wonderland").expect("Valid"); - let alice_id = AccountId::from_str("alice@wonderland").expect("Valid"); - let mut domain = Domain::new(domain_id.clone()).build(&alice_id); - let bob_id = AccountId::from_str("bob@wonderland").expect("Valid"); - let mut alice = Account::new( - alice_id.clone(), - alice_key_pairs.iter().map(KeyPair::public_key).cloned(), - ) - .build(&alice_id); - alice.signature_check_condition = SignatureCheckCondition( - ContainsAll::new( - EvaluatesTo::new_unchecked(ContextValue::new( - Name::from_str(TRANSACTION_SIGNATORIES_VALUE) - .expect("TRANSACTION_SIGNATORIES_VALUE should be valid."), - )), - EvaluatesTo::new_unchecked(ContextValue::new( - Name::from_str(ACCOUNT_SIGNATORIES_VALUE) - .expect("ACCOUNT_SIGNATORIES_VALUE should be valid."), - )), - ) - .into(), - ); - let bob = - Account::new(bob_id.clone(), [bob_key_pair.public_key().clone()]).build(&bob_id); - assert!(domain.add_account(alice).is_none()); - assert!(domain.add_account(bob).is_none()); - Arc::new(WorldStateView::new( - World::with([domain], PeersIds::new()), - kura.clone(), - )) - }; - - let queue = Queue::from_configuration(&Configuration { - transaction_time_to_live_ms: 100_000, - max_transactions_in_queue: max_txs_in_queue, - ..ConfigurationProxy::default() - .build() - .expect("Default queue config should always build") - }); - - // Fill queue with ordinary transactions - for _ in 0..max_txs_in_queue { - queue - .push( - accepted_tx("bob@wonderland", 100_000, bob_key_pair.clone()), - &wsv, - ) - .expect("Failed to push tx into queue"); - thread::sleep(Duration::from_millis(10)); - } - - // Check that queue is full - assert!(matches!( - queue.push( - accepted_tx("bob@wonderland", 100_000, bob_key_pair.clone()), - &wsv - ), - Err(Failure { - err: Error::Full, - .. - }) - )); - - // Check that multisig transactions can still be pushed into the queue - assert!(queue - .push( - accepted_tx("alice@wonderland", 100_000, alice_key_pairs[0].clone()), - &wsv, - ) - .is_ok()) - } - #[test] fn push_tx_signature_condition_failure() { let max_txs_in_queue = 10; diff --git a/docs/source/references/config.md b/docs/source/references/config.md index c130363d5ba..3e89c32a5c6 100644 --- a/docs/source/references/config.md +++ b/docs/source/references/config.md @@ -65,7 +65,6 @@ The following is the default configuration used by Iroha. }, "QUEUE": { "MAX_TRANSACTIONS_IN_QUEUE": 65536, - "MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER": 65536, "TRANSACTION_TIME_TO_LIVE_MS": 86400000, "FUTURE_THRESHOLD_MS": 1000 }, @@ -394,7 +393,6 @@ Has type `Option`[^1]. Can be configured via environm { "FUTURE_THRESHOLD_MS": 1000, "MAX_TRANSACTIONS_IN_QUEUE": 65536, - "MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER": 65536, "TRANSACTION_TIME_TO_LIVE_MS": 86400000 } ``` @@ -419,16 +417,6 @@ Has type `Option`[^1]. Can be configured via environment variable `QUEUE_MA 65536 ``` -### `queue.max_transactions_in_signature_buffer` - -The upper limit of the number of transactions waiting for more signatures. - -Has type `Option`[^1]. Can be configured via environment variable `QUEUE_MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER` - -```json -65536 -``` - ### `queue.transaction_time_to_live_ms` The transaction will be dropped after this time if it is still in the queue.