Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export snowman bootstrapper #2331

Merged
merged 3 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,8 @@ func (m *manager) createAvalancheChain(
Blocked: blockBlocker,
VM: vmWrappingProposerVM,
}
snowmanBootstrapper, err := smbootstrap.New(
var snowmanBootstrapper common.BootstrapableEngine
snowmanBootstrapper, err = smbootstrap.New(
bootstrapCfg,
snowmanEngine.Start,
)
Expand Down Expand Up @@ -1249,7 +1250,8 @@ func (m *manager) createSnowmanChain(
VM: vm,
Bootstrapped: bootstrapFunc,
}
bootstrapper, err := smbootstrap.New(
var bootstrapper common.BootstrapableEngine
bootstrapper, err = smbootstrap.New(
bootstrapCfg,
engine.Start,
)
Expand Down
44 changes: 22 additions & 22 deletions snow/engine/snowman/bootstrap/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ import (
const bootstrappingDelay = 10 * time.Second

var (
_ common.BootstrapableEngine = (*bootstrapper)(nil)
_ common.BootstrapableEngine = (*Bootstrapper)(nil)

errUnexpectedTimeout = errors.New("unexpected timeout fired")
)

// Invariant: The VM is not guaranteed to be initialized until Start has been
// called, so it must be guaranteed the VM is not used until after Start.
type bootstrapper struct {
type Bootstrapper struct {
Config

// list of NoOpsHandler for messages dropped by bootstrapper
Expand Down Expand Up @@ -83,13 +83,13 @@ type bootstrapper struct {
bootstrappedOnce sync.Once
}

func New(config Config, onFinished func(ctx context.Context, lastReqID uint32) error) (common.BootstrapableEngine, error) {
func New(config Config, onFinished func(ctx context.Context, lastReqID uint32) error) (*Bootstrapper, error) {
metrics, err := newMetrics("bs", config.Ctx.Registerer)
if err != nil {
return nil, err
}

b := &bootstrapper{
b := &Bootstrapper{
Config: config,
metrics: metrics,
StateSummaryFrontierHandler: common.NewNoOpStateSummaryFrontierHandler(config.Ctx.Log),
Expand All @@ -111,11 +111,11 @@ func New(config Config, onFinished func(ctx context.Context, lastReqID uint32) e
return b, nil
}

func (b *bootstrapper) Context() *snow.ConsensusContext {
func (b *Bootstrapper) Context() *snow.ConsensusContext {
return b.Ctx
}

func (b *bootstrapper) Start(ctx context.Context, startReqID uint32) error {
func (b *Bootstrapper) Start(ctx context.Context, startReqID uint32) error {
b.Ctx.Log.Info("starting bootstrapper")

b.Ctx.State.Set(snow.EngineState{
Expand Down Expand Up @@ -159,7 +159,7 @@ func (b *bootstrapper) Start(ctx context.Context, startReqID uint32) error {

// Ancestors handles the receipt of multiple containers. Should be received in
// response to a GetAncestors message to [nodeID] with request ID [requestID]
func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, blks [][]byte) error {
func (b *Bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, blks [][]byte) error {
// Make sure this is in response to a request we made
wantedBlkID, ok := b.OutstandingRequests.Remove(nodeID, requestID)
if !ok { // this message isn't in response to a request we made
Expand Down Expand Up @@ -229,7 +229,7 @@ func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, request
return b.process(ctx, requestedBlock, blockSet)
}

func (b *bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
func (b *Bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
blkID, ok := b.OutstandingRequests.Remove(nodeID, requestID)
if !ok {
b.Ctx.Log.Debug("unexpectedly called GetAncestorsFailed",
Expand All @@ -246,7 +246,7 @@ func (b *bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID
return b.fetch(ctx, blkID)
}

func (b *bootstrapper) Connected(ctx context.Context, nodeID ids.NodeID, nodeVersion *version.Application) error {
func (b *Bootstrapper) Connected(ctx context.Context, nodeID ids.NodeID, nodeVersion *version.Application) error {
if err := b.VM.Connected(ctx, nodeID, nodeVersion); err != nil {
return err
}
Expand All @@ -267,7 +267,7 @@ func (b *bootstrapper) Connected(ctx context.Context, nodeID ids.NodeID, nodeVer
return b.Startup(ctx)
}

func (b *bootstrapper) Disconnected(ctx context.Context, nodeID ids.NodeID) error {
func (b *Bootstrapper) Disconnected(ctx context.Context, nodeID ids.NodeID) error {
if err := b.VM.Disconnected(ctx, nodeID); err != nil {
return err
}
Expand All @@ -280,7 +280,7 @@ func (b *bootstrapper) Disconnected(ctx context.Context, nodeID ids.NodeID) erro
return nil
}

func (b *bootstrapper) Timeout(ctx context.Context) error {
func (b *Bootstrapper) Timeout(ctx context.Context) error {
if !b.awaitingTimeout {
return errUnexpectedTimeout
}
Expand All @@ -293,11 +293,11 @@ func (b *bootstrapper) Timeout(ctx context.Context) error {
return b.OnFinished(ctx, b.Config.SharedCfg.RequestID)
}

func (*bootstrapper) Gossip(context.Context) error {
func (*Bootstrapper) Gossip(context.Context) error {
return nil
}

func (b *bootstrapper) Shutdown(ctx context.Context) error {
func (b *Bootstrapper) Shutdown(ctx context.Context) error {
b.Ctx.Log.Info("shutting down bootstrapper")

b.Ctx.Lock.Lock()
Expand All @@ -306,7 +306,7 @@ func (b *bootstrapper) Shutdown(ctx context.Context) error {
return b.VM.Shutdown(ctx)
}

func (b *bootstrapper) Notify(_ context.Context, msg common.Message) error {
func (b *Bootstrapper) Notify(_ context.Context, msg common.Message) error {
if msg != common.StateSyncDone {
b.Ctx.Log.Warn("received an unexpected message from the VM",
zap.Stringer("msg", msg),
Expand All @@ -318,7 +318,7 @@ func (b *bootstrapper) Notify(_ context.Context, msg common.Message) error {
return nil
}

func (b *bootstrapper) HealthCheck(ctx context.Context) (interface{}, error) {
func (b *Bootstrapper) HealthCheck(ctx context.Context) (interface{}, error) {
b.Ctx.Lock.Lock()
defer b.Ctx.Lock.Unlock()

Expand All @@ -330,11 +330,11 @@ func (b *bootstrapper) HealthCheck(ctx context.Context) (interface{}, error) {
return intf, vmErr
}

func (b *bootstrapper) GetVM() common.VM {
func (b *Bootstrapper) GetVM() common.VM {
return b.VM
}

func (b *bootstrapper) ForceAccepted(ctx context.Context, acceptedContainerIDs []ids.ID) error {
func (b *Bootstrapper) ForceAccepted(ctx context.Context, acceptedContainerIDs []ids.ID) error {
pendingContainerIDs := b.Blocked.MissingIDs()

// Initialize the fetch from set to the currently preferred peers
Expand Down Expand Up @@ -377,7 +377,7 @@ func (b *bootstrapper) ForceAccepted(ctx context.Context, acceptedContainerIDs [
}

// Get block [blkID] and its ancestors from a validator
func (b *bootstrapper) fetch(ctx context.Context, blkID ids.ID) error {
func (b *Bootstrapper) fetch(ctx context.Context, blkID ids.ID) error {
// Make sure we haven't already requested this block
if b.OutstandingRequests.Contains(blkID) {
return nil
Expand Down Expand Up @@ -406,7 +406,7 @@ func (b *bootstrapper) fetch(ctx context.Context, blkID ids.ID) error {
// markUnavailable removes [nodeID] from the set of peers used to fetch
// ancestors. If the set becomes empty, it is reset to the currently preferred
// peers so bootstrapping can continue.
func (b *bootstrapper) markUnavailable(nodeID ids.NodeID) {
func (b *Bootstrapper) markUnavailable(nodeID ids.NodeID) {
b.fetchFrom.Remove(nodeID)

// if [fetchFrom] has become empty, reset it to the currently preferred
Expand All @@ -416,7 +416,7 @@ func (b *bootstrapper) markUnavailable(nodeID ids.NodeID) {
}
}

func (b *bootstrapper) Clear(context.Context) error {
func (b *Bootstrapper) Clear(context.Context) error {
b.Ctx.Lock.Lock()
defer b.Ctx.Lock.Unlock()

Expand All @@ -436,7 +436,7 @@ func (b *bootstrapper) Clear(context.Context) error {
//
// If [blk]'s height is <= the last accepted height, then it will be removed
// from the missingIDs set.
func (b *bootstrapper) process(ctx context.Context, blk snowman.Block, processingBlocks map[ids.ID]snowman.Block) error {
func (b *Bootstrapper) process(ctx context.Context, blk snowman.Block, processingBlocks map[ids.ID]snowman.Block) error {
for {
blkID := blk.ID()
if b.Halted() {
Expand Down Expand Up @@ -555,7 +555,7 @@ func (b *bootstrapper) process(ctx context.Context, blk snowman.Block, processin

// checkFinish repeatedly executes pending transactions and requests new frontier vertices until there aren't any new ones
// after which it finishes the bootstrap process
func (b *bootstrapper) checkFinish(ctx context.Context) error {
func (b *Bootstrapper) checkFinish(ctx context.Context) error {
if numPending := b.Blocked.NumMissingIDs(); numPending != 0 {
return nil
}
Expand Down
18 changes: 6 additions & 12 deletions snow/engine/snowman/bootstrap/bootstrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,10 +697,10 @@ func TestBootstrapperEmptyResponse(t *testing.T) {

// add another two validators to the fetch set to test behavior on empty response
newPeerID := ids.GenerateTestNodeID()
bs.(*bootstrapper).fetchFrom.Add(newPeerID)
bs.fetchFrom.Add(newPeerID)

newPeerID = ids.GenerateTestNodeID()
bs.(*bootstrapper).fetchFrom.Add(newPeerID)
bs.fetchFrom.Add(newPeerID)

require.NoError(bs.Ancestors(context.Background(), peerID, requestID, [][]byte{blkBytes2}))
require.Equal(blkID1, requestedBlock)
Expand All @@ -720,7 +720,7 @@ func TestBootstrapperEmptyResponse(t *testing.T) {
require.Equal(choices.Accepted, blk2.Status())

// check peerToBlacklist was removed from the fetch set
require.NotContains(bs.(*bootstrapper).fetchFrom, peerToBlacklist)
require.NotContains(bs.fetchFrom, peerToBlacklist)
}

// There are multiple needed blocks and Ancestors returns all at once
Expand Down Expand Up @@ -1106,7 +1106,7 @@ func TestRestartBootstrapping(t *testing.T) {
return nil, errUnknownBlock
}

bsIntf, err := New(
bs, err := New(
config,
func(context.Context, uint32) error {
config.Ctx.State.Set(snow.EngineState{
Expand All @@ -1117,8 +1117,6 @@ func TestRestartBootstrapping(t *testing.T) {
},
)
require.NoError(err)
require.IsType(&bootstrapper{}, bsIntf)
bs := bsIntf.(*bootstrapper)

vm.CantSetState = false
require.NoError(bs.Start(context.Background(), 0))
Expand Down Expand Up @@ -1212,7 +1210,7 @@ func TestBootstrapOldBlockAfterStateSync(t *testing.T) {
return nil, errUnknownBlock
}

bsIntf, err := New(
bs, err := New(
config,
func(context.Context, uint32) error {
config.Ctx.State.Set(snow.EngineState{
Expand All @@ -1223,8 +1221,6 @@ func TestBootstrapOldBlockAfterStateSync(t *testing.T) {
},
)
require.NoError(err)
require.IsType(&bootstrapper{}, bsIntf)
bs := bsIntf.(*bootstrapper)

vm.CantSetState = false
require.NoError(bs.Start(context.Background(), 0))
Expand Down Expand Up @@ -1283,7 +1279,7 @@ func TestBootstrapContinueAfterHalt(t *testing.T) {
return blk0.ID(), nil
}

bsIntf, err := New(
bs, err := New(
config,
func(context.Context, uint32) error {
config.Ctx.State.Set(snow.EngineState{
Expand All @@ -1294,8 +1290,6 @@ func TestBootstrapContinueAfterHalt(t *testing.T) {
},
)
require.NoError(err)
require.IsType(&bootstrapper{}, bsIntf)
bs := bsIntf.(*bootstrapper)

vm.GetBlockF = func(_ context.Context, blkID ids.ID) (snowman.Block, error) {
switch blkID {
Expand Down
Loading