Skip to content

Commit

Permalink
go/worker/storage: Add initial sync from checkpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
jberci committed Aug 26, 2020
1 parent 21c6f98 commit 7a68cf5
Show file tree
Hide file tree
Showing 8 changed files with 492 additions and 12 deletions.
3 changes: 3 additions & 0 deletions go/oasis-test-runner/oasis/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ type StorageWorkerFixture struct { // nolint: maligned
AllowEarlyTermination bool `json:"allow_early_termination"`
AllowErrorTermination bool `json:"allow_error_termination"`

NoAutoStart bool `json:"no_auto_start,omitempty"`

DisableCertRotation bool `json:"disable_cert_rotation"`

LogWatcherHandlerFactories []log.WatcherHandlerFactory `json:"-"`
Expand Down Expand Up @@ -353,6 +355,7 @@ func (f *StorageWorkerFixture) Create(net *Network) (*Storage, error) {
NodeCfg: NodeCfg{
AllowEarlyTermination: f.AllowEarlyTermination,
AllowErrorTermination: f.AllowErrorTermination,
NoAutoStart: f.NoAutoStart,
LogWatcherHandlerFactories: f.LogWatcherHandlerFactories,
Consensus: f.Consensus,
},
Expand Down
9 changes: 8 additions & 1 deletion go/oasis-test-runner/oasis/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/log"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api"
workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage/committee"
)

// LogAssertEvent returns a handler which checks whether a specific log event was
Expand Down Expand Up @@ -75,8 +76,14 @@ func LogEventABCIPruneDelete() log.WatcherHandlerFactory {
return LogAssertEvent(abci.LogEventABCIPruneDelete, "expected ABCI pruning to be done")
}

// LogAssertRoothashRoothashReindexing returns a handler witch checks wether roothash reindexing was
// LogAssertRoothashRoothashReindexing returns a handler which checks whether roothash reindexing was
// run based on JSON log output.
func LogAssertRoothashRoothashReindexing() log.WatcherHandlerFactory {
return LogAssertEvent(roothash.LogEventHistoryReindexing, "roothash runtime reindexing not detected")
}

// LogAssertCheckpointSyncreturns a handler which checks whether initial storage sync from
// a checkpoint was successful or not.
func LogAssertCheckpointSync() log.WatcherHandlerFactory {
return LogAssertEvent(workerStorage.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed")
}
1 change: 1 addition & 0 deletions go/oasis-test-runner/oasis/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ func (net *Network) NewStorage(cfg *StorageCfg) (*Storage, error) {
Name: storageName,
net: net,
dir: storageDir,
noAutoStart: cfg.NoAutoStart,
disableDefaultLogWatcherHandlerFactories: cfg.DisableDefaultLogWatcherHandlerFactories,
logWatcherHandlerFactories: cfg.LogWatcherHandlerFactories,
consensus: cfg.Consensus,
Expand Down
11 changes: 7 additions & 4 deletions go/oasis-test-runner/scenario/e2e/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (sc *runtimeImpl) startClient(childEnv *env.Env) (*exec.Cmd, error) {
return cmd, nil
}

func (sc *runtimeImpl) wait(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan error) error {
func (sc *runtimeImpl) waitClient(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan error) error {
var err error
select {
case err = <-sc.Net.Errors():
Expand All @@ -312,11 +312,14 @@ func (sc *runtimeImpl) wait(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan
return err
}

if err = sc.Net.CheckLogWatchers(); err != nil {
return nil
}

func (sc *runtimeImpl) wait(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan error) error {
if err := sc.waitClient(childEnv, cmd, clientErrCh); err != nil {
return err
}

return nil
return sc.Net.CheckLogWatchers()
}

func (sc *runtimeImpl) Run(childEnv *env.Env) error {
Expand Down
41 changes: 38 additions & 3 deletions go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package runtime
import (
"context"
"fmt"
"strings"
"time"

"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/log"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis/cli"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario"
Expand Down Expand Up @@ -47,13 +49,22 @@ func (sc *storageSyncImpl) Fixture() (*oasis.NetworkFixture, error) {
// Configure runtime for storage checkpointing.
f.Runtimes[1].Storage.CheckpointInterval = 10
f.Runtimes[1].Storage.CheckpointNumKept = 1
f.Runtimes[1].Storage.CheckpointChunkSize = 1024 * 1024
f.Runtimes[1].Storage.CheckpointChunkSize = 1 * 1024
// Provision another storage node and make it ignore all applies.
f.StorageWorkers = append(f.StorageWorkers, oasis.StorageWorkerFixture{
Backend: database.BackendNameBadgerDB,
Entity: 1,
IgnoreApplies: true,
})

// One more storage worker for later, so it can do an initial sync with the snapshots.
f.StorageWorkers = append(f.StorageWorkers, oasis.StorageWorkerFixture{
Backend: database.BackendNameBadgerDB,
Entity: 1,
NoAutoStart: true,
LogWatcherHandlerFactories: []log.WatcherHandlerFactory{oasis.LogAssertCheckpointSync()},
})

return f, nil
}

Expand All @@ -64,7 +75,7 @@ func (sc *storageSyncImpl) Run(childEnv *env.Env) error {
}

// Wait for the client to exit.
if err = sc.wait(childEnv, cmd, clientErrCh); err != nil {
if err = sc.waitClient(childEnv, cmd, clientErrCh); err != nil {
return err
}

Expand Down Expand Up @@ -161,5 +172,29 @@ func (sc *storageSyncImpl) Run(childEnv *env.Env) error {
return fmt.Errorf("incorrect number of valid checkpoints (expected: >=2 got: %d)", validCps)
}

return nil
largeVal := strings.Repeat("has he his auto ", 7) // 16 bytes base string
for i := 0; i < 32; i++ {
sc.Logger.Info("submitting large transaction to runtime",
"seq", i,
)
if err = sc.submitKeyValueRuntimeInsertTx(ctx, runtimeID, fmt.Sprintf("%d key %d", i, i), fmt.Sprintf("my cp %d: ", i)+largeVal); err != nil {
return err
}
}

// Now spin up the last storage worker and check if it syncs with a checkpoint.
lateWorker := sc.Net.StorageWorkers()[3]
err = lateWorker.Start()
if err != nil {
return fmt.Errorf("can't start last storage worker: %w", err)
}
err = lateWorker.WaitReady(ctx)
if err != nil {
return fmt.Errorf("error waiting for last storage worker to become ready: %w", err)
}
// Wait a bit to give the logger in the node time to sync; the message has already been
// logged by this point, it just might not be on disk yet.
<-time.After(1 * time.Second)

return sc.Net.CheckLogWatchers()
}
3 changes: 2 additions & 1 deletion go/registry/api/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/quantity"
"github.com/oasisprotocol/oasis-core/go/common/sgx"
"github.com/oasisprotocol/oasis-core/go/common/version"
"github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags"
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
)
Expand Down Expand Up @@ -190,7 +191,7 @@ func (s *StorageParameters) ValidateBasic() error {
}

// Verify storage checkpointing configuration if enabled.
if s.CheckpointInterval > 0 {
if s.CheckpointInterval > 0 && !flags.DebugDontBlameOasis() {
if s.CheckpointInterval < 10 {
return fmt.Errorf("storage CheckpointInterval parameter too small")
}
Expand Down
Loading

0 comments on commit 7a68cf5

Please sign in to comment.