Skip to content

Commit

Permalink
go/storage: reduce storage checkpointing
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Mar 11, 2022
1 parent acb6bfc commit ad009aa
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 28 deletions.
5 changes: 5 additions & 0 deletions .changelog/4561.cfg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
go/worker/storage: Storage checkpoints are now disabled by default

- `worker.storage.checkpointer.disabled` flag is removed.

- use the `worker.storage.checkpointer.enabled` flag to enable checkpoints.
1 change: 1 addition & 0 deletions .changelog/4561.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Randomize storage checkpoints wall-clock interval
16 changes: 16 additions & 0 deletions go/common/random/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package random
import (
"math/rand"
"sync"
"time"
)

// NewRand is a convenience function to generate a new
Expand Down Expand Up @@ -49,3 +50,18 @@ func (c *concurrenySafeSource) Seed(seed int64) {
c.src.Seed(seed)
c.mut.Unlock()
}

// Borrowed from https://github.com/cenkalti/backoff.

// Returns a random value from the following interval:
// [currentInterval - randomizationFactor * currentInterval, currentInterval + randomizationFactor * currentInterval].
func GetRandomValueFromInterval(randomizationFactor, random float64, currentInterval time.Duration) time.Duration {
delta := randomizationFactor * float64(currentInterval)
minInterval := float64(currentInterval) - delta
maxInterval := float64(currentInterval) + delta

// Get a random value from the range [minInterval, maxInterval].
// The formula used below has a +1 because if the minInterval is 1 and the maxInterval is 3 then
// we want a 33% chance for selecting either 1, 2 or 3.
return time.Duration(minInterval + (random * (maxInterval - minInterval + 1)))
}
3 changes: 2 additions & 1 deletion go/consensus/tendermint/full/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
"github.com/oasisprotocol/oasis-core/go/common/random"
cmservice "github.com/oasisprotocol/oasis-core/go/common/service"
"github.com/oasisprotocol/oasis-core/go/common/version"
consensusAPI "github.com/oasisprotocol/oasis-core/go/consensus/api"
Expand Down Expand Up @@ -1387,7 +1388,7 @@ func (t *fullService) lazyInit() error {
// when all the other services start shutting down.
//
// Randomize the period so that not all nodes shut down at the same time.
delay := getRandomValueFromInterval(0.5, rand.Float64(), viper.GetDuration(CfgUpgradeStopDelay))
delay := random.GetRandomValueFromInterval(0.5, rand.Float64(), viper.GetDuration(CfgUpgradeStopDelay))
time.Sleep(delay)

t.Logger.Info("stopping the node for upgrade")
Expand Down
18 changes: 0 additions & 18 deletions go/consensus/tendermint/full/helpers.go

This file was deleted.

7 changes: 7 additions & 0 deletions go/oasis-test-runner/oasis/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,13 @@ func (args *argBuilder) workerStorageDebugDisableCheckpointSync(disable bool) *a
return args
}

func (args *argBuilder) workerStorageCheckpointerEnabled(enable bool) *argBuilder {
if enable {
args.vec = append(args.vec, Argument{Name: workerStorage.CfgWorkerCheckpointerEnabled})
}
return args
}

func (args *argBuilder) workerStorageCheckpointCheckInterval(interval time.Duration) *argBuilder {
if interval > 0 {
args.vec = append(args.vec, Argument{
Expand Down
1 change: 1 addition & 0 deletions go/oasis-test-runner/oasis/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func (worker *Compute) AddArgs(args *argBuilder) error {
storageBackend(worker.storageBackend).
workerStoragePublicRPCEnabled(!worker.disablePublicRPC).
workerStorageDebugDisableCheckpointSync(worker.checkpointSyncDisabled).
workerStorageCheckpointerEnabled(true).
workerStorageCheckpointCheckInterval(worker.checkpointCheckInterval).
configureDebugCrashPoints(worker.crashPointsProbability).
tendermintSupplementarySanity(worker.supplementarySanityInterval).
Expand Down
13 changes: 8 additions & 5 deletions go/storage/mkvs/checkpoint/checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package checkpoint
import (
"context"
"fmt"
"math/rand"
"sort"
"time"

Expand All @@ -11,10 +12,15 @@ import (
"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
"github.com/oasisprotocol/oasis-core/go/common/random"
db "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/node"
)

// Specifies the factor by which the checkpoint interval (in wall clock time)
// is randomized.
const checkpointIntervalRandomizationFactor = 0.1

// CheckpointerConfig is a checkpointer configuration.
type CheckpointerConfig struct {
// Name identifying this checkpointer in logs.
Expand Down Expand Up @@ -285,17 +291,14 @@ func (c *checkpointer) worker(ctx context.Context) {
c.logger.Debug("storage checkpointer terminating")
}()

// Use a ticker to avoid checking for checkpoints too often.
ticker := time.NewTicker(c.cfg.CheckInterval)
defer ticker.Stop()

paused := false

for {
interval := random.GetRandomValueFromInterval(checkpointIntervalRandomizationFactor, rand.Float64(), c.cfg.CheckInterval)
select {
case <-ctx.Done():
return
case <-ticker.C:
case <-time.After(interval):
case <-c.flushCh.Out():
case paused = <-c.pausedCh:
continue
Expand Down
6 changes: 3 additions & 3 deletions go/worker/storage/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ const (
// storage committee members.
CfgWorkerPublicRPCEnabled = "worker.storage.public_rpc.enabled"

// CfgWorkerCheckpointerDisabled disables the storage checkpointer.
CfgWorkerCheckpointerDisabled = "worker.storage.checkpointer.disabled"
// CfgWorkerCheckpointerEnabled enables the storage checkpointer.
CfgWorkerCheckpointerEnabled = "worker.storage.checkpointer.enabled"
// CfgWorkerCheckpointCheckInterval configures the checkpointer check interval.
CfgWorkerCheckpointCheckInterval = "worker.storage.checkpointer.check_interval"

Expand Down Expand Up @@ -89,7 +89,7 @@ func NewLocalBackend(
func init() {
Flags.Uint(cfgWorkerFetcherCount, 4, "Number of concurrent storage diff fetchers")
Flags.Bool(CfgWorkerPublicRPCEnabled, false, "Enable storage RPC access for all nodes")
Flags.Bool(CfgWorkerCheckpointerDisabled, false, "Disable the storage checkpointer")
Flags.Bool(CfgWorkerCheckpointerEnabled, false, "Enable the storage checkpointer")
Flags.Duration(CfgWorkerCheckpointCheckInterval, 1*time.Minute, "Storage checkpointer check interval")
Flags.Bool(CfgWorkerCheckpointSyncDisabled, false, "Disable initial storage sync from checkpoints")

Expand Down
2 changes: 1 addition & 1 deletion go/worker/storage/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func New(
}

var checkpointerCfg *checkpoint.CheckpointerConfig
if !viper.GetBool(CfgWorkerCheckpointerDisabled) {
if viper.GetBool(CfgWorkerCheckpointerEnabled) {
checkpointerCfg = &checkpoint.CheckpointerConfig{
CheckInterval: viper.GetDuration(CfgWorkerCheckpointCheckInterval),
}
Expand Down

0 comments on commit ad009aa

Please sign in to comment.