Skip to content

Commit

Permalink
[feature] hyperledger-iroha#3088: Introduce queue throttling
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara committed Jun 5, 2023
1 parent 6a51c06 commit 5607cb4
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 7 deletions.
1 change: 1 addition & 0 deletions config/iroha_test_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
},
"QUEUE": {
"MAX_TRANSACTIONS_IN_QUEUE": 65536,
"MAX_TRANSACTIONS_IN_QUEUE_PER_USER": 65536,
"TRANSACTION_TIME_TO_LIVE_MS": 86400000,
"FUTURE_THRESHOLD_MS": 1000
},
Expand Down
8 changes: 7 additions & 1 deletion config/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use iroha_config_base::derive::{Documented, Proxy};
use serde::{Deserialize, Serialize};

const DEFAULT_MAX_TRANSACTIONS_IN_QUEUE: u32 = 2_u32.pow(16);
const DEFAULT_MAX_TRANSACTIONS_IN_QUEUE_PER_USER: u32 = 2_u32.pow(16);
// 24 hours
const DEFAULT_TRANSACTION_TIME_TO_LIVE_MS: u64 = 24 * 60 * 60 * 1000;
const DEFAULT_FUTURE_THRESHOLD_MS: u64 = 1000;
Expand All @@ -15,6 +16,9 @@ const DEFAULT_FUTURE_THRESHOLD_MS: u64 = 1000;
pub struct Configuration {
/// The upper limit of the number of transactions waiting in the queue.
pub max_transactions_in_queue: u32,
/// The upper limit of the number of transactions waiting in the queue for single user.
/// Use this option to apply throttling.
pub max_transactions_in_queue_per_user: u32,
/// The transaction will be dropped after this time if it is still in the queue.
pub transaction_time_to_live_ms: u64,
/// The threshold to determine if a transaction has been tampered to have a future timestamp.
Expand All @@ -25,6 +29,7 @@ impl Default for ConfigurationProxy {
fn default() -> Self {
Self {
max_transactions_in_queue: Some(DEFAULT_MAX_TRANSACTIONS_IN_QUEUE),
max_transactions_in_queue_per_user: Some(DEFAULT_MAX_TRANSACTIONS_IN_QUEUE_PER_USER),
transaction_time_to_live_ms: Some(DEFAULT_TRANSACTION_TIME_TO_LIVE_MS),
future_threshold_ms: Some(DEFAULT_FUTURE_THRESHOLD_MS),
}
Expand All @@ -41,11 +46,12 @@ pub mod tests {
pub fn arb_proxy()
(
max_transactions_in_queue in prop::option::of(Just(DEFAULT_MAX_TRANSACTIONS_IN_QUEUE)),
max_transactions_in_queue_per_user in prop::option::of(Just(DEFAULT_MAX_TRANSACTIONS_IN_QUEUE_PER_USER)),
transaction_time_to_live_ms in prop::option::of(Just(DEFAULT_TRANSACTION_TIME_TO_LIVE_MS)),
future_threshold_ms in prop::option::of(Just(DEFAULT_FUTURE_THRESHOLD_MS)),
)
-> ConfigurationProxy {
ConfigurationProxy { max_transactions_in_queue, transaction_time_to_live_ms, future_threshold_ms }
ConfigurationProxy { max_transactions_in_queue, max_transactions_in_queue_per_user, transaction_time_to_live_ms, future_threshold_ms }
}
}
}
1 change: 1 addition & 0 deletions configs/peer/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
},
"QUEUE": {
"MAX_TRANSACTIONS_IN_QUEUE": 65536,
"MAX_TRANSACTIONS_IN_QUEUE_PER_USER": 65536,
"TRANSACTION_TIME_TO_LIVE_MS": 86400000,
"FUTURE_THRESHOLD_MS": 1000
},
Expand Down
Binary file modified configs/peer/validator.wasm
Binary file not shown.
162 changes: 156 additions & 6 deletions core/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use dashmap::{mapref::entry::Entry, DashMap};
use eyre::{Report, Result};
use iroha_config::queue::Configuration;
use iroha_crypto::HashOf;
use iroha_data_model::transaction::prelude::*;
use iroha_data_model::{account::AccountId, transaction::prelude::*};
use iroha_logger::{debug, info, trace, warn};
use iroha_primitives::must_use::MustUse;
use rand::seq::IteratorRandom;
Expand All @@ -29,10 +29,14 @@ use crate::{prelude::*, tx::CheckSignatureCondition as _};
pub struct Queue {
/// The queue for transactions
queue: ArrayQueue<HashOf<VersionedSignedTransaction>>,
/// [`VersionedAcceptedTransaction`]s addressed by `Hash`.
/// [`VersionedAcceptedTransaction`]s addressed by `Hash`
txs: DashMap<HashOf<VersionedSignedTransaction>, VersionedAcceptedTransaction>,
/// Amount of transactions per user in the queue
txs_per_user: DashMap<AccountId, usize>,
/// The maximum number of transactions in the queue
max_txs: usize,
/// The maximum number of transactions in the queue per user. Used to apply throttling
max_txs_per_user: usize,
/// Length of time after which transactions are dropped.
pub tx_time_to_live: Duration,
/// A point in time that is considered `Future` we cannot use
Expand Down Expand Up @@ -61,6 +65,9 @@ pub enum Error {
/// Transaction is already in blockchain
#[error("Transaction is already applied")]
InBlockchain,
/// User reached maximum number of transactions in the queue
#[error("User reached maximum number of trnasctions in the queue")]
MaximumTransactionsPerUser,
/// Signature condition check failed
#[error("Failure during signature condition execution, tx hash: {tx_hash}, reason: {reason}")]
SignatureCondition {
Expand All @@ -86,7 +93,9 @@ impl Queue {
Self {
queue: ArrayQueue::new(cfg.max_transactions_in_queue as usize),
txs: DashMap::new(),
txs_per_user: DashMap::new(),
max_txs: cfg.max_transactions_in_queue as usize,
max_txs_per_user: cfg.max_transactions_in_queue_per_user as usize,
tx_time_to_live: Duration::from_millis(cfg.transaction_time_to_live_ms),
future_threshold: Duration::from_millis(cfg.future_threshold_ms),
}
Expand Down Expand Up @@ -185,21 +194,26 @@ impl Queue {
});
}

if let Err(err) = self.check_and_increase_per_user_tx_count(&tx.payload().account_id) {
return Err(Failure { tx, err });
}

// Insert entry first so that the `tx` popped from `queue` will always have a `(hash, tx)` record in `txs`.
entry.insert(tx);
let res = self.queue.push(hash).map_err(|err_hash| {
self.queue.push(hash).map_err(|err_hash| {
warn!("Queue is full");
let (_, err_tx) = self
.txs
.remove(&err_hash)
.expect("Inserted just before match");
self.decrease_per_user_tx_count(&err_tx.payload().account_id);
Failure {
tx: err_tx,
err: Error::Full,
}
});
})?;
trace!("Transaction queue length = {}", self.queue.len(),);
res
Ok(())
}

/// Pop single transaction from the queue. Record all visited and not removed transactions in `seen`.
Expand Down Expand Up @@ -227,12 +241,14 @@ impl Queue {
let tx = entry.get();
if tx.is_in_blockchain(wsv) {
debug!("Transaction is already in blockchain");
entry.remove_entry();
let (_, tx) = entry.remove_entry();
self.decrease_per_user_tx_count(&tx.payload().account_id);
continue;
}
if tx.is_expired(self.tx_time_to_live) {
debug!("Transaction is expired");
let (_, tx) = entry.remove_entry();
self.decrease_per_user_tx_count(&tx.payload().account_id);
expired_transactions.push(tx);
continue;
}
Expand Down Expand Up @@ -299,6 +315,44 @@ impl Queue {
.expect("Exceeded the number of transactions pending");
expired_transactions.extend(expired_transactions_queue);
}

/// Check that user adhere for maximum transaction per user limit and increase his transaction count.
///
/// Check for limit and increment is done in one go in order to prevent situation
/// when multiple transactions from the same user are trying to increase counter at the same time.
fn check_and_increase_per_user_tx_count(&self, account_id: &AccountId) -> Result<(), Error> {
match self.txs_per_user.entry(account_id.clone()) {
Entry::Vacant(vacant) => {
vacant.insert(1);
}
Entry::Occupied(mut occupied) => {
let txs = *occupied.get();
if txs >= self.max_txs_per_user {
warn!(
max_txs_per_user = self.max_txs_per_user,
%account_id,
"Account reached maximum allowed number of transactions in the queue per user"
);
return Err(Error::MaximumTransactionsPerUser);
}
*occupied.get_mut() += 1;
}
}

Ok(())
}

fn decrease_per_user_tx_count(&self, account_id: &AccountId) {
let Entry::Occupied(mut occupied) = self.txs_per_user
.entry(account_id.clone()) else { panic!("Call to decrease always should be paired with increase count. This is a bug.") };

let count = occupied.get_mut();
if *count > 1 {
*count -= 1;
} else {
occupied.remove_entry();
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -763,6 +817,10 @@ mod tests {
Err(Failure {
err: Error::Full, ..
}) => (),
Err(Failure {
err: Error::MaximumTransactionsPerUser,
..
}) => (),
Err(Failure { err, .. }) => panic!("{err}"),
}
}
Expand Down Expand Up @@ -830,4 +888,96 @@ mod tests {
));
assert_eq!(queue.txs.len(), 1);
}

#[test]
fn queue_throttling() {
let alice_key_pair = KeyPair::generate().unwrap();
let bob_key_pair = KeyPair::generate().unwrap();
let kura = Kura::blank_kura_for_testing();
let world = {
let domain_id = DomainId::from_str("wonderland").expect("Valid");
let alice_account_id = AccountId::from_str("alice@wonderland").expect("Valid");
let bob_account_id = AccountId::from_str("bob@wonderland").expect("Valid");
let mut domain = Domain::new(domain_id).build(&alice_account_id);
let alice_account = Account::new(
alice_account_id.clone(),
[alice_key_pair.public_key().clone()],
)
.build(&alice_account_id);
let bob_account =
Account::new(bob_account_id.clone(), [bob_key_pair.public_key().clone()])
.build(&bob_account_id);
assert!(domain.add_account(alice_account).is_none());
assert!(domain.add_account(bob_account).is_none());
World::with([domain], PeersIds::new())
};
let mut wsv = WorldStateView::new(world, kura.clone());

let queue = Queue::from_configuration(&Configuration {
transaction_time_to_live_ms: 100_000,
max_transactions_in_queue: 100,
max_transactions_in_queue_per_user: 1,
..ConfigurationProxy::default()
.build()
.expect("Default queue config should always build")
});

// First push by Alice should be fine
queue
.push(
accepted_tx("alice@wonderland", 100_000, alice_key_pair.clone()),
&wsv,
)
.expect("Failed to push tx into queue");

// Second push by Alice excide limit and will be rejected
let result = queue.push(
accepted_tx("alice@wonderland", 100_000, alice_key_pair.clone()),
&wsv,
);
assert!(
matches!(
result,
Err(Failure {
tx: _,
err: Error::MaximumTransactionsPerUser
}),
),
"Failed to match: {:?}",
result,
);

// First push by Bob should be fine despite previous Alice error
queue
.push(
accepted_tx("bob@wonderland", 100_000, bob_key_pair.clone()),
&wsv,
)
.expect("Failed to push tx into queue");

let transactions = queue.collect_transactions_for_block(&wsv, 10);
assert_eq!(transactions.len(), 2);
for transaction in transactions {
// Put transaction hashes into wsv as if they were in the blockchain
wsv.transactions.insert(transaction.hash());
}
// Cleanup transactions
let transactions = queue.collect_transactions_for_block(&wsv, 10);
assert!(transactions.is_empty());

// After cleanup Alice and Bob pushes should work fine
queue
.push(
accepted_tx("alice@wonderland", 100_000, alice_key_pair.clone()),
&wsv,
)
.expect("Failed to push tx into queue");

queue
.push(
accepted_tx("bob@wonderland", 100_000, bob_key_pair.clone()),
&wsv,
)
.expect("Failed to push tx into queue");
}
}
12 changes: 12 additions & 0 deletions docs/source/references/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ The following is the default configuration used by Iroha.
},
"QUEUE": {
"MAX_TRANSACTIONS_IN_QUEUE": 65536,
"MAX_TRANSACTIONS_IN_QUEUE_PER_USER": 65536,
"TRANSACTION_TIME_TO_LIVE_MS": 86400000,
"FUTURE_THRESHOLD_MS": 1000
},
Expand Down Expand Up @@ -393,6 +394,7 @@ Has type `Option<queue::ConfigurationProxy>`[^1]. Can be configured via environm
{
"FUTURE_THRESHOLD_MS": 1000,
"MAX_TRANSACTIONS_IN_QUEUE": 65536,
"MAX_TRANSACTIONS_IN_QUEUE_PER_USER": 65536,
"TRANSACTION_TIME_TO_LIVE_MS": 86400000
}
```
Expand All @@ -417,6 +419,16 @@ Has type `Option<u32>`[^1]. Can be configured via environment variable `QUEUE_MA
65536
```

### `queue.max_transactions_in_queue_per_user`

The upper limit of the number of transactions waiting in the queue for single user.

Has type `Option<u32>`[^1]. Can be configured via environment variable `QUEUE_MAX_TRANSACTIONS_IN_QUEUE_PER_USER`

```json
65536
```

### `queue.transaction_time_to_live_ms`

The transaction will be dropped after this time if it is still in the queue.
Expand Down

0 comments on commit 5607cb4

Please sign in to comment.