From a3a53c8534415895a79f603ca54a39e3140f4f8f Mon Sep 17 00:00:00 2001
From: Yawning Angel
Date: Thu, 17 Mar 2022 08:43:58 +0000
Subject: [PATCH] go/worker/registration: Add a random re-registration delay

This is hard coded to 5%, and will not be enabled if the mock backend
is in use so that the tests hopefully do not need alterations.
---
 .changelog/4574.feature.md       |  1 +
 go/worker/registration/worker.go | 85 +++++++++++++++++++++++++++++++-
 2 files changed, 85 insertions(+), 1 deletion(-)
 create mode 100644 .changelog/4574.feature.md

diff --git a/.changelog/4574.feature.md b/.changelog/4574.feature.md
new file mode 100644
index 00000000000..219de0234ec
--- /dev/null
+++ b/.changelog/4574.feature.md
@@ -0,0 +1 @@
+go/worker/registration: Add a random re-registration delay
diff --git a/go/worker/registration/worker.go b/go/worker/registration/worker.go
index c6da0c51201..1db01170e07 100644
--- a/go/worker/registration/worker.go
+++ b/go/worker/registration/worker.go
@@ -3,6 +3,8 @@ package registration
 import (
 	"context"
 	"fmt"
+	"math"
+	"math/rand"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -23,6 +25,7 @@ import (
 	"github.com/oasisprotocol/oasis-core/go/common/logging"
 	"github.com/oasisprotocol/oasis-core/go/common/node"
 	"github.com/oasisprotocol/oasis-core/go/common/persistent"
+	"github.com/oasisprotocol/oasis-core/go/common/pubsub"
 	"github.com/oasisprotocol/oasis-core/go/common/version"
 	consensus "github.com/oasisprotocol/oasis-core/go/consensus/api"
 	control "github.com/oasisprotocol/oasis-core/go/control/api"
@@ -228,6 +231,12 @@ func (w *Worker) registrationLoop() { // nolint: gocyclo
 
 	// Delay node registration till after the consensus service has
 	// finished initial synchronization if applicable.
+	var (
+		blockCh <-chan *consensus.Block
+
+		delayReregistration    bool
+		maxReregistrationDelay int64
+	)
 	if w.consensus != nil {
 		w.logger.Debug("waiting for consensus sync")
 		select {
@@ -236,11 +245,50 @@ func (w *Worker) registrationLoop() { // nolint: gocyclo
 		case <-w.consensus.Synced():
 		}
 		w.logger.Debug("consensus synced, entering registration loop")
+
+		beaconParameters, err := w.beacon.ConsensusParameters(w.ctx, consensus.HeightLatest)
+		switch err {
+		case nil:
+			delayReregistration = beaconParameters.Backend == beacon.BackendVRF
+			if delayReregistration {
+				epochInterval := beaconParameters.VRFParameters.Interval
+				maxReregistrationDelay = epochInterval / 100 * 5 // 5%
+				if maxReregistrationDelay == 0 {
+					w.logger.Warn("epoch interval too short to provide meaningful re-registration delay",
+						"epoch_interval", epochInterval,
+					)
+					delayReregistration = false
+				}
+			}
+		default:
+			w.logger.Error("failed to query beacon parameters",
+				"err", err,
+			)
+		}
+
+	}
+	if delayReregistration {
+		// Register for block heights so we can impose a random re-registration
+		// delay if needed.
+		var (
+			blockSub pubsub.ClosableSubscription
+			err      error
+		)
+		blockCh, blockSub, err = w.consensus.WatchBlocks(w.ctx)
+		switch err {
+		case nil:
+			defer blockSub.Close()
+		default:
+			w.logger.Error("failed to watch blocks",
+				"err", err,
+			)
+		}
 	}
 
 	// (re-)register the node on each epoch transition. This doesn't
 	// need to be strict block-epoch time, since it just serves to
-	// extend the node's expiration.
+	// extend the node's expiration, and we add a randomized delay
+	// anyway.
 	ch, sub, err := w.beacon.WatchLatestEpoch(w.ctx)
 	if err != nil {
 		w.logger.Error("failed to watch epochs",
@@ -299,6 +347,8 @@ func (w *Worker) registrationLoop() { // nolint: gocyclo
 		epoch                beacon.EpochTime
 		lastTLSRotationEpoch beacon.EpochTime
 
+		reregisterHeight int64 = math.MaxInt64
+
 		tlsRotationPending = true
 		first              = true
 	)
@@ -310,6 +360,16 @@ Loop:
 		case <-w.stopRegCh:
 			w.logger.Info("node deregistration and eventual shutdown requested")
 			return
+		case block := <-blockCh:
+			if block.Height < reregisterHeight {
+				continue
+			}
+
+			// Target re-registration height reached.
+			w.logger.Info("re-register height reached for current epoch",
+				"epoch", epoch,
+				"height", block.Height,
+			)
 		case epoch = <-ch:
 			// Epoch updated, check if we can submit a registration.
 
@@ -339,6 +399,26 @@ Loop:
 					}
 				}
 			}
+
+			if delayReregistration {
+				// Derive the re-registration delay.
+				epochHeight, err := w.beacon.GetEpochBlock(w.ctx, epoch)
+				switch err {
+				case nil:
+					// Schedule the re-registration, and wait till the target height.
+					reregisterHeight = epochHeight + rand.Int63n(maxReregistrationDelay)
+					w.logger.Info("per-epoch re-registration scheduled",
+						"epoch_height", epochHeight,
+						"target_height", reregisterHeight,
+					)
+					continue
+				default:
+					w.logger.Error("failed to query block height for epoch",
+						"err", err,
+						"epoch", epoch,
+					)
+				}
+			}
 		case ev := <-entityCh:
 			// Entity registration update.
 			if !ev.IsRegistration || !ev.Entity.ID.Equal(w.entityID) {
@@ -348,6 +428,9 @@ Loop:
 			// Notification that a role provider has been updated.
 		}
 
+		// Disarm the re-registration delay height.
+		reregisterHeight = math.MaxInt64
+
 		// If there are any role providers which are still not ready, we must wait for more
 		// notifications.
 		hooks, cbs, vers := func() (h []RegisterNodeHook, cbs []RegisterNodeCallback, vers []uint64) {
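
Note (illustration only, not part of the patch): the scheduling rule above amounts to picking a target height uniformly within the first 5% of the epoch and re-registering once the watched block stream reaches it. A minimal standalone sketch, with hypothetical names and example values:

package main

import (
	"fmt"
	"math/rand"
)

// reregistrationHeight mirrors the 5% rule from the patch: given the first
// block height of an epoch and the epoch interval (in blocks), pick a random
// target height within the first 5% of the epoch.
func reregistrationHeight(epochHeight, epochInterval int64) int64 {
	maxDelay := epochInterval / 100 * 5 // 5% of the epoch, in blocks
	if maxDelay == 0 {
		// Epoch too short for a meaningful delay; the patch disables the
		// delay in this case, which is equivalent to re-registering at the
		// epoch transition itself.
		return epochHeight
	}
	return epochHeight + rand.Int63n(maxDelay)
}

func main() {
	// e.g. a 600-block epoch starting at height 12000 yields a target
	// somewhere in [12000, 12030).
	fmt.Println(reregistrationHeight(12000, 600))
}

In the patch itself the interval comes from the VRF beacon consensus parameters and the epoch's first height from w.beacon.GetEpochBlock; the delay is only armed for the VRF backend, so tests using the mock backend keep their original registration timing.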