diff --git a/src/Nethermind/Nethermind.Consensus/Producers/TxPoolTxSource.cs b/src/Nethermind/Nethermind.Consensus/Producers/TxPoolTxSource.cs index ac61befc040..783b5be0db8 100644 --- a/src/Nethermind/Nethermind.Consensus/Producers/TxPoolTxSource.cs +++ b/src/Nethermind/Nethermind.Consensus/Producers/TxPoolTxSource.cs @@ -23,7 +23,7 @@ namespace Nethermind.Consensus.Producers { - public class TxPoolTxSource : ITxSource + public class TxPoolTxSource : ITxSource, ITxSourceNotifier { private readonly ITxPool _transactionPool; private readonly ITransactionComparerProvider _transactionComparerProvider; @@ -33,6 +33,8 @@ public class TxPoolTxSource : ITxSource protected readonly ILogger _logger; private readonly IEip4844Config _eip4844Config; + public event EventHandler NewPendingTransactions; + public TxPoolTxSource( ITxPool? transactionPool, ISpecProvider? specProvider, @@ -49,8 +51,15 @@ public TxPoolTxSource( _logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager)); _stateReader = stateReader; _eip4844Config = eip4844ConstantsProvider ?? ConstantEip4844Config.Instance; + _transactionPool.NewPending += TransactionPool_NewPending; } + private void TransactionPool_NewPending(object? sender, TxEventArgs e) + => NewPendingTransactions?.Invoke(sender, e); + + public bool IsInterestingTx(Transaction tx, BlockHeader parent) + => _txFilterPipeline.Execute(tx, parent); + public IEnumerable GetTransactions(BlockHeader parent, long gasLimit, PayloadAttributes? payloadAttributes = null, CancellationToken token = default) { IReleaseSpec spec = _specProvider.GetSpec(parent); @@ -165,9 +174,15 @@ private void SortMultiSendersByNonce(ArrayPoolList selectedTxs, Blo if (token.IsCancellationRequested) return; // Retrieve the sender's account to get the first expected nonce - AccountStruct account = default; - _stateReader?.TryGetAccount(parent.StateRoot, group.Key, out account); - UInt256 expectedNonce = account.Nonce; + UInt256 expectedNonce; + if (_stateReader is not null && _stateReader.TryGetAccount(parent.StateRoot, group.Key, out AccountStruct account)) + { + expectedNonce = account.Nonce; + } + else + { + expectedNonce = order[0].Nonce; + } bool removeTx = false; // Iterate over the transactions to validate nonce sequence and remove invalid ones diff --git a/src/Nethermind/Nethermind.Consensus/Transactions/ITxSource.cs b/src/Nethermind/Nethermind.Consensus/Transactions/ITxSource.cs index 8ece1c070f1..054a1f08097 100644 --- a/src/Nethermind/Nethermind.Consensus/Transactions/ITxSource.cs +++ b/src/Nethermind/Nethermind.Consensus/Transactions/ITxSource.cs @@ -1,10 +1,12 @@ // SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited // SPDX-License-Identifier: LGPL-3.0-only +using System; using System.Collections.Generic; using System.Threading; using Nethermind.Consensus.Producers; using Nethermind.Core; +using Nethermind.TxPool; namespace Nethermind.Consensus.Transactions { @@ -12,6 +14,9 @@ public interface ITxSource { IEnumerable GetTransactions(BlockHeader parent, long gasLimit, PayloadAttributes? payloadAttributes = null, CancellationToken token = default); } - - + public interface ITxSourceNotifier + { + event EventHandler NewPendingTransactions; + bool IsInterestingTx(Transaction tx, BlockHeader parent); + } } diff --git a/src/Nethermind/Nethermind.Merge.Plugin/BlockProduction/PayloadPreparationService.cs b/src/Nethermind/Nethermind.Merge.Plugin/BlockProduction/PayloadPreparationService.cs index 06e34925304..35f438bce71 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin/BlockProduction/PayloadPreparationService.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin/BlockProduction/PayloadPreparationService.cs @@ -42,6 +42,7 @@ public class PayloadPreparationService : IPayloadPreparationService private readonly TimeSpan _cleanupOldPayloadDelay; private readonly TimeSpan _timePerSlot; private CancellationTokenSource _tokenSource = new(); + TaskCompletionSource _newPendingTxWaiter = new TaskCompletionSource(); private bool _isDisposed; // first ExecutionPayloadV1 is empty (without txs), second one is the ideal one @@ -66,16 +67,32 @@ public PayloadPreparationService( timer.Elapsed += CleanupOldPayloads; timer.Start(); + if (blockProducer.SupportsNotifications) + { + blockProducer.NewPendingTransactions += BlockProducer_NewPendingTransactions; + } + _logger = logManager.GetClassLogger(); } + private BlockHeader? _currentParent; + + private void BlockProducer_NewPendingTransactions(object? sender, TxPool.TxEventArgs e) + { + // Ignore tx if gas is too low to run + if (_currentParent is null || _blockProducer.IsInterestingTx(e.Transaction, _currentParent)) + { + _newPendingTxWaiter.TrySetResult(); + } + } + public string StartPreparingPayload(BlockHeader parentHeader, PayloadAttributes payloadAttributes) { string payloadId = payloadAttributes.GetPayloadId(parentHeader); if (!_isDisposed && !_payloadStorage.ContainsKey(payloadId)) { CancellationTokenSource tokenSource = CancelOngoingImprovements(); - + _currentParent = parentHeader; Block emptyBlock = ProduceEmptyBlock(payloadId, parentHeader, payloadAttributes, tokenSource.Token); ImproveBlock(payloadId, parentHeader, payloadAttributes, emptyBlock, DateTimeOffset.UtcNow, default, _tokenSource.Token); } @@ -162,6 +179,13 @@ private IBlockImprovementContext CreateBlockImprovementContext(string payloadId, // Wait for the adjusted time or until cancellation is requested await Task.Delay(adjustedWaitTime, token); + if (_blockProducer.SupportsNotifications) + { + await Task.WhenAny(_newPendingTxWaiter.Task, token.AsTask()); + // Rearm the txPool listener + _newPendingTxWaiter = new TaskCompletionSource(); + } + // Proceed if not cancelled and the context is still valid if (!token.IsCancellationRequested && !context.Disposed) // if GetPayload wasn't called for this item or it wasn't cleared { diff --git a/src/Nethermind/Nethermind.Merge.Plugin/BlockProduction/PostMergeBlockProducer.cs b/src/Nethermind/Nethermind.Merge.Plugin/BlockProduction/PostMergeBlockProducer.cs index f68a7e6edf8..ebeea649307 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin/BlockProduction/PostMergeBlockProducer.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin/BlockProduction/PostMergeBlockProducer.cs @@ -18,8 +18,10 @@ namespace Nethermind.Merge.Plugin.BlockProduction { - public class PostMergeBlockProducer : BlockProducerBase + public class PostMergeBlockProducer : BlockProducerBase, ITxSourceNotifier { + private readonly ITxSourceNotifier? _notifier; + public PostMergeBlockProducer( ITxSource txSource, IBlockchainProcessor processor, @@ -45,8 +47,26 @@ public PostMergeBlockProducer( miningConfig ) { + if (txSource is ITxSourceNotifier notifier) + { + _notifier = notifier; + notifier.NewPendingTransactions += Notifier_NewPending; + SupportsNotifications = true; + } } + public bool SupportsNotifications { get; } + + private void Notifier_NewPending(object? sender, TxPool.TxEventArgs e) + { + NewPendingTransactions?.Invoke(sender, e); + } + + public bool IsInterestingTx(Transaction tx, BlockHeader parent) + => _notifier?.IsInterestingTx(tx, parent) ?? true; + + public event EventHandler? NewPendingTransactions; + public virtual Block PrepareEmptyBlock(BlockHeader parent, PayloadAttributes? payloadAttributes = null, CancellationToken token = default) { BlockHeader blockHeader = PrepareBlockHeader(parent, payloadAttributes);