Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add manager validator set callbacks #2950

Merged
merged 3 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ func (m *manager) createAvalancheChain(
if err != nil {
return nil, fmt.Errorf("error creating peer tracker: %w", err)
}
vdrs.RegisterCallbackListener(ctx.SubnetID, connectedValidators)
vdrs.RegisterSetCallbackListener(ctx.SubnetID, connectedValidators)

peerTracker, err := p2p.NewPeerTracker(
ctx.Log,
Expand Down Expand Up @@ -794,7 +794,7 @@ func (m *manager) createAvalancheChain(

connectedBeacons := tracker.NewPeers()
startupTracker := tracker.NewStartup(connectedBeacons, (3*bootstrapWeight+3)/4)
vdrs.RegisterCallbackListener(ctx.SubnetID, startupTracker)
vdrs.RegisterSetCallbackListener(ctx.SubnetID, startupTracker)

snowGetHandler, err := snowgetter.New(
vmWrappingProposerVM,
Expand Down Expand Up @@ -1107,7 +1107,7 @@ func (m *manager) createSnowmanChain(
if err != nil {
return nil, fmt.Errorf("error creating peer tracker: %w", err)
}
vdrs.RegisterCallbackListener(ctx.SubnetID, connectedValidators)
vdrs.RegisterSetCallbackListener(ctx.SubnetID, connectedValidators)

peerTracker, err := p2p.NewPeerTracker(
ctx.Log,
Expand Down Expand Up @@ -1139,7 +1139,7 @@ func (m *manager) createSnowmanChain(

connectedBeacons := tracker.NewPeers()
startupTracker := tracker.NewStartup(connectedBeacons, (3*bootstrapWeight+3)/4)
beacons.RegisterCallbackListener(ctx.SubnetID, startupTracker)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startupTracker)

snowGetHandler, err := snowgetter.New(
vm,
Expand Down
2 changes: 1 addition & 1 deletion network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func NewNetwork(
if err != nil {
return nil, fmt.Errorf("initializing ip tracker failed with: %w", err)
}
config.Validators.RegisterCallbackListener(constants.PrimaryNetworkID, ipTracker)
config.Validators.RegisterSetCallbackListener(constants.PrimaryNetworkID, ipTracker)

// Track all default bootstrappers to ensure their current IPs are gossiped
// like validator IPs.
Expand Down
8 changes: 6 additions & 2 deletions node/overridden_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ func (o *overriddenManager) GetMap(ids.ID) map[ids.NodeID]*validators.GetValidat
return o.manager.GetMap(o.subnetID)
}

func (o *overriddenManager) RegisterCallbackListener(_ ids.ID, listener validators.SetCallbackListener) {
o.manager.RegisterCallbackListener(o.subnetID, listener)
func (o *overriddenManager) RegisterCallbackListener(listener validators.ManagerCallbackListener) {
o.manager.RegisterCallbackListener(listener)
}

func (o *overriddenManager) RegisterSetCallbackListener(_ ids.ID, listener validators.SetCallbackListener) {
o.manager.RegisterSetCallbackListener(o.subnetID, listener)
}

func (o *overriddenManager) String() string {
Expand Down
2 changes: 1 addition & 1 deletion snow/engine/avalanche/bootstrap/bootstrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func newConfig(t *testing.T) (Config, ids.NodeID, *common.SenderTest, *vertex.Te
totalWeight, err := vdrs.TotalWeight(constants.PrimaryNetworkID)
require.NoError(err)
startupTracker := tracker.NewStartup(peerTracker, totalWeight/2+1)
vdrs.RegisterCallbackListener(constants.PrimaryNetworkID, startupTracker)
vdrs.RegisterSetCallbackListener(constants.PrimaryNetworkID, startupTracker)

avaGetHandler, err := getter.New(manager, sender, ctx.Log, time.Second, 2000, ctx.AvalancheRegisterer)
require.NoError(err)
Expand Down
6 changes: 3 additions & 3 deletions snow/engine/snowman/bootstrap/bootstrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func newConfig(t *testing.T) (Config, ids.NodeID, *common.SenderTest, *block.Tes
totalWeight, err := vdrs.TotalWeight(ctx.SubnetID)
require.NoError(err)
startupTracker := tracker.NewStartup(tracker.NewPeers(), totalWeight/2+1)
vdrs.RegisterCallbackListener(ctx.SubnetID, startupTracker)
vdrs.RegisterSetCallbackListener(ctx.SubnetID, startupTracker)

require.NoError(startupTracker.Connected(context.Background(), peer, version.CurrentApp))

Expand Down Expand Up @@ -126,7 +126,7 @@ func TestBootstrapperStartsOnlyIfEnoughStakeIsConnected(t *testing.T) {
startupAlpha := alpha

startupTracker := tracker.NewStartup(tracker.NewPeers(), startupAlpha)
peers.RegisterCallbackListener(ctx.SubnetID, startupTracker)
peers.RegisterSetCallbackListener(ctx.SubnetID, startupTracker)

snowGetHandler, err := getter.New(vm, sender, ctx.Log, time.Second, 2000, ctx.Registerer)
require.NoError(err)
Expand Down Expand Up @@ -650,7 +650,7 @@ func TestBootstrapNoParseOnNew(t *testing.T) {
totalWeight, err := peers.TotalWeight(ctx.SubnetID)
require.NoError(err)
startupTracker := tracker.NewStartup(tracker.NewPeers(), totalWeight/2+1)
peers.RegisterCallbackListener(ctx.SubnetID, startupTracker)
peers.RegisterSetCallbackListener(ctx.SubnetID, startupTracker)
require.NoError(startupTracker.Connected(context.Background(), peer, version.CurrentApp))

snowGetHandler, err := getter.New(vm, sender, ctx.Log, time.Second, 2000, ctx.Registerer)
Expand Down
30 changes: 15 additions & 15 deletions snow/engine/snowman/syncer/state_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestStateSyncingStartsOnlyIfEnoughStakeIsConnected(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, _, sender := buildTestsObjects(t, ctx, startup, beacons, alpha)

Expand Down Expand Up @@ -159,7 +159,7 @@ func TestStateSyncLocalSummaryIsIncludedAmongFrontiersIfAvailable(t *testing.T)

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, _ := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -197,7 +197,7 @@ func TestStateSyncNotFoundOngoingSummaryIsNotIncludedAmongFrontiers(t *testing.T

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, _ := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -228,7 +228,7 @@ func TestBeaconsAreReachedForFrontiersUponStartup(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, _, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -267,7 +267,7 @@ func TestUnRequestedStateSummaryFrontiersAreDropped(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -357,7 +357,7 @@ func TestMalformedStateSummaryFrontiersAreDropped(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -426,7 +426,7 @@ func TestLateResponsesFromUnresponsiveFrontiersAreNotRecorded(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -509,7 +509,7 @@ func TestStateSyncIsRestartedIfTooManyFrontierSeedersTimeout(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -598,7 +598,7 @@ func TestVoteRequestsAreSentAsAllFrontierBeaconsResponded(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -669,7 +669,7 @@ func TestUnRequestedVotesAreDropped(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -786,7 +786,7 @@ func TestVotesForUnknownSummariesAreDropped(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -890,7 +890,7 @@ func TestStateSummaryIsPassedToVMAsMajorityOfVotesIsCastedForIt(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, alpha)

Expand Down Expand Up @@ -1035,7 +1035,7 @@ func TestVotingIsRestartedIfMajorityIsNotReachedDueToTimeouts(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, alpha)

Expand Down Expand Up @@ -1141,7 +1141,7 @@ func TestStateSyncIsStoppedIfEnoughVotesAreCastedWithNoClearMajority(t *testing.

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, alpha)

Expand Down Expand Up @@ -1286,7 +1286,7 @@ func TestStateSyncIsDoneOnceVMNotifies(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, _, _ := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down
2 changes: 1 addition & 1 deletion snow/engine/snowman/transitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func New(config Config) (*Transitive, error) {
}

acceptedFrontiers := tracker.NewAccepted()
config.Validators.RegisterCallbackListener(config.Ctx.SubnetID, acceptedFrontiers)
config.Validators.RegisterSetCallbackListener(config.Ctx.SubnetID, acceptedFrontiers)

factory := poll.NewEarlyTermNoTraversalFactory(
config.Params.AlphaPreference,
Expand Down
2 changes: 1 addition & 1 deletion snow/engine/snowman/transitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func setup(t *testing.T, engCfg Config) (ids.NodeID, validators.Manager, *common
require.NoError(vals.AddStaker(engCfg.Ctx.SubnetID, vdr, nil, ids.Empty, 1))
require.NoError(engCfg.ConnectedValidators.Connected(context.Background(), vdr, version.CurrentApp))

vals.RegisterCallbackListener(engCfg.Ctx.SubnetID, engCfg.ConnectedValidators)
vals.RegisterSetCallbackListener(engCfg.Ctx.SubnetID, engCfg.ConnectedValidators)

sender := &common.SenderTest{T: t}
engCfg.Sender = sender
Expand Down
2 changes: 1 addition & 1 deletion snow/networking/handler/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestHealthCheckSubnet(t *testing.T) {
require.NoError(err)

peerTracker := commontracker.NewPeers()
vdrs.RegisterCallbackListener(ctx.SubnetID, peerTracker)
vdrs.RegisterSetCallbackListener(ctx.SubnetID, peerTracker)

sb := subnets.New(
ctx.NodeID,
Expand Down
35 changes: 28 additions & 7 deletions snow/validators/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ var (
ErrMissingValidators = errors.New("missing validators")
)

type ManagerCallbackListener interface {
OnValidatorAdded(subnetID ids.ID, nodeID ids.NodeID, pk *bls.PublicKey, txID ids.ID, weight uint64)
OnValidatorRemoved(subnetID ids.ID, nodeID ids.NodeID, weight uint64)
OnValidatorWeightChanged(subnetID ids.ID, nodeID ids.NodeID, oldWeight, newWeight uint64)
}

type SetCallbackListener interface {
OnValidatorAdded(nodeID ids.NodeID, pk *bls.PublicKey, txID ids.ID, weight uint64)
OnValidatorRemoved(nodeID ids.NodeID, weight uint64)
Expand Down Expand Up @@ -88,9 +94,13 @@ type Manager interface {
// Map of the validators in this subnet
GetMap(subnetID ids.ID) map[ids.NodeID]*GetValidatorOutput

// When a validator's weight changes, or a validator is added/removed,
// this listener is called.
RegisterCallbackListener(subnetID ids.ID, listener SetCallbackListener)
// When a validator is added, removed, or its weight changes, the listener
// will be notified of the event.
RegisterCallbackListener(listener ManagerCallbackListener)

// When a validator is added, removed, or its weight changes on [subnetID],
// the listener will be notified of the event.
RegisterSetCallbackListener(subnetID ids.ID, listener SetCallbackListener)
}

// NewManager returns a new, empty manager
Expand All @@ -105,7 +115,8 @@ type manager struct {

// Key: Subnet ID
// Value: The validators that validate the subnet
subnetToVdrs map[ids.ID]*vdrSet
subnetToVdrs map[ids.ID]*vdrSet
callbackListeners []ManagerCallbackListener
}

func (m *manager) AddStaker(subnetID ids.ID, nodeID ids.NodeID, pk *bls.PublicKey, txID ids.ID, weight uint64) error {
Expand All @@ -118,7 +129,7 @@ func (m *manager) AddStaker(subnetID ids.ID, nodeID ids.NodeID, pk *bls.PublicKe

set, exists := m.subnetToVdrs[subnetID]
if !exists {
set = newSet()
set = newSet(subnetID, m.callbackListeners)
m.subnetToVdrs[subnetID] = set
}

Expand Down Expand Up @@ -264,13 +275,23 @@ func (m *manager) GetMap(subnetID ids.ID) map[ids.NodeID]*GetValidatorOutput {
return set.Map()
}

func (m *manager) RegisterCallbackListener(subnetID ids.ID, listener SetCallbackListener) {
func (m *manager) RegisterCallbackListener(listener ManagerCallbackListener) {
m.lock.Lock()
defer m.lock.Unlock()

m.callbackListeners = append(m.callbackListeners, listener)
for _, set := range m.subnetToVdrs {
set.RegisterManagerCallbackListener(listener)
}
}

func (m *manager) RegisterSetCallbackListener(subnetID ids.ID, listener SetCallbackListener) {
m.lock.Lock()
defer m.lock.Unlock()

set, exists := m.subnetToVdrs[subnetID]
if !exists {
set = newSet()
set = newSet(subnetID, m.callbackListeners)
m.subnetToVdrs[subnetID] = set
}

Expand Down
Loading
Loading