Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sequencer)!: make parked mempool nonces replaceable #1763

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions crates/astria-core/src/protocol/abci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ impl AbciErrorCode {
pub const NONCE_TAKEN: Self = Self(unsafe { NonZeroU32::new_unchecked(15) });
pub const ACCOUNT_SIZE_LIMIT: Self = Self(unsafe { NonZeroU32::new_unchecked(16) });
pub const PARKED_FULL: Self = Self(unsafe { NonZeroU32::new_unchecked(17) });
pub const NONCE_REPLACEMENT: Self = Self(unsafe { NonZeroU32::new_unchecked(18) });
}

impl AbciErrorCode {
Expand Down Expand Up @@ -64,6 +65,9 @@ impl AbciErrorCode {
"the account has reached the maximum number of parked transactions".into()
}
Self::PARKED_FULL => "the mempool is out of space for more parked transactions".into(),
Self::NONCE_REPLACEMENT => {
"the transaction was replaced by a different transaction with the same nonce".into()
Copy link
Member

@SuperFluffy SuperFluffy Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will the flow look like to actually see this abci error code?

If I understand the follow correctly:

  1. transaction is stuck in mempool
  2. new transaction is submitted to the mempool, replacing the old one
  3. check-tx for the new transaction should return a success code (that is abci code 0).

But NONCE_REPLACEMENT is an error code for the transaction that got replaced if I understand correctly. What would I need to do to observe this error code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The next task I have is for general transaction observability (#1773), we don't currently have the infra to communicate it now to the users but we should

Copy link
Member

@SuperFluffy SuperFluffy Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think one this is strictly related to the other. The AbciErrorCodes are intended to be returned in response to abci requests and put into the code field of the various responses (in this case CheckTx).

All the ABCI error codes can be observed right now (even if clunky). I don't understand when/how this new code would be observed.

}
Self(other) => {
format!("invalid error code {other}: should be unreachable (this is a bug)")
}
Expand Down
1 change: 1 addition & 0 deletions crates/astria-sequencer/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Bump penumbra dependencies [#1740](https://github.com/astriaorg/astria/pull/1740).
- Move fee event recording to transaction from block [#1718](https://github.com/astriaorg/astria/pull/1718).
- Nonce replacment for parked transactions is now allowed in the mempool [#1763](https://github.com/astriaorg/astria/pull/1763).

## [1.0.0-rc.2] - 2024-10-23

Expand Down
128 changes: 91 additions & 37 deletions crates/astria-sequencer/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub(crate) enum RemovalReason {
NonceStale,
LowerNonceInvalidated,
FailedPrepareProposal(String),
NonceReplacement([u8; 32]),
}

/// How long transactions are considered valid in the mempool.
Expand Down Expand Up @@ -143,11 +144,12 @@ impl<'a> ContainedTxLock<'a> {
///
/// The mempool exposes the pending transactions through `builder_queue()`, which returns a copy of
/// all pending transactions sorted in the order in which they should be executed. The sort order
/// is firstly by the difference between the transaction nonce and the account's current nonce
/// (ascending), and then by time first seen (ascending).
/// is first by the transaction group (derived from the contained actions), then by the difference
/// between the transaction nonce and the account's current nonce (ascending), and then by time
/// first seen (ascending).
///
/// The mempool implements the following policies:
/// 1. Nonce replacement is not allowed.
/// 1. Nonce replacement is only allowed for transactions in the parked queue.
/// 2. Accounts cannot have more than `MAX_PARKED_TXS_PER_ACCOUNT` transactions in their parked
/// queues.
/// 3. There is no account limit on pending transactions.
Expand All @@ -156,10 +158,7 @@ impl<'a> ContainedTxLock<'a> {
/// that account with a higher nonce will be removed as well. This is due to the fact that we do
/// not execute failing transactions, so a transaction 'failing' will mean that further account
/// nonces will not be able to execute either.
///
/// Future extensions to this mempool can include:
/// - maximum mempool size
/// - account balance aware pending queue
/// 6. Parked transactions are globally limited to a configured amount.
#[derive(Clone)]
pub(crate) struct Mempool {
pending: Arc<RwLock<PendingTransactions>>,
Expand Down Expand Up @@ -201,8 +200,8 @@ impl Mempool {
}
}

/// Inserts a transaction into the mempool and does not allow for transaction replacement.
/// Will return the reason for insertion failure if failure occurs.
/// Inserts a transaction into the mempool. Allows for nonce replacement of parked
/// transactions. Will return the reason for insertion failure if failure occurs.
#[instrument(skip_all)]
pub(crate) async fn insert(
&self,
Expand Down Expand Up @@ -230,26 +229,39 @@ impl Mempool {
current_account_nonce,
&current_account_balances,
) {
Ok(()) => {
Ok(hash) => {
// log current size of parked
self.metrics
.set_transactions_in_mempool_parked(parked.len());

// track in contained txs
self.lock_contained_txs().await.add(id);

// remove the replaced transaction
if let Some(replaced_hash) = hash {
self.lock_contained_txs().await.remove(replaced_hash);
self.comet_bft_removal_cache
.write()
.await
.add(replaced_hash, RemovalReason::NonceReplacement(id));
}
Ok(())
}
Err(err) => Err(err),
}
}
error @ Err(
InsertionError::AlreadyPresent
| InsertionError::NonceTooLow
| InsertionError::NonceTaken
| InsertionError::AccountSizeLimit
| InsertionError::ParkedSizeLimit,
) => error,
Ok(()) => {
Ok(_) => {
// check if first transaction in parked was replaced
if let Some(replaced_hash) = parked.remove_replaced(&timemarked_tx) {
// remove from contained txs
self.lock_contained_txs().await.remove(replaced_hash);
// add to removal cache
self.comet_bft_removal_cache
.write()
.await
.add(replaced_hash, RemovalReason::NonceReplacement(id));
}

// check parked for txs able to be promoted
let to_promote = parked.find_promotables(
timemarked_tx.address(),
Expand Down Expand Up @@ -285,6 +297,7 @@ impl Mempool {

Ok(())
}
Err(err) => Err(err),
}
}

Expand Down Expand Up @@ -552,25 +565,6 @@ mod tests {
"already present"
);

// try to replace nonce
let tx1_replacement = MockTxBuilder::new()
.nonce(1)
.chain_id("test-chain-id")
.build();
assert_eq!(
mempool
.insert(
tx1_replacement.clone(),
0,
account_balances.clone(),
tx_cost.clone(),
)
.await
.unwrap_err(),
InsertionError::NonceTaken,
"nonce replace not allowed"
);

// add too low nonce
let tx0 = MockTxBuilder::new().nonce(0).build();
assert_eq!(
Expand Down Expand Up @@ -1171,6 +1165,66 @@ mod tests {
assert!(!mempool.is_tracked(tx1.id().get()).await);
}

#[tokio::test]
async fn tx_tracked_nonce_replacement_straight_to_pending() {
// tests that a transaction that is replaced by a transaction that is able to be
// placed into pending is removed from the tracked set

let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics, 100);
let account_balances = mock_balances(10, 10);
let tx_cost_parked = mock_tx_cost(20, 10, 0);
let tx_cost_pending = mock_tx_cost(10, 10, 0);

let tx1_0 = MockTxBuilder::new().nonce(1).chain_id("test-0").build();
let tx1_1 = MockTxBuilder::new().nonce(1).chain_id("test-1").build();

// insert initial transaction into parked
mempool
.insert(tx1_0.clone(), 1, account_balances.clone(), tx_cost_parked)
.await
.unwrap();
// replace with different transaction which goes straight to pending
mempool
.insert(tx1_1.clone(), 1, account_balances.clone(), tx_cost_pending)
.await
.unwrap();

// check that the first transaction was removed and the replacement
// is tracked
assert!(!mempool.is_tracked(tx1_0.id().get()).await);
assert!(mempool.is_tracked(tx1_1.id().get()).await);
}

#[tokio::test]
async fn tx_tracked_nonce_replacement_modify_parked() {
// tests that transactions that are waiting in parked can be replaced
// by other transactions that will also go to parked
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics, 100);
let account_balances = mock_balances(100, 100);
let tx_cost = mock_tx_cost(10, 10, 0);

let tx1_0 = MockTxBuilder::new().nonce(1).chain_id("test-0").build();
let tx1_1 = MockTxBuilder::new().nonce(1).chain_id("test-1").build();

// insert initial transaction into parked
mempool
.insert(tx1_0.clone(), 0, account_balances.clone(), tx_cost.clone())
.await
.unwrap();
// replace with different transaction
mempool
.insert(tx1_1.clone(), 0, account_balances.clone(), tx_cost.clone())
.await
.unwrap();

// check that the first transaction was removed and the replacement
// is tracked
assert!(!mempool.is_tracked(tx1_0.id().get()).await);
assert!(mempool.is_tracked(tx1_1.id().get()).await);
}

#[tokio::test]
async fn tx_tracked_reinsertion_ok() {
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
Expand Down
Loading
Loading