Skip to content

Commit

Permalink
Remove PeekTxs
Browse files Browse the repository at this point in the history
  • Loading branch information
dhrubabasu committed Dec 11, 2023
1 parent 4b6e354 commit b741693
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 98 deletions.
34 changes: 28 additions & 6 deletions vms/platformvm/block/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,6 @@ func (b *builder) BuildBlock(context.Context) (snowman.Block, error) {
return nil, err
}

// Remove selected txs from mempool now that we are returning the block to
// the consensus engine.
txs := statelessBlk.Txs()
b.Mempool.Remove(txs)
return b.blkManager.NewBlock(statelessBlk), nil
}

Expand Down Expand Up @@ -281,8 +277,34 @@ func buildBlock(
)
}

var (
blockTxs []*txs.Tx
remainingSize = targetBlockSize
)

for {
builder.txExecutorBackend.Ctx.Log.Error("[builder.buildBlock] mempool: peeking tx",
zap.Int("remainingSize", remainingSize),
zap.Error(err),
)
tx := builder.Mempool.Peek()
if tx == nil || len(tx.Bytes()) > remainingSize {
break
}
builder.Mempool.Remove([]*txs.Tx{tx})

txID := tx.ID()
builder.txExecutorBackend.Ctx.Log.Error("[builder.buildBlock] mempool: removed tx",
zap.Stringer("txID", txID),
zap.Error(err),
)

remainingSize -= len(tx.Bytes())
blockTxs = append(blockTxs, tx)
}

// If there is no reason to build a block, don't.
if !builder.Mempool.HasTxs() && !forceAdvanceTime {
if len(blockTxs) == 0 && !forceAdvanceTime {
builder.txExecutorBackend.Ctx.Log.Debug("no pending txs to issue into a block")
return nil, ErrNoPendingBlocks
}
Expand All @@ -292,7 +314,7 @@ func buildBlock(
timestamp,
parentID,
height,
builder.Mempool.PeekTxs(targetBlockSize),
blockTxs,
)
}

