Skip to content

Commit

Permalink
go/worker/storage: Add initial sync from checkpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
jberci committed Sep 9, 2020
1 parent d132744 commit 4056529
Show file tree
Hide file tree
Showing 15 changed files with 614 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .buildkite/code.pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ steps:
###############
- label: E2E tests
parallelism: 7
timeout_in_minutes: 12
timeout_in_minutes: 15
command:
- .buildkite/scripts/download_e2e_test_artifacts.sh
- .buildkite/scripts/test_e2e.sh
Expand Down
4 changes: 4 additions & 0 deletions .changelog/3181.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/worker/storage: Add initial sync from checkpoints

Instead of relying on the slow per-block root sync, the worker now tries
syncing from checkpoints, if any suitable ones are found.
7 changes: 7 additions & 0 deletions go/oasis-test-runner/oasis/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,13 @@ func (args *argBuilder) workerStorageDebugIgnoreApplies(ignore bool) *argBuilder
return args
}

// workerStorageDebugDisableCheckpointSync appends the storage worker's
// CfgWorkerCheckpointSyncDisabled command-line flag to the argument vector
// when disable is true. When disable is false the vector is left untouched.
// The builder itself is returned so calls can be chained.
func (args *argBuilder) workerStorageDebugDisableCheckpointSync(disable bool) *argBuilder {
	if !disable {
		return args
	}

	flag := "--" + workerStorage.CfgWorkerCheckpointSyncDisabled
	args.vec = append(args.vec, flag)
	return args
}

