Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vms/platformvm: Remove PeekTxs from Mempool interface #2378

Merged
merged 13 commits into from
Dec 11, 2023
24 changes: 18 additions & 6 deletions vms/platformvm/block/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,6 @@ func (b *builder) BuildBlock(context.Context) (snowman.Block, error) {
return nil, err
}

// Remove selected txs from mempool now that we are returning the block to
// the consensus engine.
txs := statelessBlk.Txs()
b.Mempool.Remove(txs)
return b.blkManager.NewBlock(statelessBlk), nil
}

Expand Down Expand Up @@ -281,8 +277,24 @@ func buildBlock(
)
}

var (
blockTxs []*txs.Tx
remainingSize = targetBlockSize
)

for {
tx, exists := builder.Mempool.Peek()
if !exists || len(tx.Bytes()) > remainingSize {
break
}
builder.Mempool.Remove([]*txs.Tx{tx})

remainingSize -= len(tx.Bytes())
blockTxs = append(blockTxs, tx)
dhrubabasu marked this conversation as resolved.
Show resolved Hide resolved
}

// If there is no reason to build a block, don't.
if !builder.Mempool.HasTxs() && !forceAdvanceTime {
if len(blockTxs) == 0 && !forceAdvanceTime {
builder.txExecutorBackend.Ctx.Log.Debug("no pending txs to issue into a block")
return nil, ErrNoPendingBlocks
}
Expand All @@ -292,7 +304,7 @@ func buildBlock(
timestamp,
parentID,
height,
builder.Mempool.PeekTxs(targetBlockSize),
blockTxs,
)
}