Expand Down
34 changes: 24 additions & 10 deletions vms/platformvm/block/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,10 +414,15 @@ func TestBuildBlock(t *testing.T) {
builderF: func(ctrl *gomock.Controller) *builder {
mempool := mempool.NewMockMempool(ctrl)

// There are txs.
mempool.EXPECT().DropExpiredStakerTxs(gomock.Any()).Return([]ids.ID{})
mempool.EXPECT().HasTxs().Return(true)
mempool.EXPECT().PeekTxs(targetBlockSize).Return([]*txs.Tx{tx})

gomock.InOrder(
mempool.EXPECT().Peek().Return(tx),
mempool.EXPECT().Remove([]*txs.Tx{tx}),
// Second loop iteration
mempool.EXPECT().Peek().Return(nil),
)

return &builder{
Mempool: mempool,
}
Expand Down Expand Up @@ -463,7 +468,7 @@ func TestBuildBlock(t *testing.T) {

// There are no txs.
mempool.EXPECT().DropExpiredStakerTxs(gomock.Any()).Return([]ids.ID{})
mempool.EXPECT().HasTxs().Return(false)
mempool.EXPECT().Peek().Return(nil)

clk := &mockable.Clock{}
clk.Set(now)
Expand Down Expand Up @@ -511,8 +516,7 @@ func TestBuildBlock(t *testing.T) {

// There are no txs.
mempool.EXPECT().DropExpiredStakerTxs(gomock.Any()).Return([]ids.ID{})
mempool.EXPECT().HasTxs().Return(false)
mempool.EXPECT().PeekTxs(targetBlockSize).Return(nil)
mempool.EXPECT().Peek().Return(nil)

clk := &mockable.Clock{}
clk.Set(now)
Expand Down Expand Up @@ -566,8 +570,13 @@ func TestBuildBlock(t *testing.T) {

// There is a tx.
mempool.EXPECT().DropExpiredStakerTxs(gomock.Any()).Return([]ids.ID{})
mempool.EXPECT().HasTxs().Return(true)
mempool.EXPECT().PeekTxs(targetBlockSize).Return([]*txs.Tx{tx})

gomock.InOrder(
mempool.EXPECT().Peek().Return(tx),
mempool.EXPECT().Remove([]*txs.Tx{tx}),
// Second loop iteration
mempool.EXPECT().Peek().Return(nil),
)

clk := &mockable.Clock{}
clk.Set(now)
Expand Down Expand Up @@ -620,8 +629,13 @@ func TestBuildBlock(t *testing.T) {
// There are no decision txs
// There is a staker tx.
mempool.EXPECT().DropExpiredStakerTxs(gomock.Any()).Return([]ids.ID{})
mempool.EXPECT().HasTxs().Return(true)
mempool.EXPECT().PeekTxs(targetBlockSize).Return([]*txs.Tx{tx})

gomock.InOrder(
mempool.EXPECT().Peek().Return(tx),
mempool.EXPECT().Remove([]*txs.Tx{tx}),
// Second loop iteration
mempool.EXPECT().Peek().Return(nil),
)

clk := &mockable.Clock{}
clk.Set(now)
Expand Down
31 changes: 6 additions & 25 deletions vms/platformvm/txs/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,8 @@ type Mempool interface {
Get(txID ids.ID) *txs.Tx
Remove(txs []*txs.Tx)

// Following Banff activation, all mempool transactions,
// (both decision and staker) are included into Standard blocks.
// HasTxs allow to check for availability of any mempool transaction.
HasTxs() bool
// PeekTxs returns the next txs for Banff blocks
// up to maxTxsBytes without removing them from the mempool.
PeekTxs(maxTxsBytes int) []*txs.Tx
// Peek returns the oldest tx in the mempool.
Peek() *txs.Tx

// Drops all [txs.Staker] transactions whose [StartTime] is before
// [minStartTime] from [mempool]. The dropped tx ids are returned.
Expand Down Expand Up @@ -210,23 +205,9 @@ func (m *mempool) Remove(txsToRemove []*txs.Tx) {
}
}

func (m *mempool) HasTxs() bool {
return m.unissuedTxs.Len() > 0
}

func (m *mempool) PeekTxs(maxTxsBytes int) []*txs.Tx {
var txs []*txs.Tx
txIter := m.unissuedTxs.NewIterator()
size := 0
for txIter.Next() {
tx := txIter.Value()
size += len(tx.Bytes())
if size > maxTxsBytes {
return txs
}
txs = append(txs, tx)
}
return txs
func (m *mempool) Peek() *txs.Tx {
_, tx, _ := m.unissuedTxs.Oldest()
return tx
}

func (m *mempool) MarkDropped(txID ids.ID, reason error) {
Expand All @@ -239,7 +220,7 @@ func (m *mempool) GetDropReason(txID ids.ID) error {
}

func (m *mempool) RequestBuildBlock(emptyBlockPermitted bool) {
if !emptyBlockPermitted && !m.HasTxs() {
if !emptyBlockPermitted && m.unissuedTxs.Len() == 0 {
return
}

Expand Down
69 changes: 33 additions & 36 deletions vms/platformvm/txs/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package mempool

import (
"errors"
"math"
"testing"
"time"

Expand All @@ -14,6 +13,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/crypto/secp256k1"
"github.com/ava-labs/avalanchego/vms/components/avax"
"github.com/ava-labs/avalanchego/vms/platformvm/txs"
Expand Down Expand Up @@ -58,9 +58,6 @@ func TestDecisionTxsInMempool(t *testing.T) {
decisionTxs, err := createTestDecisionTxs(2)
require.NoError(err)

// txs must not already there before we start
require.False(mpool.HasTxs())

for _, tx := range decisionTxs {
// tx not already there
require.False(mpool.Has(tx.ID()))
Expand All @@ -75,20 +72,6 @@ func TestDecisionTxsInMempool(t *testing.T) {
require.NotNil(retrieved)
require.Equal(tx, retrieved)

// we can peek it
peeked := mpool.PeekTxs(math.MaxInt)

// tx will be among those peeked,
// in NO PARTICULAR ORDER
found := false
for _, pk := range peeked {
if pk.ID() == tx.ID() {
found = true
break
}
}
require.True(found)

// once removed it cannot be there
mpool.Remove([]*txs.Tx{tx})

Expand All @@ -113,7 +96,7 @@ func TestProposalTxsInMempool(t *testing.T) {
proposalTxs, err := createTestProposalTxs(2)
require.NoError(err)

for i, tx := range proposalTxs {
for _, tx := range proposalTxs {
require.False(mpool.Has(tx.ID()))

// we can insert
Expand All @@ -126,23 +109,6 @@ func TestProposalTxsInMempool(t *testing.T) {
require.NotNil(retrieved)
require.Equal(tx, retrieved)

{
// we can peek it
peeked := mpool.PeekTxs(math.MaxInt)
require.Len(peeked, i+1)

// tx will be among those peeked,
// in NO PARTICULAR ORDER
found := false
for _, pk := range peeked {
if pk.ID() == tx.ID() {
found = true
break
}
}
require.True(found)
}

// once removed it cannot be there
mpool.Remove([]*txs.Tx{tx})

Expand Down Expand Up @@ -255,3 +221,34 @@ func TestDropExpiredStakerTxs(t *testing.T) {
minStartTime := time.Unix(9, 0)
require.Len(mempool.DropExpiredStakerTxs(minStartTime), 1)
}

func TestPeekTxs(t *testing.T) {
require := require.New(t)

registerer := prometheus.NewRegistry()
toEngine := make(chan common.Message, 100)
mempool, err := New("mempool", registerer, toEngine)
require.NoError(err)

testDecisionTxs, err := createTestDecisionTxs(1)
require.NoError(err)
testProposalTxs, err := createTestProposalTxs(1)
require.NoError(err)

require.Nil(mempool.Peek())

require.NoError(mempool.Add(testDecisionTxs[0]))
require.NoError(mempool.Add(testProposalTxs[0]))

require.Equal(mempool.Peek(), testDecisionTxs[0])
require.NotEqual(mempool.Peek(), testProposalTxs[0])

mempool.Remove([]*txs.Tx{testDecisionTxs[0]})

require.NotEqual(mempool.Peek(), testDecisionTxs[0])
require.Equal(mempool.Peek(), testProposalTxs[0])

mempool.Remove([]*txs.Tx{testProposalTxs[0]})

require.Nil(mempool.Peek())
}
28 changes: 7 additions & 21 deletions vms/platformvm/txs/mempool/mock_mempool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b741693

Please sign in to comment.