Skip to content

Commit

Permalink
go/storage: reduce storage checkpointing
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Mar 11, 2022
1 parent a9e863a commit 3f112cd
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 8 deletions.
5 changes: 5 additions & 0 deletions .changelog/4561.cfg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
go/worker/storage: Storage checkpoints are now disabled by default

- `worker.storage.checkpointer.disabled` flag is removed.

- use the `worker.storage.checkpointer.enabled` flag to enable checkpoints.
1 change: 1 addition & 0 deletions .changelog/4561.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Randomize storage checkpoints wall-clock interval
7 changes: 7 additions & 0 deletions go/oasis-test-runner/oasis/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,13 @@ func (args *argBuilder) workerStorageDebugDisableCheckpointSync(disable bool) *a
return args
}

func (args *argBuilder) workerStorageCheckpointSyncEnabled(enable bool) *argBuilder {
if enable {
args.vec = append(args.vec, Argument{Name: workerStorage.CfgWorkerCheckpointerEnabled})
}
return args
}

func (args *argBuilder) workerStorageCheckpointCheckInterval(interval time.Duration) *argBuilder {
if interval > 0 {
args.vec = append(args.vec, Argument{
Expand Down
1 change: 1 addition & 0 deletions go/oasis-test-runner/oasis/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func (worker *Compute) AddArgs(args *argBuilder) error {
storageBackend(worker.storageBackend).
workerStoragePublicRPCEnabled(!worker.disablePublicRPC).
workerStorageDebugDisableCheckpointSync(worker.checkpointSyncDisabled).
workerStorageCheckpointSyncEnabled(true).
workerStorageCheckpointCheckInterval(worker.checkpointCheckInterval).
configureDebugCrashPoints(worker.crashPointsProbability).
tendermintSupplementarySanity(worker.supplementarySanityInterval).
Expand Down
18 changes: 14 additions & 4 deletions go/storage/mkvs/checkpoint/checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package checkpoint
import (
"context"
"fmt"
"math/rand"
"sort"
"time"

Expand All @@ -15,6 +16,10 @@ import (
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/node"
)

// Specifies the factor by which the checkpoint interval (in wall clock time)
// is randomized.
const checkpointIntervalRandomizationFactor = 0.1

// CheckpointerConfig is a checkpointer configuration.
type CheckpointerConfig struct {
// Name identifying this checkpointer in logs.
Expand Down Expand Up @@ -285,17 +290,22 @@ func (c *checkpointer) worker(ctx context.Context) {
c.logger.Debug("storage checkpointer terminating")
}()

// Use a ticker to avoid checking for checkpoints too often.
ticker := time.NewTicker(c.cfg.CheckInterval)
defer ticker.Stop()
getCheckInterval := func() time.Duration {
delta := float64(c.cfg.CheckInterval) * checkpointIntervalRandomizationFactor
min := float64(c.cfg.CheckInterval) - delta
max := float64(c.cfg.CheckInterval) + delta

// Random from the range: [min, max].
return time.Duration(min + (rand.Float64() * (max - min + 1)))
}

paused := false

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
case <-time.After(getCheckInterval()):
case <-c.flushCh.Out():
case paused = <-c.pausedCh:
continue
Expand Down
6 changes: 3 additions & 3 deletions go/worker/storage/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ const (
// storage committee members.
CfgWorkerPublicRPCEnabled = "worker.storage.public_rpc.enabled"

// CfgWorkerCheckpointerDisabled disables the storage checkpointer.
CfgWorkerCheckpointerDisabled = "worker.storage.checkpointer.disabled"
// CfgWorkerCheckpointerEnabled enables the storage checkpointer.
CfgWorkerCheckpointerEnabled = "worker.storage.checkpointer.enabled"
// CfgWorkerCheckpointCheckInterval configures the checkpointer check interval.
CfgWorkerCheckpointCheckInterval = "worker.storage.checkpointer.check_interval"

Expand Down Expand Up @@ -89,7 +89,7 @@ func NewLocalBackend(
func init() {
Flags.Uint(cfgWorkerFetcherCount, 4, "Number of concurrent storage diff fetchers")
Flags.Bool(CfgWorkerPublicRPCEnabled, false, "Enable storage RPC access for all nodes")
Flags.Bool(CfgWorkerCheckpointerDisabled, false, "Disable the storage checkpointer")
Flags.Bool(CfgWorkerCheckpointerEnabled, false, "Enable the storage checkpointer")
Flags.Duration(CfgWorkerCheckpointCheckInterval, 1*time.Minute, "Storage checkpointer check interval")
Flags.Bool(CfgWorkerCheckpointSyncDisabled, false, "Disable initial storage sync from checkpoints")

Expand Down
2 changes: 1 addition & 1 deletion go/worker/storage/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func New(
}

var checkpointerCfg *checkpoint.CheckpointerConfig
if !viper.GetBool(CfgWorkerCheckpointerDisabled) {
if viper.GetBool(CfgWorkerCheckpointerEnabled) {
checkpointerCfg = &checkpoint.CheckpointerConfig{
CheckInterval: viper.GetDuration(CfgWorkerCheckpointCheckInterval),
}
Expand Down

0 comments on commit 3f112cd

Please sign in to comment.