Skip to content

Commit

Permalink
vms/platformvm: Remove double block building logic (#2380)
Browse files Browse the repository at this point in the history
Signed-off-by: Stephen Buttolph <[email protected]>
Co-authored-by: Stephen Buttolph <[email protected]>
Co-authored-by: aaronbuchwald <[email protected]>
  • Loading branch information
3 people authored Dec 8, 2023
1 parent c991157 commit eb570dd
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 108 deletions.
239 changes: 136 additions & 103 deletions vms/platformvm/block/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"go.uber.org/zap"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
"github.com/ava-labs/avalanchego/utils/timer"
"github.com/ava-labs/avalanchego/utils/timer/mockable"
"github.com/ava-labs/avalanchego/utils/units"
"github.com/ava-labs/avalanchego/vms/platformvm/block"
Expand All @@ -33,24 +33,31 @@ const targetBlockSize = 128 * units.KiB
var (
_ Builder = (*builder)(nil)

ErrEndOfTime = errors.New("program time is suspiciously far in the future")
ErrNoPendingBlocks = errors.New("no pending blocks")
ErrEndOfTime = errors.New("program time is suspiciously far in the future")
ErrNoPendingBlocks = errors.New("no pending blocks")
errMissingPreferredState = errors.New("missing preferred block state")
errCalculatingNextStakerTime = errors.New("failed calculating next staker time")
)

type Builder interface {
mempool.Mempool

// ResetBlockTimer schedules a timer to notify the consensus engine once
// there is a block ready to be built. If a block is ready to be built when
// this function is called, the engine will be notified directly.
// StartBlockTimer starts to issue block creation requests to advance the
// chain timestamp.
StartBlockTimer()

// ResetBlockTimer forces the block timer to recalculate when it should
// advance the chain timestamp.
ResetBlockTimer()

// BuildBlock is called on timer clock to attempt to create
// next block
BuildBlock(context.Context) (snowman.Block, error)
// ShutdownBlockTimer stops block creation requests to advance the chain
// timestamp.
//
// Invariant: Assumes the context lock is held when calling.
ShutdownBlockTimer()

// Shutdown cleanly shuts Builder down
Shutdown()
// BuildBlock can be called to attempt to create a new block
BuildBlock(context.Context) (snowman.Block, error)
}

// builder implements a simple builder to convert txs into valid blocks
Expand All @@ -61,10 +68,11 @@ type builder struct {
txExecutorBackend *txexecutor.Backend
blkManager blockexecutor.Manager

// This timer goes off when it is time for the next validator to add/leave
// the validator set. When it goes off ResetTimer() is called, potentially
// triggering creation of a new block.
timer *timer.Timer
// resetTimer is used to signal that the block builder timer should update
// when it will trigger building of a block.
resetTimer chan struct{}
closed chan struct{}
closeOnce sync.Once
}

func New(
Expand All @@ -73,17 +81,111 @@ func New(
txExecutorBackend *txexecutor.Backend,
blkManager blockexecutor.Manager,
) Builder {
builder := &builder{
return &builder{
Mempool: mempool,
txBuilder: txBuilder,
txExecutorBackend: txExecutorBackend,
blkManager: blkManager,
resetTimer: make(chan struct{}, 1),
closed: make(chan struct{}),
}
}

func (b *builder) StartBlockTimer() {
go func() {
timer := time.NewTimer(0)
defer timer.Stop()

for {
// Invariant: The [timer] is not stopped.
select {
case <-timer.C:
case <-b.resetTimer:
if !timer.Stop() {
<-timer.C
}
case <-b.closed:
return
}

// Note: Because the context lock is not held here, it is possible
// that [ShutdownBlockTimer] is called concurrently with this
// execution.
for {
duration, err := b.durationToSleep()
if err != nil {
b.txExecutorBackend.Ctx.Log.Error("block builder encountered a fatal error",
zap.Error(err),
)
return
}

if duration > 0 {
timer.Reset(duration)
break
}

// Block needs to be issued to advance time.
b.Mempool.RequestBuildBlock(true /*=emptyBlockPermitted*/)

// Invariant: ResetBlockTimer is guaranteed to be called after
// [durationToSleep] returns a value <= 0. This is because we
// are guaranteed to attempt to build block. After building a
// valid block, the chain will have its preference updated which
// may change the duration to sleep and trigger a timer reset.
select {
case <-b.resetTimer:
case <-b.closed:
return
}
}
}
}()
}

func (b *builder) durationToSleep() (time.Duration, error) {
// Grabbing the lock here enforces that this function is not called mid-way
// through modifying of the state.
b.txExecutorBackend.Ctx.Lock.Lock()
defer b.txExecutorBackend.Ctx.Lock.Unlock()

// If [ShutdownBlockTimer] was called, we want to exit the block timer
// goroutine. We check this with the context lock held because
// [ShutdownBlockTimer] is expected to only be called with the context lock
// held.
select {
case <-b.closed:
return 0, nil
default:
}

preferredID := b.blkManager.Preferred()
preferredState, ok := b.blkManager.GetState(preferredID)
if !ok {
return 0, fmt.Errorf("%w: %s", errMissingPreferredState, preferredID)
}

nextStakerChangeTime, err := txexecutor.GetNextStakerChangeTime(preferredState)
if err != nil {
return 0, fmt.Errorf("%w of %s: %w", errCalculatingNextStakerTime, preferredID, err)
}

builder.timer = timer.NewTimer(builder.setNextBuildBlockTime)
now := b.txExecutorBackend.Clk.Time()
return nextStakerChangeTime.Sub(now), nil
}

func (b *builder) ResetBlockTimer() {
// Ensure that the timer will be reset at least once.
select {
case b.resetTimer <- struct{}{}:
default:
}
}

go txExecutorBackend.Ctx.Log.RecoverAndPanic(builder.timer.Dispatch)
return builder
func (b *builder) ShutdownBlockTimer() {
b.closeOnce.Do(func() {
close(b.closed)
})
}

// BuildBlock builds a block to be added to consensus.
Expand All @@ -93,27 +195,18 @@ func (b *builder) BuildBlock(context.Context) (snowman.Block, error) {
b.Mempool.DisableAdding()
defer func() {
b.Mempool.EnableAdding()
// If we need to advance the chain's timestamp in a standard block, but
// we build an invalid block, then we need to re-trigger block building.
//
// TODO: Remove once we are guaranteed to build a valid block.
b.ResetBlockTimer()
// If there are still transactions in the mempool, then we need to
// re-trigger block building.
b.Mempool.RequestBuildBlock(false /*=emptyBlockPermitted*/)
}()

ctx := b.txExecutorBackend.Ctx
ctx.Log.Debug("starting to attempt to build a block")

statelessBlk, err := b.buildBlock()
if err != nil {
return nil, err
}

// Remove selected txs from mempool now that we are returning the block to
// the consensus engine.
txs := statelessBlk.Txs()
b.Mempool.Remove(txs)
return b.blkManager.NewBlock(statelessBlk), nil
}
b.txExecutorBackend.Ctx.Log.Debug("starting to attempt to build a block")

// Returns the block we want to build and issue.
// Only modifies state to remove expired proposal txs.
func (b *builder) buildBlock() (block.Block, error) {
// Get the block to build on top of and retrieve the new block's context.
preferredID := b.blkManager.Preferred()
preferred, err := b.blkManager.GetBlock(preferredID)
Expand All @@ -131,83 +224,23 @@ func (b *builder) buildBlock() (block.Block, error) {
return nil, fmt.Errorf("could not calculate next staker change time: %w", err)
}

return buildBlock(
statelessBlk, err := buildBlock(
b,
preferredID,
nextHeight,
timestamp,
timeWasCapped,
preferredState,
)
}

func (b *builder) Shutdown() {
// There is a potential deadlock if the timer is about to execute a timeout.
// So, the lock must be released before stopping the timer.
ctx := b.txExecutorBackend.Ctx
ctx.Lock.Unlock()
b.timer.Stop()
ctx.Lock.Lock()
}

func (b *builder) ResetBlockTimer() {
// Next time the context lock is released, we can attempt to reset the block
// timer.
b.timer.SetTimeoutIn(0)
}

func (b *builder) setNextBuildBlockTime() {
ctx := b.txExecutorBackend.Ctx

// Grabbing the lock here enforces that this function is not called mid-way
// through modifying of the state.
ctx.Lock.Lock()
defer ctx.Lock.Unlock()

if !b.txExecutorBackend.Bootstrapped.Get() {
ctx.Log.Verbo("skipping block timer reset",
zap.String("reason", "not bootstrapped"),
)
return
}

if _, err := b.buildBlock(); err == nil {
// We can build a block now
b.Mempool.RequestBuildBlock(true /*=emptyBlockPermitted*/)
return
}

// Wake up when it's time to add/remove the next validator/delegator
preferredID := b.blkManager.Preferred()
preferredState, ok := b.blkManager.GetState(preferredID)
if !ok {
// The preferred block should always be a decision block
ctx.Log.Error("couldn't get preferred block state",
zap.Stringer("preferredID", preferredID),
zap.Stringer("lastAcceptedID", b.blkManager.LastAccepted()),
)
return
}

nextStakerChangeTime, err := txexecutor.GetNextStakerChangeTime(preferredState)
if err != nil {
ctx.Log.Error("couldn't get next staker change time",
zap.Stringer("preferredID", preferredID),
zap.Stringer("lastAcceptedID", b.blkManager.LastAccepted()),
zap.Error(err),
)
return
return nil, err
}

now := b.txExecutorBackend.Clk.Time()
waitTime := nextStakerChangeTime.Sub(now)
ctx.Log.Debug("setting next scheduled event",
zap.Time("nextEventTime", nextStakerChangeTime),
zap.Duration("timeUntil", waitTime),
)

// Wake up when it's time to add/remove the next validator
b.timer.SetTimeoutIn(waitTime)
// Remove selected txs from mempool now that we are returning the block to
// the consensus engine.
txs := statelessBlk.Txs()
b.Mempool.Remove(txs)
return b.blkManager.NewBlock(statelessBlk), nil
}

// [timestamp] is min(max(now, parent timestamp), next staker change time)
Expand Down
4 changes: 4 additions & 0 deletions vms/platformvm/block/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestBlockBuilderAddLocalTx(t *testing.T) {
env.ctx.Lock.Lock()
defer func() {
require.NoError(shutdownEnvironment(env))
env.ctx.Lock.Unlock()
}()

// Create a valid transaction
Expand Down Expand Up @@ -78,6 +79,7 @@ func TestPreviouslyDroppedTxsCanBeReAddedToMempool(t *testing.T) {
env.ctx.Lock.Lock()
defer func() {
require.NoError(shutdownEnvironment(env))
env.ctx.Lock.Unlock()
}()

// Create a valid transaction
Expand Down Expand Up @@ -130,6 +132,7 @@ func TestNoErrorOnUnexpectedSetPreferenceDuringBootstrapping(t *testing.T) {
env.isBootstrapped.Set(false)
defer func() {
require.NoError(shutdownEnvironment(env))
env.ctx.Lock.Unlock()
}()

require.True(env.blkManager.SetPreference(ids.GenerateTestID())) // should not panic
Expand Down Expand Up @@ -322,6 +325,7 @@ func TestBuildBlock(t *testing.T) {
env.ctx.Lock.Lock()
defer func() {
require.NoError(t, shutdownEnvironment(env))
env.ctx.Lock.Unlock()
}()

var (
Expand Down
3 changes: 2 additions & 1 deletion vms/platformvm/block/builder/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func newEnvironment(t *testing.T) *environment {
&res.backend,
res.blkManager,
)
res.Builder.StartBlockTimer()

res.blkManager.SetPreference(genesisID)
addSubnet(t, res)
Expand Down Expand Up @@ -419,7 +420,7 @@ func buildGenesisTest(t *testing.T, ctx *snow.Context) []byte {
}

func shutdownEnvironment(env *environment) error {
env.Builder.Shutdown()
env.Builder.ShutdownBlockTimer()

if env.isBootstrapped.Get() {
validatorIDs := env.config.Validators.GetValidatorIDs(constants.PrimaryNetworkID)
Expand Down
1 change: 1 addition & 0 deletions vms/platformvm/block/builder/standard_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestAtomicTxImports(t *testing.T) {
env.ctx.Lock.Lock()
defer func() {
require.NoError(shutdownEnvironment(env))
env.ctx.Lock.Unlock()
}()

utxoID := avax.UTXOID{
Expand Down
2 changes: 1 addition & 1 deletion vms/platformvm/block/executor/rejector.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *rejector) rejectBlock(b block.Block, blockType string) error {
}
}

r.Mempool.RequestBuildBlock(false)
r.Mempool.RequestBuildBlock(false /*=emptyBlockPermitted*/)

return nil
}
2 changes: 1 addition & 1 deletion vms/platformvm/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (n *network) issueTx(tx *txs.Tx) error {
return err
}

n.mempool.RequestBuildBlock(false)
n.mempool.RequestBuildBlock(false /*=emptyBlockPermitted*/)

return nil
}
Expand Down
Loading

0 comments on commit eb570dd

Please sign in to comment.