diff --git a/config/iroha_test_config.json b/config/iroha_test_config.json index e9507503ed3..c3e37e741cd 100644 --- a/config/iroha_test_config.json +++ b/config/iroha_test_config.json @@ -53,6 +53,7 @@ }, "QUEUE": { "MAX_TRANSACTIONS_IN_QUEUE": 65536, + "MAX_TRANSACTIONS_IN_QUEUE_PER_USER": 65536, "TRANSACTION_TIME_TO_LIVE_MS": 86400000, "FUTURE_THRESHOLD_MS": 1000 }, diff --git a/config/src/queue.rs b/config/src/queue.rs index 6e5116a4e81..2b10dcd1730 100644 --- a/config/src/queue.rs +++ b/config/src/queue.rs @@ -4,6 +4,7 @@ use iroha_config_base::derive::{Documented, Proxy}; use serde::{Deserialize, Serialize}; const DEFAULT_MAX_TRANSACTIONS_IN_QUEUE: u32 = 2_u32.pow(16); +const DEFAULT_MAX_TRANSACTIONS_IN_QUEUE_PER_USER: u32 = 2_u32.pow(16); // 24 hours const DEFAULT_TRANSACTION_TIME_TO_LIVE_MS: u64 = 24 * 60 * 60 * 1000; const DEFAULT_FUTURE_THRESHOLD_MS: u64 = 1000; @@ -15,6 +16,9 @@ const DEFAULT_FUTURE_THRESHOLD_MS: u64 = 1000; pub struct Configuration { /// The upper limit of the number of transactions waiting in the queue. pub max_transactions_in_queue: u32, + /// The upper limit of the number of transactions waiting in the queue for single user. + /// Use this option to apply throttling. + pub max_transactions_in_queue_per_user: u32, /// The transaction will be dropped after this time if it is still in the queue. pub transaction_time_to_live_ms: u64, /// The threshold to determine if a transaction has been tampered to have a future timestamp. @@ -25,6 +29,7 @@ impl Default for ConfigurationProxy { fn default() -> Self { Self { max_transactions_in_queue: Some(DEFAULT_MAX_TRANSACTIONS_IN_QUEUE), + max_transactions_in_queue_per_user: Some(DEFAULT_MAX_TRANSACTIONS_IN_QUEUE_PER_USER), transaction_time_to_live_ms: Some(DEFAULT_TRANSACTION_TIME_TO_LIVE_MS), future_threshold_ms: Some(DEFAULT_FUTURE_THRESHOLD_MS), } @@ -41,11 +46,12 @@ pub mod tests { pub fn arb_proxy() ( max_transactions_in_queue in prop::option::of(Just(DEFAULT_MAX_TRANSACTIONS_IN_QUEUE)), + max_transactions_in_queue_per_user in prop::option::of(Just(DEFAULT_MAX_TRANSACTIONS_IN_QUEUE_PER_USER)), transaction_time_to_live_ms in prop::option::of(Just(DEFAULT_TRANSACTION_TIME_TO_LIVE_MS)), future_threshold_ms in prop::option::of(Just(DEFAULT_FUTURE_THRESHOLD_MS)), ) -> ConfigurationProxy { - ConfigurationProxy { max_transactions_in_queue, transaction_time_to_live_ms, future_threshold_ms } + ConfigurationProxy { max_transactions_in_queue, max_transactions_in_queue_per_user, transaction_time_to_live_ms, future_threshold_ms } } } } diff --git a/configs/peer/config.json b/configs/peer/config.json index def54c93a87..1737604b1b3 100644 --- a/configs/peer/config.json +++ b/configs/peer/config.json @@ -34,6 +34,7 @@ }, "QUEUE": { "MAX_TRANSACTIONS_IN_QUEUE": 65536, + "MAX_TRANSACTIONS_IN_QUEUE_PER_USER": 65536, "TRANSACTION_TIME_TO_LIVE_MS": 86400000, "FUTURE_THRESHOLD_MS": 1000 }, diff --git a/configs/peer/validator.wasm b/configs/peer/validator.wasm index 72feaff4287..fe05da72d67 100644 Binary files a/configs/peer/validator.wasm and b/configs/peer/validator.wasm differ diff --git a/core/src/queue.rs b/core/src/queue.rs index 1f00900f470..b7aef3a95d6 100644 --- a/core/src/queue.rs +++ b/core/src/queue.rs @@ -14,7 +14,7 @@ use dashmap::{mapref::entry::Entry, DashMap}; use eyre::{Report, Result}; use iroha_config::queue::Configuration; use iroha_crypto::HashOf; -use iroha_data_model::transaction::prelude::*; +use iroha_data_model::{account::AccountId, transaction::prelude::*}; use iroha_logger::{debug, info, trace, warn}; use iroha_primitives::must_use::MustUse; use rand::seq::IteratorRandom; @@ -29,10 +29,14 @@ use crate::{prelude::*, tx::CheckSignatureCondition as _}; pub struct Queue { /// The queue for transactions queue: ArrayQueue>, - /// [`VersionedAcceptedTransaction`]s addressed by `Hash`. + /// [`VersionedAcceptedTransaction`]s addressed by `Hash` txs: DashMap, VersionedAcceptedTransaction>, + /// Amount of transactions per user in the queue + txs_per_user: DashMap, /// The maximum number of transactions in the queue max_txs: usize, + /// The maximum number of transactions in the queue per user. Used to apply throttling + max_txs_per_user: usize, /// Length of time after which transactions are dropped. pub tx_time_to_live: Duration, /// A point in time that is considered `Future` we cannot use @@ -61,6 +65,9 @@ pub enum Error { /// Transaction is already in blockchain #[error("Transaction is already applied")] InBlockchain, + /// User reached maximum number of transactions in the queue + #[error("User reached maximum number of trnasctions in the queue")] + MaximumTransactionsPerUser, /// Signature condition check failed #[error("Failure during signature condition execution, tx hash: {tx_hash}, reason: {reason}")] SignatureCondition { @@ -86,7 +93,9 @@ impl Queue { Self { queue: ArrayQueue::new(cfg.max_transactions_in_queue as usize), txs: DashMap::new(), + txs_per_user: DashMap::new(), max_txs: cfg.max_transactions_in_queue as usize, + max_txs_per_user: cfg.max_transactions_in_queue_per_user as usize, tx_time_to_live: Duration::from_millis(cfg.transaction_time_to_live_ms), future_threshold: Duration::from_millis(cfg.future_threshold_ms), } @@ -185,21 +194,26 @@ impl Queue { }); } + if let Err(err) = self.check_and_increase_per_user_tx_count(&tx.payload().account_id) { + return Err(Failure { tx, err }); + } + // Insert entry first so that the `tx` popped from `queue` will always have a `(hash, tx)` record in `txs`. entry.insert(tx); - let res = self.queue.push(hash).map_err(|err_hash| { + self.queue.push(hash).map_err(|err_hash| { warn!("Queue is full"); let (_, err_tx) = self .txs .remove(&err_hash) .expect("Inserted just before match"); + self.decrease_per_user_tx_count(&err_tx.payload().account_id); Failure { tx: err_tx, err: Error::Full, } - }); + })?; trace!("Transaction queue length = {}", self.queue.len(),); - res + Ok(()) } /// Pop single transaction from the queue. Record all visited and not removed transactions in `seen`. @@ -227,12 +241,14 @@ impl Queue { let tx = entry.get(); if tx.is_in_blockchain(wsv) { debug!("Transaction is already in blockchain"); - entry.remove_entry(); + let (_, tx) = entry.remove_entry(); + self.decrease_per_user_tx_count(&tx.payload().account_id); continue; } if tx.is_expired(self.tx_time_to_live) { debug!("Transaction is expired"); let (_, tx) = entry.remove_entry(); + self.decrease_per_user_tx_count(&tx.payload().account_id); expired_transactions.push(tx); continue; } @@ -299,6 +315,44 @@ impl Queue { .expect("Exceeded the number of transactions pending"); expired_transactions.extend(expired_transactions_queue); } + + /// Check that user adhere for maximum transaction per user limit and increase his transaction count. + /// + /// Check for limit and increment is done in one go in order to prevent situation + /// when multiple transactions from the same user are trying to increase counter at the same time. + fn check_and_increase_per_user_tx_count(&self, account_id: &AccountId) -> Result<(), Error> { + match self.txs_per_user.entry(account_id.clone()) { + Entry::Vacant(vacant) => { + vacant.insert(1); + } + Entry::Occupied(mut occupied) => { + let txs = *occupied.get(); + if txs >= self.max_txs_per_user { + warn!( + max_txs_per_user = self.max_txs_per_user, + %account_id, + "Account reached maximum allowed number of transactions in the queue per user" + ); + return Err(Error::MaximumTransactionsPerUser); + } + *occupied.get_mut() += 1; + } + } + + Ok(()) + } + + fn decrease_per_user_tx_count(&self, account_id: &AccountId) { + let Entry::Occupied(mut occupied) = self.txs_per_user + .entry(account_id.clone()) else { panic!("Call to decrease always should be paired with increase count. This is a bug.") }; + + let count = occupied.get_mut(); + if *count > 1 { + *count -= 1; + } else { + occupied.remove_entry(); + } + } } #[cfg(test)] @@ -763,6 +817,10 @@ mod tests { Err(Failure { err: Error::Full, .. }) => (), + Err(Failure { + err: Error::MaximumTransactionsPerUser, + .. + }) => (), Err(Failure { err, .. }) => panic!("{err}"), } } @@ -830,4 +888,96 @@ mod tests { )); assert_eq!(queue.txs.len(), 1); } + + #[test] + fn queue_throttling() { + let alice_key_pair = KeyPair::generate().unwrap(); + let bob_key_pair = KeyPair::generate().unwrap(); + let kura = Kura::blank_kura_for_testing(); + let world = { + let domain_id = DomainId::from_str("wonderland").expect("Valid"); + let alice_account_id = AccountId::from_str("alice@wonderland").expect("Valid"); + let bob_account_id = AccountId::from_str("bob@wonderland").expect("Valid"); + let mut domain = Domain::new(domain_id).build(&alice_account_id); + let alice_account = Account::new( + alice_account_id.clone(), + [alice_key_pair.public_key().clone()], + ) + .build(&alice_account_id); + let bob_account = + Account::new(bob_account_id.clone(), [bob_key_pair.public_key().clone()]) + .build(&bob_account_id); + assert!(domain.add_account(alice_account).is_none()); + assert!(domain.add_account(bob_account).is_none()); + World::with([domain], PeersIds::new()) + }; + let mut wsv = WorldStateView::new(world, kura.clone()); + + let queue = Queue::from_configuration(&Configuration { + transaction_time_to_live_ms: 100_000, + max_transactions_in_queue: 100, + max_transactions_in_queue_per_user: 1, + ..ConfigurationProxy::default() + .build() + .expect("Default queue config should always build") + }); + + // First push by Alice should be fine + queue + .push( + accepted_tx("alice@wonderland", 100_000, alice_key_pair.clone()), + &wsv, + ) + .expect("Failed to push tx into queue"); + + // Second push by Alice excide limit and will be rejected + let result = queue.push( + accepted_tx("alice@wonderland", 100_000, alice_key_pair.clone()), + &wsv, + ); + assert!( + matches!( + result, + Err(Failure { + tx: _, + err: Error::MaximumTransactionsPerUser + }), + ), + "Failed to match: {:?}", + result, + ); + + // First push by Bob should be fine despite previous Alice error + queue + .push( + accepted_tx("bob@wonderland", 100_000, bob_key_pair.clone()), + &wsv, + ) + .expect("Failed to push tx into queue"); + + let transactions = queue.collect_transactions_for_block(&wsv, 10); + assert_eq!(transactions.len(), 2); + for transaction in transactions { + // Put transaction hashes into wsv as if they were in the blockchain + wsv.transactions.insert(transaction.hash()); + } + // Cleanup transactions + let transactions = queue.collect_transactions_for_block(&wsv, 10); + assert!(transactions.is_empty()); + + // After cleanup Alice and Bob pushes should work fine + queue + .push( + accepted_tx("alice@wonderland", 100_000, alice_key_pair.clone()), + &wsv, + ) + .expect("Failed to push tx into queue"); + + queue + .push( + accepted_tx("bob@wonderland", 100_000, bob_key_pair.clone()), + &wsv, + ) + .expect("Failed to push tx into queue"); + } } diff --git a/docs/source/references/config.md b/docs/source/references/config.md index 3e89c32a5c6..4b781ccfbde 100644 --- a/docs/source/references/config.md +++ b/docs/source/references/config.md @@ -65,6 +65,7 @@ The following is the default configuration used by Iroha. }, "QUEUE": { "MAX_TRANSACTIONS_IN_QUEUE": 65536, + "MAX_TRANSACTIONS_IN_QUEUE_PER_USER": 65536, "TRANSACTION_TIME_TO_LIVE_MS": 86400000, "FUTURE_THRESHOLD_MS": 1000 }, @@ -393,6 +394,7 @@ Has type `Option`[^1]. Can be configured via environm { "FUTURE_THRESHOLD_MS": 1000, "MAX_TRANSACTIONS_IN_QUEUE": 65536, + "MAX_TRANSACTIONS_IN_QUEUE_PER_USER": 65536, "TRANSACTION_TIME_TO_LIVE_MS": 86400000 } ``` @@ -417,6 +419,16 @@ Has type `Option`[^1]. Can be configured via environment variable `QUEUE_MA 65536 ``` +### `queue.max_transactions_in_queue_per_user` + +The upper limit of the number of transactions waiting in the queue for single user. + +Has type `Option`[^1]. Can be configured via environment variable `QUEUE_MAX_TRANSACTIONS_IN_QUEUE_PER_USER` + +```json +65536 +``` + ### `queue.transaction_time_to_live_ms` The transaction will be dropped after this time if it is still in the queue.