Skip to content

Commit

Permalink
Merge pull request #4574 from oasisprotocol/yawning/feature/randomize…
Browse files Browse the repository at this point in the history
…-registration-delay

go/worker/registration: Add a random re-registration delay
  • Loading branch information
Yawning authored Mar 17, 2022
2 parents 59d58e8 + a3a53c8 commit afa260f
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 1 deletion.
1 change: 1 addition & 0 deletions .changelog/4574.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/registration: Add a random re-registration delay
85 changes: 84 additions & 1 deletion go/worker/registration/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package registration
import (
"context"
"fmt"
"math"
"math/rand"
"sync"
"sync/atomic"
"time"
Expand All @@ -23,6 +25,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/persistent"
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
"github.com/oasisprotocol/oasis-core/go/common/version"
consensus "github.com/oasisprotocol/oasis-core/go/consensus/api"
control "github.com/oasisprotocol/oasis-core/go/control/api"
Expand Down Expand Up @@ -228,6 +231,12 @@ func (w *Worker) registrationLoop() { // nolint: gocyclo

// Delay node registration till after the consensus service has
// finished initial synchronization if applicable.
var (
blockCh <-chan *consensus.Block

delayReregistration bool
maxReregistrationDelay int64
)
if w.consensus != nil {
w.logger.Debug("waiting for consensus sync")
select {
Expand All @@ -236,11 +245,50 @@ func (w *Worker) registrationLoop() { // nolint: gocyclo
case <-w.consensus.Synced():
}
w.logger.Debug("consensus synced, entering registration loop")

beaconParameters, err := w.beacon.ConsensusParameters(w.ctx, consensus.HeightLatest)
switch err {
case nil:
delayReregistration = beaconParameters.Backend == beacon.BackendVRF
if delayReregistration {
epochInterval := beaconParameters.VRFParameters.Interval
maxReregistrationDelay = epochInterval / 100 * 5 // 5%
if maxReregistrationDelay == 0 {
w.logger.Warn("epoch interval too short to provide meaningful re-registration delay",
"epoch_interval", epochInterval,
)
delayReregistration = false
}
}
default:
w.logger.Error("failed to query beacon parameters",
"err", err,
)
}

}
if delayReregistration {
// Register for block heights so we can impose a random re-registration
// delay if needed.
var (
blockSub pubsub.ClosableSubscription
err error
)
blockCh, blockSub, err = w.consensus.WatchBlocks(w.ctx)
switch err {
case nil:
defer blockSub.Close()
default:
w.logger.Error("failed to watch blocks",
"err", err,
)
}
}

// (re-)register the node on each epoch transition. This doesn't
// need to be strict block-epoch time, since it just serves to
// extend the node's expiration.
// extend the node's expiration, and we add a randomized delay
// anyway.
ch, sub, err := w.beacon.WatchLatestEpoch(w.ctx)
if err != nil {
w.logger.Error("failed to watch epochs",
Expand Down Expand Up @@ -299,6 +347,8 @@ func (w *Worker) registrationLoop() { // nolint: gocyclo
epoch beacon.EpochTime
lastTLSRotationEpoch beacon.EpochTime

reregisterHeight int64 = math.MaxInt64

tlsRotationPending = true
first = true
)
Expand All @@ -310,6 +360,16 @@ Loop:
case <-w.stopRegCh:
w.logger.Info("node deregistration and eventual shutdown requested")
return
case block := <-blockCh:
if block.Height < reregisterHeight {
continue
}

// Target re-registration height reached.
w.logger.Info("re-register height reached for current epoch",
"epoch", epoch,
"height", block.Height,
)
case epoch = <-ch:
// Epoch updated, check if we can submit a registration.

Expand Down Expand Up @@ -339,6 +399,26 @@ Loop:
}
}
}

if delayReregistration {
// Derive the re-registration delay.
epochHeight, err := w.beacon.GetEpochBlock(w.ctx, epoch)
switch err {
case nil:
// Schedule the re-registration, and wait till the target height.
reregisterHeight = epochHeight + rand.Int63n(maxReregistrationDelay)
w.logger.Info("per-epoch re-registration scheduled",
"epoch_height", epochHeight,
"target_height", reregisterHeight,
)
continue
default:
w.logger.Error("failed to query block height for epoch",
"err", err,
"epoch", epoch,
)
}
}
case ev := <-entityCh:
// Entity registration update.
if !ev.IsRegistration || !ev.Entity.ID.Equal(w.entityID) {
Expand All @@ -348,6 +428,9 @@ Loop:
// Notification that a role provider has been updated.
}

// Disarm the re-registration delay height.
reregisterHeight = math.MaxInt64

// If there are any role providers which are still not ready, we must wait for more
// notifications.
hooks, cbs, vers := func() (h []RegisterNodeHook, cbs []RegisterNodeCallback, vers []uint64) {
Expand Down

0 comments on commit afa260f

Please sign in to comment.