func (args *argBuilder) workerStorageCheckpointCheckInterval(interval time.Duration) *argBuilder {
if interval > 0 {
args.vec = append(args.vec, "--"+workerStorage.CfgWorkerCheckpointCheckInterval, interval.String())
Expand Down
11 changes: 9 additions & 2 deletions go/oasis-test-runner/oasis/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ type StorageWorkerFixture struct { // nolint: maligned
AllowEarlyTermination bool `json:"allow_early_termination"`
AllowErrorTermination bool `json:"allow_error_termination"`

NoAutoStart bool `json:"no_auto_start,omitempty"`

DisableCertRotation bool `json:"disable_cert_rotation"`

LogWatcherHandlerFactories []log.WatcherHandlerFactory `json:"-"`
Expand All @@ -336,6 +338,7 @@ type StorageWorkerFixture struct { // nolint: maligned

CheckpointCheckInterval time.Duration `json:"checkpoint_check_interval,omitempty"`
IgnoreApplies bool `json:"ignore_applies,omitempty"`
CheckpointSyncEnabled bool `json:"checkpoint_sync_enabled,omitempty"`

// Runtimes contains the indexes of the runtimes to enable. Leave
// empty or nil for the default behaviour (i.e. include all runtimes).
Expand All @@ -353,6 +356,7 @@ func (f *StorageWorkerFixture) Create(net *Network) (*Storage, error) {
NodeCfg: NodeCfg{
AllowEarlyTermination: f.AllowEarlyTermination,
AllowErrorTermination: f.AllowErrorTermination,
NoAutoStart: f.NoAutoStart,
LogWatcherHandlerFactories: f.LogWatcherHandlerFactories,
Consensus: f.Consensus,
},
Expand All @@ -361,8 +365,11 @@ func (f *StorageWorkerFixture) Create(net *Network) (*Storage, error) {
SentryIndices: f.Sentries,
CheckpointCheckInterval: f.CheckpointCheckInterval,
IgnoreApplies: f.IgnoreApplies,
DisableCertRotation: f.DisableCertRotation,
Runtimes: f.Runtimes,
// The checkpoint syncing flag is intentionally flipped here.
// Syncing should normally be enabled, but it is disabled by default in tests.
CheckpointSyncDisabled: !f.CheckpointSyncEnabled,
DisableCertRotation: f.DisableCertRotation,
Runtimes: f.Runtimes,
})
}

Expand Down
9 changes: 8 additions & 1 deletion go/oasis-test-runner/oasis/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/log"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api"
workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage/committee"
)

// LogAssertEvent returns a handler which checks whether a specific log event was
Expand Down Expand Up @@ -81,8 +82,14 @@ func LogEventABCIPruneDelete() log.WatcherHandlerFactory {
return LogAssertEvent(abci.LogEventABCIPruneDelete, "expected ABCI pruning to be done")
}

// LogAssertRoothashRoothashReindexing returns a handler witch checks wether roothash reindexing was
// LogAssertRoothashRoothashReindexing returns a handler which checks whether roothash reindexing was
// run based on JSON log output.
func LogAssertRoothashRoothashReindexing() log.WatcherHandlerFactory {
	// Message handed to LogAssertEvent alongside the reindexing event.
	const notDetectedMsg = "roothash runtime reindexing not detected"

	return LogAssertEvent(roothash.LogEventHistoryReindexing, notDetectedMsg)
}

// LogAssertCheckpointSync returns a log watcher handler factory built with
// LogAssertEvent for the workerStorage.LogEventCheckpointSyncSuccess event,
// i.e. it checks whether the initial storage sync from a checkpoint succeeded.
func LogAssertCheckpointSync() log.WatcherHandlerFactory {
	// Message handed to LogAssertEvent alongside the success event.
	const notSyncedMsg = "checkpoint sync did not succeed"

	return LogAssertEvent(workerStorage.LogEventCheckpointSyncSuccess, notSyncedMsg)
}
5 changes: 5 additions & 0 deletions go/oasis-test-runner/oasis/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Storage struct { // nolint: maligned

disableCertRotation bool
ignoreApplies bool
checkpointSyncDisabled bool
checkpointCheckInterval time.Duration

sentryPubKey signature.PublicKey
Expand All @@ -46,6 +47,7 @@ type StorageCfg struct { // nolint: maligned

DisableCertRotation bool
IgnoreApplies bool
CheckpointSyncDisabled bool
CheckpointCheckInterval time.Duration

Runtimes []int
Expand Down Expand Up @@ -116,6 +118,7 @@ func (worker *Storage) startNode() error {
workerP2pPort(worker.p2pPort).
workerStorageEnabled().
workerStorageDebugIgnoreApplies(worker.ignoreApplies).
workerStorageDebugDisableCheckpointSync(worker.checkpointSyncDisabled).
workerStorageCheckpointCheckInterval(worker.checkpointCheckInterval).
appendNetwork(worker.net).
appendEntity(worker.entity)
Expand Down Expand Up @@ -186,6 +189,7 @@ func (net *Network) NewStorage(cfg *StorageCfg) (*Storage, error) {
Name: storageName,
net: net,
dir: storageDir,
noAutoStart: cfg.NoAutoStart,
disableDefaultLogWatcherHandlerFactories: cfg.DisableDefaultLogWatcherHandlerFactories,
logWatcherHandlerFactories: cfg.LogWatcherHandlerFactories,
consensus: cfg.Consensus,
Expand All @@ -195,6 +199,7 @@ func (net *Network) NewStorage(cfg *StorageCfg) (*Storage, error) {
sentryIndices: cfg.SentryIndices,
disableCertRotation: cfg.DisableCertRotation,
ignoreApplies: cfg.IgnoreApplies,
checkpointSyncDisabled: cfg.CheckpointSyncDisabled,
checkpointCheckInterval: cfg.CheckpointCheckInterval,
sentryPubKey: sentryPubKey,
tmAddress: crypto.PublicKeyToTendermint(&p2pKey).Address().String(),
Expand Down
11 changes: 7 additions & 4 deletions go/oasis-test-runner/scenario/e2e/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (sc *runtimeImpl) startClient(childEnv *env.Env) (*exec.Cmd, error) {
return cmd, nil
}

func (sc *runtimeImpl) wait(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan error) error {
func (sc *runtimeImpl) waitClient(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan error) error {
var err error
select {
case err = <-sc.Net.Errors():
Expand All @@ -313,11 +313,14 @@ func (sc *runtimeImpl) wait(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan
return err
}

if err = sc.Net.CheckLogWatchers(); err != nil {
return nil
}

// wait delegates to waitClient and, if that succeeds, returns the result of
// checking the network's log watchers.
//
// Any error from waitClient is returned as-is, in which case the log watchers
// are not checked.
func (sc *runtimeImpl) wait(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan error) error {
	if err := sc.waitClient(childEnv, cmd, clientErrCh); err != nil {
		return err
	}

	// The log watcher check must be the function's result; a stale
	// `return nil` in front of it would make the check unreachable.
	return sc.Net.CheckLogWatchers()
}

func (sc *runtimeImpl) Run(childEnv *env.Env) error {
Expand Down
41 changes: 38 additions & 3 deletions go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package runtime
import (
"context"
"fmt"
"strings"
"time"

"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/log"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis/cli"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario"
Expand Down Expand Up @@ -47,13 +49,23 @@ func (sc *storageSyncImpl) Fixture() (*oasis.NetworkFixture, error) {
// Configure runtime for storage checkpointing.
f.Runtimes[1].Storage.CheckpointInterval = 10
f.Runtimes[1].Storage.CheckpointNumKept = 1
f.Runtimes[1].Storage.CheckpointChunkSize = 1024 * 1024
f.Runtimes[1].Storage.CheckpointChunkSize = 1 * 1024
// Provision another storage node and make it ignore all applies.
f.StorageWorkers = append(f.StorageWorkers, oasis.StorageWorkerFixture{
Backend: database.BackendNameBadgerDB,
Entity: 1,
IgnoreApplies: true,
})

// One more storage worker for later, so it can do an initial sync with the snapshots.
f.StorageWorkers = append(f.StorageWorkers, oasis.StorageWorkerFixture{
Backend: database.BackendNameBadgerDB,
Entity: 1,
NoAutoStart: true,
CheckpointSyncEnabled: true,
LogWatcherHandlerFactories: []log.WatcherHandlerFactory{oasis.LogAssertCheckpointSync()},
})

return f, nil
}

Expand All @@ -64,7 +76,7 @@ func (sc *storageSyncImpl) Run(childEnv *env.Env) error {
}

// Wait for the client to exit.
if err = sc.wait(childEnv, cmd, clientErrCh); err != nil {
if err = sc.waitClient(childEnv, cmd, clientErrCh); err != nil {
return err
}

Expand Down Expand Up @@ -161,5 +173,28 @@ func (sc *storageSyncImpl) Run(childEnv *env.Env) error {
return fmt.Errorf("incorrect number of valid checkpoints (expected: >=2 got: %d)", validCps)
}

return nil
largeVal := strings.Repeat("has he his auto ", 7) // 16 bytes base string
for i := 0; i < 32; i++ {
sc.Logger.Info("submitting large transaction to runtime",
"seq", i,
)
if err = sc.submitKeyValueRuntimeInsertTx(ctx, runtimeID, fmt.Sprintf("%d key %d", i, i), fmt.Sprintf("my cp %d: ", i)+largeVal); err != nil {
return err
}
}

// Now spin up the last storage worker and check if it syncs with a checkpoint.
lateWorker := sc.Net.StorageWorkers()[3]
err = lateWorker.Start()
if err != nil {
return fmt.Errorf("can't start last storage worker: %w", err)
}
if err := lateWorker.WaitReady(ctx); err != nil {
return fmt.Errorf("erorr waiting for late storage worker to become ready: %w", err)
}
// Wait a bit to give the logger in the node time to sync; the message has already been
// logged by this point, it just might not be on disk yet.
<-time.After(1 * time.Second)

return sc.Net.CheckLogWatchers()
}
3 changes: 2 additions & 1 deletion go/registry/api/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/quantity"
"github.com/oasisprotocol/oasis-core/go/common/sgx"
"github.com/oasisprotocol/oasis-core/go/common/version"
"github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags"
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
)
Expand Down Expand Up @@ -198,7 +199,7 @@ func (s *StorageParameters) ValidateBasic() error {
}

// Verify storage checkpointing configuration if enabled.
if s.CheckpointInterval > 0 {
if s.CheckpointInterval > 0 && !flags.DebugDontBlameOasis() {
if s.CheckpointInterval < 10 {
return fmt.Errorf("storage CheckpointInterval parameter too small")
}
Expand Down
2 changes: 1 addition & 1 deletion go/storage/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (c *storageClient) GetCheckpointChunk(ctx context.Context, chunk *checkpoin

for {
var part []byte
switch stream.RecvMsg(&part) {
switch err = stream.RecvMsg(&part); err {
case nil:
case io.EOF:
return nil
Expand Down
3 changes: 3 additions & 0 deletions go/storage/mkvs/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type Restorer interface {
// StartRestore starts a checkpoint restoration process.
StartRestore(ctx context.Context, checkpoint *Metadata) error

// AbortRestore aborts a checkpoint restore in progress.
AbortRestore(ctx context.Context) error

// GetCurrentCheckpoint returns the checkpoint that is being restored. If no restoration is in
// progress, this method may return nil.
GetCurrentCheckpoint() *Metadata
Expand Down
16 changes: 16 additions & 0 deletions go/storage/mkvs/checkpoint/restorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ func (rs *restorer) StartRestore(ctx context.Context, checkpoint *Metadata) erro
return nil
}

// AbortRestore aborts the checkpoint restore that is currently in progress by
// clearing the restorer's current checkpoint and its pending chunks.
//
// It returns ErrNoRestoreInProgress if no checkpoint is currently being
// restored. The restorer's lock is held for the duration of the call.
func (rs *restorer) AbortRestore(ctx context.Context) error {
	rs.Lock()
	defer rs.Unlock()

	if rs.currentCheckpoint == nil {
		return ErrNoRestoreInProgress
	}

	// Forget the in-progress checkpoint together with its outstanding chunks.
	rs.currentCheckpoint, rs.pendingChunks = nil, nil

	// TODO: also properly remove leftover nodes from the database (#3241).

	return nil
}

func (rs *restorer) GetCurrentCheckpoint() *Metadata {
rs.Lock()
defer rs.Unlock()
Expand Down
Loading

0 comments on commit 4056529

Please sign in to comment.