Expand Down
34 changes: 24 additions & 10 deletions vms/platformvm/block/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,10 +414,15 @@ func TestBuildBlock(t *testing.T) {
builderF: func(ctrl *gomock.Controller) *builder {
mempool := mempool.NewMockMempool(ctrl)

// There are txs.
mempool.EXPECT().DropExpiredStakerTxs(gomock.Any()).Return([]ids.ID{})
mempool.EXPECT().HasTxs().Return(true)
mempool.EXPECT().PeekTxs(targetBlockSize).Return([]*txs.Tx{tx})

gomock.InOrder(
mempool.EXPECT().Peek().Return(tx, true),
mempool.EXPECT().Remove([]*txs.Tx{tx}),
// Second loop iteration
mempool.EXPECT().Peek().Return(nil, false),
)

return &builder{
Mempool: mempool,
}
Expand Down Expand Up @@ -463,7 +468,7 @@ func TestBuildBlock(t *testing.T) {

// There are no txs.
mempool.EXPECT().DropExpiredStakerTxs(gomock.Any()).Return([]ids.ID{})
mempool.EXPECT().HasTxs().Return(false)
mempool.EXPECT().Peek().Return(nil, false)

clk := &mockable.Clock{}
clk.Set(now)
Expand Down Expand Up @@ -511,8 +516,7 @@ func TestBuildBlock(t *testing.T) {

// There are no txs.
mempool.EXPECT().DropExpiredStakerTxs(gomock.Any()).Return([]ids.ID{})
mempool.EXPECT().HasTxs().Return(false)
mempool.EXPECT().PeekTxs(targetBlockSize).Return(nil)
mempool.EXPECT().Peek().Return(nil, false)

clk := &mockable.Clock{}
clk.Set(now)
Expand Down Expand Up @@ -566,8 +570,13 @@ func TestBuildBlock(t *testing.T) {

// There is a tx.
mempool.EXPECT().DropExpiredStakerTxs(gomock.Any()).Return([]ids.ID{})
mempool.EXPECT().HasTxs().Return(true)
mempool.EXPECT().PeekTxs(targetBlockSize).Return([]*txs.Tx{tx})

gomock.InOrder(
mempool.EXPECT().Peek().Return(tx, true),
mempool.EXPECT().Remove([]*txs.Tx{tx}),
// Second loop iteration
mempool.EXPECT().Peek().Return(nil, false),
)

clk := &mockable.Clock{}
clk.Set(now)
Expand Down Expand Up @@ -620,8 +629,13 @@ func TestBuildBlock(t *testing.T) {
// There are no decision txs
// There is a staker tx.
mempool.EXPECT().DropExpiredStakerTxs(gomock.Any()).Return([]ids.ID{})
mempool.EXPECT().HasTxs().Return(true)
mempool.EXPECT().PeekTxs(targetBlockSize).Return([]*txs.Tx{tx})

gomock.InOrder(
mempool.EXPECT().Peek().Return(tx, true),
mempool.EXPECT().Remove([]*txs.Tx{tx}),
// Second loop iteration
mempool.EXPECT().Peek().Return(nil, false),
)

clk := &mockable.Clock{}
clk.Set(now)
Expand Down
31 changes: 6 additions & 25 deletions vms/platformvm/txs/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,8 @@ type Mempool interface {
Get(txID ids.ID) *txs.Tx
Remove(txs []*txs.Tx)

// Following Banff activation, all mempool transactions,
// (both decision and staker) are included into Standard blocks.
// HasTxs allow to check for availability of any mempool transaction.
HasTxs() bool
// PeekTxs returns the next txs for Banff blocks
// up to maxTxsBytes without removing them from the mempool.
PeekTxs(maxTxsBytes int) []*txs.Tx
// Peek returns the oldest tx in the mempool.
Peek() (tx *txs.Tx, exists bool)

// Drops all [txs.Staker] transactions whose [StartTime] is before
// [minStartTime] from [mempool]. The dropped tx ids are returned.
Expand Down Expand Up @@ -210,23 +205,9 @@ func (m *mempool) Remove(txsToRemove []*txs.Tx) {
}
}

func (m *mempool) HasTxs() bool {
return m.unissuedTxs.Len() > 0
}

func (m *mempool) PeekTxs(maxTxsBytes int) []*txs.Tx {
var txs []*txs.Tx
txIter := m.unissuedTxs.NewIterator()
size := 0
for txIter.Next() {
tx := txIter.Value()
size += len(tx.Bytes())
if size > maxTxsBytes {
return txs
}
txs = append(txs, tx)
}
return txs
func (m *mempool) Peek() (*txs.Tx, bool) {
_, tx, exists := m.unissuedTxs.Oldest()
return tx, exists
}

func (m *mempool) MarkDropped(txID ids.ID, reason error) {
Expand All @@ -239,7 +220,7 @@ func (m *mempool) GetDropReason(txID ids.ID) error {
}

func (m *mempool) RequestBuildBlock(emptyBlockPermitted bool) {
if !emptyBlockPermitted && !m.HasTxs() {
if !emptyBlockPermitted && m.unissuedTxs.Len() == 0 {
return
}

Expand Down
77 changes: 41 additions & 36 deletions vms/platformvm/txs/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package mempool

import (
"errors"
"math"
"testing"
"time"

Expand All @@ -14,6 +13,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/crypto/secp256k1"
"github.com/ava-labs/avalanchego/vms/components/avax"
"github.com/ava-labs/avalanchego/vms/platformvm/txs"
Expand Down Expand Up @@ -58,9 +58,6 @@ func TestDecisionTxsInMempool(t *testing.T) {
decisionTxs, err := createTestDecisionTxs(2)
require.NoError(err)

// txs must not already there before we start
require.False(mpool.HasTxs())

for _, tx := range decisionTxs {
// tx not already there
require.False(mpool.Has(tx.ID()))
Expand All @@ -75,20 +72,6 @@ func TestDecisionTxsInMempool(t *testing.T) {
require.NotNil(retrieved)
require.Equal(tx, retrieved)

// we can peek it
peeked := mpool.PeekTxs(math.MaxInt)

// tx will be among those peeked,
// in NO PARTICULAR ORDER
found := false
for _, pk := range peeked {
if pk.ID() == tx.ID() {
found = true
break
}
}
require.True(found)

// once removed it cannot be there
mpool.Remove([]*txs.Tx{tx})

Expand All @@ -113,7 +96,7 @@ func TestProposalTxsInMempool(t *testing.T) {
proposalTxs, err := createTestProposalTxs(2)
require.NoError(err)

for i, tx := range proposalTxs {
for _, tx := range proposalTxs {
require.False(mpool.Has(tx.ID()))

// we can insert
Expand All @@ -126,23 +109,6 @@ func TestProposalTxsInMempool(t *testing.T) {
require.NotNil(retrieved)
require.Equal(tx, retrieved)

{
// we can peek it
peeked := mpool.PeekTxs(math.MaxInt)
require.Len(peeked, i+1)

// tx will be among those peeked,
// in NO PARTICULAR ORDER
found := false
for _, pk := range peeked {
if pk.ID() == tx.ID() {
found = true
break
}
}
require.True(found)
}

// once removed it cannot be there
mpool.Remove([]*txs.Tx{tx})

Expand Down Expand Up @@ -255,3 +221,42 @@ func TestDropExpiredStakerTxs(t *testing.T) {
minStartTime := time.Unix(9, 0)
require.Len(mempool.DropExpiredStakerTxs(minStartTime), 1)
}

func TestPeekTxs(t *testing.T) {
require := require.New(t)

registerer := prometheus.NewRegistry()
toEngine := make(chan common.Message, 100)
mempool, err := New("mempool", registerer, toEngine)
require.NoError(err)

testDecisionTxs, err := createTestDecisionTxs(1)
require.NoError(err)
testProposalTxs, err := createTestProposalTxs(1)
require.NoError(err)

tx, exists := mempool.Peek()
require.False(exists)
require.Nil(tx)

require.NoError(mempool.Add(testDecisionTxs[0]))
require.NoError(mempool.Add(testProposalTxs[0]))

tx, exists = mempool.Peek()
require.True(exists)
require.Equal(tx, testDecisionTxs[0])
require.NotEqual(tx, testProposalTxs[0])

mempool.Remove([]*txs.Tx{testDecisionTxs[0]})

tx, exists = mempool.Peek()
require.True(exists)
require.NotEqual(tx, testDecisionTxs[0])
require.Equal(tx, testProposalTxs[0])

mempool.Remove([]*txs.Tx{testProposalTxs[0]})

tx, exists = mempool.Peek()
require.False(exists)
require.Nil(tx)
}
31 changes: 9 additions & 22 deletions vms/platformvm/txs/mempool/mock_mempool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading