Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams committed Oct 22, 2024
1 parent 767bb42 commit fb7935e
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 8 deletions.
23 changes: 19 additions & 4 deletions src/Nethermind/Nethermind.Consensus/Producers/TxPoolTxSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

namespace Nethermind.Consensus.Producers
{
public class TxPoolTxSource : ITxSource
public class TxPoolTxSource : ITxSource, ITxSourceNotifier
{
private readonly ITxPool _transactionPool;
private readonly ITransactionComparerProvider _transactionComparerProvider;
Expand All @@ -33,6 +33,8 @@ public class TxPoolTxSource : ITxSource
protected readonly ILogger _logger;
private readonly IEip4844Config _eip4844Config;

public event EventHandler<TxEventArgs> NewPendingTransactions;

public TxPoolTxSource(
ITxPool? transactionPool,
ISpecProvider? specProvider,
Expand All @@ -49,8 +51,15 @@ public TxPoolTxSource(
_logger = logManager?.GetClassLogger<TxPoolTxSource>() ?? throw new ArgumentNullException(nameof(logManager));
_stateReader = stateReader;
_eip4844Config = eip4844ConstantsProvider ?? ConstantEip4844Config.Instance;
_transactionPool.NewPending += TransactionPool_NewPending;
}

private void TransactionPool_NewPending(object? sender, TxEventArgs e)
=> NewPendingTransactions?.Invoke(sender, e);

public bool IsInterestingTx(Transaction tx, BlockHeader parent)
=> _txFilterPipeline.Execute(tx, parent);

public IEnumerable<Transaction> GetTransactions(BlockHeader parent, long gasLimit, PayloadAttributes? payloadAttributes = null, CancellationToken token = default)
{
IReleaseSpec spec = _specProvider.GetSpec(parent);
Expand Down Expand Up @@ -165,9 +174,15 @@ private void SortMultiSendersByNonce(ArrayPoolList<Transaction> selectedTxs, Blo
if (token.IsCancellationRequested) return;

// Retrieve the sender's account to get the first expected nonce
AccountStruct account = default;
_stateReader?.TryGetAccount(parent.StateRoot, group.Key, out account);
UInt256 expectedNonce = account.Nonce;
UInt256 expectedNonce;
if (_stateReader is not null && _stateReader.TryGetAccount(parent.StateRoot, group.Key, out AccountStruct account))
{
expectedNonce = account.Nonce;
}
else
{
expectedNonce = order[0].Nonce;
}

bool removeTx = false;
// Iterate over the transactions to validate nonce sequence and remove invalid ones
Expand Down
9 changes: 7 additions & 2 deletions src/Nethermind/Nethermind.Consensus/Transactions/ITxSource.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;
using System.Threading;
using Nethermind.Consensus.Producers;
using Nethermind.Core;
using Nethermind.TxPool;

namespace Nethermind.Consensus.Transactions
{
public interface ITxSource
{
IEnumerable<Transaction> GetTransactions(BlockHeader parent, long gasLimit, PayloadAttributes? payloadAttributes = null, CancellationToken token = default);
}


public interface ITxSourceNotifier
{
event EventHandler<TxEventArgs> NewPendingTransactions;
bool IsInterestingTx(Transaction tx, BlockHeader parent);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class PayloadPreparationService : IPayloadPreparationService
private readonly TimeSpan _cleanupOldPayloadDelay;
private readonly TimeSpan _timePerSlot;
private CancellationTokenSource _tokenSource = new();
TaskCompletionSource _newPendingTxWaiter = new TaskCompletionSource();
private bool _isDisposed;

// first ExecutionPayloadV1 is empty (without txs), second one is the ideal one
Expand All @@ -66,16 +67,32 @@ public PayloadPreparationService(
timer.Elapsed += CleanupOldPayloads;
timer.Start();

if (blockProducer.SupportsNotifications)
{
blockProducer.NewPendingTransactions += BlockProducer_NewPendingTransactions;
}

_logger = logManager.GetClassLogger();
}

private BlockHeader? _currentParent;

private void BlockProducer_NewPendingTransactions(object? sender, TxPool.TxEventArgs e)
{
// Ignore tx if gas is too low to run
if (_currentParent is null || _blockProducer.IsInterestingTx(e.Transaction, _currentParent))
{
_newPendingTxWaiter.TrySetResult();
}
}

public string StartPreparingPayload(BlockHeader parentHeader, PayloadAttributes payloadAttributes)
{
string payloadId = payloadAttributes.GetPayloadId(parentHeader);
if (!_isDisposed && !_payloadStorage.ContainsKey(payloadId))
{
CancellationTokenSource tokenSource = CancelOngoingImprovements();

_currentParent = parentHeader;
Block emptyBlock = ProduceEmptyBlock(payloadId, parentHeader, payloadAttributes, tokenSource.Token);
ImproveBlock(payloadId, parentHeader, payloadAttributes, emptyBlock, DateTimeOffset.UtcNow, default, _tokenSource.Token);
}
Expand Down Expand Up @@ -162,6 +179,13 @@ private IBlockImprovementContext CreateBlockImprovementContext(string payloadId,

// Wait for the adjusted time or until cancellation is requested
await Task.Delay(adjustedWaitTime, token);
if (_blockProducer.SupportsNotifications)
{
await Task.WhenAny(_newPendingTxWaiter.Task, token.AsTask());
// Rearm the txPool listener
_newPendingTxWaiter = new TaskCompletionSource();
}

// Proceed if not cancelled and the context is still valid
if (!token.IsCancellationRequested && !context.Disposed) // if GetPayload wasn't called for this item or it wasn't cleared
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

namespace Nethermind.Merge.Plugin.BlockProduction
{
public class PostMergeBlockProducer : BlockProducerBase
public class PostMergeBlockProducer : BlockProducerBase, ITxSourceNotifier
{
private readonly ITxSourceNotifier? _notifier;

public PostMergeBlockProducer(
ITxSource txSource,
IBlockchainProcessor processor,
Expand All @@ -45,8 +47,26 @@ public PostMergeBlockProducer(
miningConfig
)
{
if (txSource is ITxSourceNotifier notifier)
{
_notifier = notifier;
notifier.NewPendingTransactions += Notifier_NewPending;
SupportsNotifications = true;
}
}

public bool SupportsNotifications { get; }

private void Notifier_NewPending(object? sender, TxPool.TxEventArgs e)
{
NewPendingTransactions?.Invoke(sender, e);
}

public bool IsInterestingTx(Transaction tx, BlockHeader parent)
=> _notifier?.IsInterestingTx(tx, parent) ?? true;

public event EventHandler<TxPool.TxEventArgs>? NewPendingTransactions;

public virtual Block PrepareEmptyBlock(BlockHeader parent, PayloadAttributes? payloadAttributes = null, CancellationToken token = default)
{
BlockHeader blockHeader = PrepareBlockHeader(parent, payloadAttributes);
Expand Down

0 comments on commit fb7935e

Please sign in to comment.