Skip to content

Commit

Permalink
vms/platformvm: Verify txs before building a block (#2359)
Browse files Browse the repository at this point in the history
Co-authored-by: Stephen Buttolph <[email protected]>
  • Loading branch information
dhrubabasu and StephenButtolph authored Dec 12, 2023
1 parent 4be744e commit 0b2b109
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 115 deletions.
147 changes: 114 additions & 33 deletions vms/platformvm/block/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ import (

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/utils/timer/mockable"
"github.com/ava-labs/avalanchego/utils/units"
"github.com/ava-labs/avalanchego/vms/platformvm/block"
"github.com/ava-labs/avalanchego/vms/platformvm/state"
"github.com/ava-labs/avalanchego/vms/platformvm/status"
"github.com/ava-labs/avalanchego/vms/platformvm/txs"
"github.com/ava-labs/avalanchego/vms/platformvm/txs/mempool"

Expand Down Expand Up @@ -192,16 +194,9 @@ func (b *builder) ShutdownBlockTimer() {
// This method removes the transactions from the returned
// blocks from the mempool.
func (b *builder) BuildBlock(context.Context) (snowman.Block, error) {
defer func() {
// If we need to advance the chain's timestamp in a standard block, but
// we build an invalid block, then we need to re-trigger block building.
//
// TODO: Remove once we are guaranteed to build a valid block.
b.ResetBlockTimer()
// If there are still transactions in the mempool, then we need to
// re-trigger block building.
b.Mempool.RequestBuildBlock(false /*=emptyBlockPermitted*/)
}()
// If there are still transactions in the mempool, then we need to
// re-trigger block building.
defer b.Mempool.RequestBuildBlock(false /*=emptyBlockPermitted*/)

b.txExecutorBackend.Ctx.Log.Debug("starting to attempt to build a block")

Expand Down Expand Up @@ -259,38 +254,41 @@ func buildBlock(
return nil, fmt.Errorf("could not build tx to reward staker: %w", err)
}

var blockTxs []*txs.Tx
// TODO: Cleanup post-Durango
if builder.txExecutorBackend.Config.IsDurangoActivated(timestamp) {
blockTxs, err = packBlockTxs(
parentID,
parentState,
builder.Mempool,
builder.txExecutorBackend,
builder.blkManager,
timestamp,
)
if err != nil {
return nil, fmt.Errorf("failed to pack block txs: %w", err)
}
}

return block.NewBanffProposalBlock(
timestamp,
parentID,
height,
rewardValidatorTx,
[]*txs.Tx{}, // TODO: Populate with StandardBlock txs
blockTxs,
)
}

// Clean out the mempool's transactions with invalid timestamps.
droppedStakerTxIDs := builder.Mempool.DropExpiredStakerTxs(timestamp.Add(txexecutor.SyncBound))
for _, txID := range droppedStakerTxIDs {
builder.txExecutorBackend.Ctx.Log.Debug("dropping tx",
zap.Stringer("txID", txID),
zap.Error(err),
)
}

var (
blockTxs []*txs.Tx
remainingSize = targetBlockSize
blockTxs, err := packBlockTxs(
parentID,
parentState,
builder.Mempool,
builder.txExecutorBackend,
builder.blkManager,
timestamp,
)

for {
tx, exists := builder.Mempool.Peek()
if !exists || len(tx.Bytes()) > remainingSize {
break
}
builder.Mempool.Remove([]*txs.Tx{tx})

remainingSize -= len(tx.Bytes())
blockTxs = append(blockTxs, tx)
if err != nil {
return nil, fmt.Errorf("failed to pack block txs: %w", err)
}

// If there is no reason to build a block, don't.
Expand All @@ -308,6 +306,89 @@ func buildBlock(
)
}

func packBlockTxs(
parentID ids.ID,
parentState state.Chain,
mempool mempool.Mempool,
backend *txexecutor.Backend,
manager blockexecutor.Manager,
timestamp time.Time,
) ([]*txs.Tx, error) {
stateDiff, err := state.NewDiffOn(parentState)
if err != nil {
return nil, err
}

changes, err := txexecutor.AdvanceTimeTo(backend, stateDiff, timestamp)
if err != nil {
return nil, err
}
changes.Apply(stateDiff)
stateDiff.SetTimestamp(timestamp)

var (
blockTxs []*txs.Tx
inputs set.Set[ids.ID]
remainingSize = targetBlockSize
)

for {
tx, exists := mempool.Peek()
if !exists {
break
}
txSize := len(tx.Bytes())
if txSize > remainingSize {
break
}
mempool.Remove([]*txs.Tx{tx})

// Invariant: [tx] has already been syntactically verified.

txDiff, err := state.NewDiffOn(stateDiff)
if err != nil {
return nil, err
}

executor := &txexecutor.StandardTxExecutor{
Backend: backend,
State: txDiff,
Tx: tx,
}

err = tx.Unsigned.Visit(executor)
if err != nil {
txID := tx.ID()
mempool.MarkDropped(txID, err)
continue
}

if inputs.Overlaps(executor.Inputs) {
txID := tx.ID()
mempool.MarkDropped(txID, blockexecutor.ErrConflictingBlockTxs)
continue
}
err = manager.VerifyUniqueInputs(parentID, executor.Inputs)
if err != nil {
txID := tx.ID()
mempool.MarkDropped(txID, err)
continue
}
inputs.Union(executor.Inputs)

txDiff.AddTx(tx, status.Committed)
err = txDiff.Apply(stateDiff)
if err != nil {
return nil, err
}

remainingSize -= txSize
blockTxs = append(blockTxs, tx)
}

return blockTxs, nil
}

// getNextStakerToReward returns the next staker txID to remove from the staking
// set with a RewardValidatorTx rather than an AdvanceTimeTx. [chainTimestamp]
// is the timestamp of the chain at the time this validator would be getting
Expand Down
96 changes: 96 additions & 0 deletions vms/platformvm/block/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,102 @@ func TestBuildBlockForceAdvanceTime(t *testing.T) {
require.Equal(nextTime.Unix(), standardBlk.Timestamp().Unix())
}

func TestBuildBlockDropExpiredStakerTxs(t *testing.T) {
require := require.New(t)

env := newEnvironment(t)
env.ctx.Lock.Lock()
defer func() {
require.NoError(shutdownEnvironment(env))
env.ctx.Lock.Unlock()
}()

var (
now = env.backend.Clk.Time()
defaultValidatorStake = 100 * units.MilliAvax

// Add a validator with StartTime in the future within [MaxFutureStartTime]
validatorStartTime = now.Add(txexecutor.MaxFutureStartTime - 1*time.Second)
validatorEndTime = validatorStartTime.Add(360 * 24 * time.Hour)
)

tx1, err := env.txBuilder.NewAddValidatorTx(
defaultValidatorStake,
uint64(validatorStartTime.Unix()),
uint64(validatorEndTime.Unix()),
ids.GenerateTestNodeID(),
preFundedKeys[0].PublicKey().Address(),
reward.PercentDenominator,
[]*secp256k1.PrivateKey{preFundedKeys[0]},
preFundedKeys[0].PublicKey().Address(),
)
require.NoError(err)
require.NoError(env.mempool.Add(tx1))
tx1ID := tx1.ID()
require.True(env.mempool.Has(tx1ID))

// Add a validator with StartTime before current chain time
validator2StartTime := now.Add(-5 * time.Second)
validator2EndTime := validator2StartTime.Add(360 * 24 * time.Hour)

tx2, err := env.txBuilder.NewAddValidatorTx(
defaultValidatorStake,
uint64(validator2StartTime.Unix()),
uint64(validator2EndTime.Unix()),
ids.GenerateTestNodeID(),
preFundedKeys[1].PublicKey().Address(),
reward.PercentDenominator,
[]*secp256k1.PrivateKey{preFundedKeys[1]},
preFundedKeys[1].PublicKey().Address(),
)
require.NoError(err)
require.NoError(env.mempool.Add(tx2))
tx2ID := tx2.ID()
require.True(env.mempool.Has(tx2ID))

// Add a validator with StartTime in the future past [MaxFutureStartTime]
validator3StartTime := now.Add(txexecutor.MaxFutureStartTime + 5*time.Second)
validator3EndTime := validator2StartTime.Add(360 * 24 * time.Hour)

tx3, err := env.txBuilder.NewAddValidatorTx(
defaultValidatorStake,
uint64(validator3StartTime.Unix()),
uint64(validator3EndTime.Unix()),
ids.GenerateTestNodeID(),
preFundedKeys[2].PublicKey().Address(),
reward.PercentDenominator,
[]*secp256k1.PrivateKey{preFundedKeys[2]},
preFundedKeys[2].PublicKey().Address(),
)
require.NoError(err)
require.NoError(env.mempool.Add(tx3))
tx3ID := tx3.ID()
require.True(env.mempool.Has(tx3ID))

// Only tx1 should be in a built block
blkIntf, err := env.Builder.BuildBlock(context.Background())
require.NoError(err)

require.IsType(&blockexecutor.Block{}, blkIntf)
blk := blkIntf.(*blockexecutor.Block)
require.Len(blk.Txs(), 1)
require.Equal(tx1ID, blk.Txs()[0].ID())

// Mempool should have none of the txs
require.False(env.mempool.Has(tx1ID))
require.False(env.mempool.Has(tx2ID))
require.False(env.mempool.Has(tx3ID))

// Only tx2 and tx3 should be dropped
require.NoError(env.mempool.GetDropReason(tx1ID))

tx2DropReason := env.mempool.GetDropReason(tx2ID)
require.ErrorIs(tx2DropReason, txexecutor.ErrTimestampNotBeforeStartTime)

tx3DropReason := env.mempool.GetDropReason(tx3ID)
require.ErrorIs(tx3DropReason, txexecutor.ErrFutureStakeTime)
}

func TestPreviouslyDroppedTxsCanBeReAddedToMempool(t *testing.T) {
require := require.New(t)

Expand Down
5 changes: 3 additions & 2 deletions vms/platformvm/block/executor/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
var (
_ block.Visitor = (*verifier)(nil)

ErrConflictingBlockTxs = errors.New("block contains conflicting transactions")

errApricotBlockIssuedAfterFork = errors.New("apricot block issued after fork")
errBanffProposalBlockWithMultipleTransactions = errors.New("BanffProposalBlock contains multiple transactions")
errBanffStandardBlockWithoutChanges = errors.New("BanffStandardBlock performs no state changes")
errIncorrectBlockHeight = errors.New("incorrect block height")
errChildBlockEarlierThanParent = errors.New("proposed timestamp before current chain time")
errConflictingBatchTxs = errors.New("block contains conflicting transactions")
errOptionBlockTimestampNotMatchingParent = errors.New("option block proposed timestamp not matching parent block one")
)

Expand Down Expand Up @@ -468,7 +469,7 @@ func (v *verifier) processStandardTxs(txs []*txs.Tx, state state.Diff, parentID
}
// ensure it doesn't overlap with current input batch
if inputs.Overlaps(txExecutor.Inputs) {
return nil, nil, nil, errConflictingBatchTxs
return nil, nil, nil, ErrConflictingBlockTxs
}
// Add UTXOs to batch
inputs.Union(txExecutor.Inputs)
Expand Down
42 changes: 0 additions & 42 deletions vms/platformvm/txs/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package mempool
import (
"errors"
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -53,12 +52,6 @@ type Mempool interface {
// Peek returns the oldest tx in the mempool.
Peek() (tx *txs.Tx, exists bool)

// Drops all [txs.Staker] transactions whose [StartTime] is before
// [minStartTime] from [mempool]. The dropped tx ids are returned.
//
// TODO: Remove once [StartTime] field is ignored in staker txs
DropExpiredStakerTxs(minStartTime time.Time) []ids.ID

// RequestBuildBlock notifies the consensus engine that a block should be
// built. If [emptyBlockPermitted] is true, the notification will be sent
// regardless of whether there are no transactions in the mempool. If not,
Expand Down Expand Up @@ -229,38 +222,3 @@ func (m *mempool) RequestBuildBlock(emptyBlockPermitted bool) {
default:
}
}

// Drops all [txs.Staker] transactions whose [StartTime] is before
// [minStartTime] from [mempool]. The dropped tx ids are returned.
//
// TODO: Remove once [StartTime] field is ignored in staker txs
func (m *mempool) DropExpiredStakerTxs(minStartTime time.Time) []ids.ID {
var droppedTxIDs []ids.ID

txIter := m.unissuedTxs.NewIterator()
for txIter.Next() {
tx := txIter.Value()
stakerTx, ok := tx.Unsigned.(txs.ScheduledStaker)
if !ok {
continue
}

startTime := stakerTx.StartTime()
if !startTime.Before(minStartTime) {
continue
}

txID := tx.ID()
err := fmt.Errorf(
"synchrony bound (%s) is later than staker start time (%s)",
minStartTime,
startTime,
)

m.Remove([]*txs.Tx{tx})
m.MarkDropped(txID, err) // cache tx as dropped
droppedTxIDs = append(droppedTxIDs, txID)
}

return droppedTxIDs
}
Loading

0 comments on commit 0b2b109

Please sign in to comment.