Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/worker/storage: Add initial sync from checkpoints #3181

Merged
merged 3 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .buildkite/code.pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ steps:
###############
- label: E2E tests
parallelism: 7
timeout_in_minutes: 12
timeout_in_minutes: 15
command:
- .buildkite/scripts/download_e2e_test_artifacts.sh
- .buildkite/scripts/test_e2e.sh
Expand Down
4 changes: 4 additions & 0 deletions .changelog/3181.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/worker/storage: Add initial sync from checkpoints

Instead of relying on the slow per-block root sync, the worker now tries
syncing from checkpoints, if any suitable are found.
7 changes: 7 additions & 0 deletions go/oasis-test-runner/oasis/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,13 @@ func (args *argBuilder) workerStorageDebugIgnoreApplies(ignore bool) *argBuilder
return args
}

// workerStorageDebugDisableCheckpointSync appends the
// "--" + workerStorage.CfgWorkerCheckpointSyncDisabled flag to the argument
// vector when disable is true. When disable is false the vector is left
// untouched. The builder itself is returned so calls can be chained.
func (args *argBuilder) workerStorageDebugDisableCheckpointSync(disable bool) *argBuilder {
	// Nothing to add unless the caller asked for the flag.
	if !disable {
		return args
	}

	flag := "--" + workerStorage.CfgWorkerCheckpointSyncDisabled
	args.vec = append(args.vec, flag)
	return args
}

func (args *argBuilder) workerStorageCheckpointCheckInterval(interval time.Duration) *argBuilder {
if interval > 0 {
args.vec = append(args.vec, "--"+workerStorage.CfgWorkerCheckpointCheckInterval, interval.String())
Expand Down
11 changes: 9 additions & 2 deletions go/oasis-test-runner/oasis/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ type StorageWorkerFixture struct { // nolint: maligned
AllowEarlyTermination bool `json:"allow_early_termination"`
AllowErrorTermination bool `json:"allow_error_termination"`

NoAutoStart bool `json:"no_auto_start,omitempty"`

DisableCertRotation bool `json:"disable_cert_rotation"`

LogWatcherHandlerFactories []log.WatcherHandlerFactory `json:"-"`
Expand All @@ -336,6 +338,7 @@ type StorageWorkerFixture struct { // nolint: maligned

CheckpointCheckInterval time.Duration `json:"checkpoint_check_interval,omitempty"`
IgnoreApplies bool `json:"ignore_applies,omitempty"`
CheckpointSyncEnabled bool `json:"checkpoint_sync_enabled,omitempty"`

// Runtimes contains the indexes of the runtimes to enable. Leave
// empty or nil for the default behaviour (i.e. include all runtimes).
Expand All @@ -353,6 +356,7 @@ func (f *StorageWorkerFixture) Create(net *Network) (*Storage, error) {
NodeCfg: NodeCfg{
AllowEarlyTermination: f.AllowEarlyTermination,
AllowErrorTermination: f.AllowErrorTermination,
NoAutoStart: f.NoAutoStart,
LogWatcherHandlerFactories: f.LogWatcherHandlerFactories,
Consensus: f.Consensus,
},
Expand All @@ -361,8 +365,11 @@ func (f *StorageWorkerFixture) Create(net *Network) (*Storage, error) {
SentryIndices: f.Sentries,
CheckpointCheckInterval: f.CheckpointCheckInterval,
IgnoreApplies: f.IgnoreApplies,
DisableCertRotation: f.DisableCertRotation,
Runtimes: f.Runtimes,
// The checkpoint syncing flag is intentionally flipped here.
// Syncing should normally be enabled, but it is disabled by default in tests.
CheckpointSyncDisabled: !f.CheckpointSyncEnabled,
DisableCertRotation: f.DisableCertRotation,
Runtimes: f.Runtimes,
})
}

Expand Down
9 changes: 8 additions & 1 deletion go/oasis-test-runner/oasis/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/log"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api"
workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage/committee"
)

// LogAssertEvent returns a handler which checks whether a specific log event was
Expand Down Expand Up @@ -81,8 +82,14 @@ func LogEventABCIPruneDelete() log.WatcherHandlerFactory {
return LogAssertEvent(abci.LogEventABCIPruneDelete, "expected ABCI pruning to be done")
}

// LogAssertRoothashRoothashReindexing returns a handler witch checks wether roothash reindexing was
// LogAssertRoothashRoothashReindexing returns a handler which checks whether roothash reindexing was
// run based on JSON log output.
func LogAssertRoothashRoothashReindexing() log.WatcherHandlerFactory {
	// Message passed to LogAssertEvent alongside the reindexing log event.
	const notDetectedMsg = "roothash runtime reindexing not detected"

	return LogAssertEvent(roothash.LogEventHistoryReindexing, notDetectedMsg)
}

// LogAssertCheckpointSync returns a log watcher handler factory that asserts
// on the workerStorage.LogEventCheckpointSyncSuccess log event, i.e. it checks
// whether the initial storage sync from a checkpoint succeeded.
func LogAssertCheckpointSync() log.WatcherHandlerFactory {
	// Message passed to LogAssertEvent alongside the success log event.
	const notSyncedMsg = "checkpoint sync did not succeed"

	return LogAssertEvent(workerStorage.LogEventCheckpointSyncSuccess, notSyncedMsg)
}
5 changes: 5 additions & 0 deletions go/oasis-test-runner/oasis/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Storage struct { // nolint: maligned

disableCertRotation bool
ignoreApplies bool
checkpointSyncDisabled bool
checkpointCheckInterval time.Duration

sentryPubKey signature.PublicKey
Expand All @@ -46,6 +47,7 @@ type StorageCfg struct { // nolint: maligned

DisableCertRotation bool
IgnoreApplies bool
CheckpointSyncDisabled bool
CheckpointCheckInterval time.Duration

Runtimes []int
Expand Down Expand Up @@ -116,6 +118,7 @@ func (worker *Storage) startNode() error {
workerP2pPort(worker.p2pPort).
workerStorageEnabled().
workerStorageDebugIgnoreApplies(worker.ignoreApplies).
workerStorageDebugDisableCheckpointSync(worker.checkpointSyncDisabled).
workerStorageCheckpointCheckInterval(worker.checkpointCheckInterval).
appendNetwork(worker.net).
appendEntity(worker.entity)
Expand Down Expand Up @@ -186,6 +189,7 @@ func (net *Network) NewStorage(cfg *StorageCfg) (*Storage, error) {
Name: storageName,
net: net,
dir: storageDir,
noAutoStart: cfg.NoAutoStart,
disableDefaultLogWatcherHandlerFactories: cfg.DisableDefaultLogWatcherHandlerFactories,
logWatcherHandlerFactories: cfg.LogWatcherHandlerFactories,
consensus: cfg.Consensus,
Expand All @@ -195,6 +199,7 @@ func (net *Network) NewStorage(cfg *StorageCfg) (*Storage, error) {
sentryIndices: cfg.SentryIndices,
disableCertRotation: cfg.DisableCertRotation,
ignoreApplies: cfg.IgnoreApplies,
checkpointSyncDisabled: cfg.CheckpointSyncDisabled,
checkpointCheckInterval: cfg.CheckpointCheckInterval,
sentryPubKey: sentryPubKey,
tmAddress: crypto.PublicKeyToTendermint(&p2pKey).Address().String(),
Expand Down
11 changes: 7 additions & 4 deletions go/oasis-test-runner/scenario/e2e/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (sc *runtimeImpl) startClient(childEnv *env.Env) (*exec.Cmd, error) {
return cmd, nil
}

func (sc *runtimeImpl) wait(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan error) error {
func (sc *runtimeImpl) waitClient(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan error) error {
var err error
select {
case err = <-sc.Net.Errors():
Expand All @@ -313,11 +313,14 @@ func (sc *runtimeImpl) wait(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan
return err
}

if err = sc.Net.CheckLogWatchers(); err != nil {
return nil
}

func (sc *runtimeImpl) wait(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan error) error {
if err := sc.waitClient(childEnv, cmd, clientErrCh); err != nil {
return err
}

return nil
return sc.Net.CheckLogWatchers()
}

func (sc *runtimeImpl) Run(childEnv *env.Env) error {
Expand Down
41 changes: 38 additions & 3 deletions go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package runtime
import (
"context"
"fmt"
"strings"
"time"

"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/log"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis/cli"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario"
Expand Down Expand Up @@ -47,13 +49,23 @@ func (sc *storageSyncImpl) Fixture() (*oasis.NetworkFixture, error) {
// Configure runtime for storage checkpointing.
f.Runtimes[1].Storage.CheckpointInterval = 10
f.Runtimes[1].Storage.CheckpointNumKept = 1
f.Runtimes[1].Storage.CheckpointChunkSize = 1024 * 1024
f.Runtimes[1].Storage.CheckpointChunkSize = 1 * 1024
// Provision another storage node and make it ignore all applies.
f.StorageWorkers = append(f.StorageWorkers, oasis.StorageWorkerFixture{
Backend: database.BackendNameBadgerDB,
Entity: 1,
IgnoreApplies: true,
})

// One more storage worker for later, so it can do an initial sync with the snapshots.
f.StorageWorkers = append(f.StorageWorkers, oasis.StorageWorkerFixture{
Backend: database.BackendNameBadgerDB,
Entity: 1,
NoAutoStart: true,
CheckpointSyncEnabled: true,
LogWatcherHandlerFactories: []log.WatcherHandlerFactory{oasis.LogAssertCheckpointSync()},
})

return f, nil
}

Expand All @@ -64,7 +76,7 @@ func (sc *storageSyncImpl) Run(childEnv *env.Env) error {
}

// Wait for the client to exit.
if err = sc.wait(childEnv, cmd, clientErrCh); err != nil {
if err = sc.waitClient(childEnv, cmd, clientErrCh); err != nil {
return err
}

Expand Down Expand Up @@ -161,5 +173,28 @@ func (sc *storageSyncImpl) Run(childEnv *env.Env) error {
return fmt.Errorf("incorrect number of valid checkpoints (expected: >=2 got: %d)", validCps)
}

return nil
largeVal := strings.Repeat("has he his auto ", 7) // 16 bytes base string
for i := 0; i < 32; i++ {
sc.Logger.Info("submitting large transaction to runtime",
"seq", i,
)
if err = sc.submitKeyValueRuntimeInsertTx(ctx, runtimeID, fmt.Sprintf("%d key %d", i, i), fmt.Sprintf("my cp %d: ", i)+largeVal); err != nil {
return err
}
}

// Now spin up the last storage worker and check if it syncs with a checkpoint.
lateWorker := sc.Net.StorageWorkers()[3]
err = lateWorker.Start()
if err != nil {
return fmt.Errorf("can't start last storage worker: %w", err)
}
if err := lateWorker.WaitReady(ctx); err != nil {
return fmt.Errorf("erorr waiting for late storage worker to become ready: %w", err)
}
// Wait a bit to give the logger in the node time to sync; the message has already been
// logged by this point, it just might not be on disk yet.
<-time.After(1 * time.Second)

return sc.Net.CheckLogWatchers()
}
8 changes: 8 additions & 0 deletions go/oasis-test-runner/scenario/e2e/runtime/txsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ func (sc *txSourceImpl) Fixture() (*oasis.NetworkFixture, error) {
// Disable CheckTx on the client node so we can submit invalid transactions.
f.Clients[0].Consensus.DisableCheckTx = true

// Set up checkpointing.
f.Runtimes[1].Storage.CheckpointInterval = 1000
f.Runtimes[1].Storage.CheckpointNumKept = 2
f.Runtimes[1].Storage.CheckpointChunkSize = 1024 * 1024

// Use at least 4 validators so that consensus can keep making progress
// when a node is being killed and restarted.
f.Validators = []oasis.ValidatorFixture{
Expand Down Expand Up @@ -283,6 +288,9 @@ func (sc *txSourceImpl) Fixture() (*oasis.NetworkFixture, error) {
for i := range f.StorageWorkers {
f.StorageWorkers[i].Consensus.SubmissionGasPrice = txSourceGasPrice
sc.generateConsensusFixture(&f.StorageWorkers[i].Consensus)
if i > 0 {
f.StorageWorkers[i].CheckpointSyncEnabled = true
}
}
for i := range f.ComputeWorkers {
f.ComputeWorkers[i].Consensus.SubmissionGasPrice = txSourceGasPrice
Expand Down
3 changes: 2 additions & 1 deletion go/registry/api/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/quantity"
"github.com/oasisprotocol/oasis-core/go/common/sgx"
"github.com/oasisprotocol/oasis-core/go/common/version"
"github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags"
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
)
Expand Down Expand Up @@ -198,7 +199,7 @@ func (s *StorageParameters) ValidateBasic() error {
}

// Verify storage checkpointing configuration if enabled.
if s.CheckpointInterval > 0 {
if s.CheckpointInterval > 0 && !flags.DebugDontBlameOasis() {
if s.CheckpointInterval < 10 {
return fmt.Errorf("storage CheckpointInterval parameter too small")
}
Expand Down
8 changes: 8 additions & 0 deletions go/runtime/committee/committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
registry "github.com/oasisprotocol/oasis-core/go/registry/api"
Expand Down Expand Up @@ -177,6 +178,13 @@ func WithAutomaticEpochTransitions() WatcherOption {
// should be excluded.
type Filter func(*scheduler.CommitteeNode) bool

// IgnoreNodeFilter builds a committee watcher filter keyed on a node's public
// key.
//
// The returned filter yields false for a committee node whose public key
// equals pk, and true for every other node.
func IgnoreNodeFilter(pk signature.PublicKey) Filter {
	keepNode := func(cn *scheduler.CommitteeNode) bool {
		isIgnored := cn.PublicKey.Equal(pk)
		return !isIgnored
	}
	return keepNode
}

// WithFilter is an option that adds a given filter to the committee watcher.
func WithFilter(f Filter) WatcherOption {
return func(cw *committeeWatcher) {
Expand Down
2 changes: 1 addition & 1 deletion go/storage/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (c *storageClient) GetCheckpointChunk(ctx context.Context, chunk *checkpoin

for {
var part []byte
switch stream.RecvMsg(&part) {
switch err = stream.RecvMsg(&part); err {
jberci marked this conversation as resolved.
Show resolved Hide resolved
case nil:
case io.EOF:
return nil
Expand Down
4 changes: 3 additions & 1 deletion go/storage/client/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,16 @@ func New(
schedulerBackend scheduler.Backend,
registryBackend registry.Backend,
runtime registry.RuntimeDescriptorProvider,
extraWatcherOpts ...committee.WatcherOption,
) (api.Backend, error) {
watcherOpts := append(extraWatcherOpts, committee.WithAutomaticEpochTransitions())
committeeWatcher, err := committee.NewWatcher(
ctx,
schedulerBackend,
registryBackend,
namespace,
scheduler.KindStorage,
committee.WithAutomaticEpochTransitions(),
watcherOpts...,
)
if err != nil {
return nil, fmt.Errorf("storage/client: failed to create committee watcher: %w", err)
Expand Down
3 changes: 3 additions & 0 deletions go/storage/mkvs/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type Restorer interface {
// StartRestore starts a checkpoint restoration process.
StartRestore(ctx context.Context, checkpoint *Metadata) error

// AbortRestore aborts a checkpoint restore in progress.
AbortRestore(ctx context.Context) error

// GetCurrentCheckpoint returns the checkpoint that is being restored. If no restoration is in
// progress, this method may return nil.
GetCurrentCheckpoint() *Metadata
Expand Down
16 changes: 16 additions & 0 deletions go/storage/mkvs/checkpoint/restorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ func (rs *restorer) StartRestore(ctx context.Context, checkpoint *Metadata) erro
return nil
}

// AbortRestore aborts the checkpoint restore that is currently in progress.
//
// The restorer's lock is held for the duration of the call. If a checkpoint
// is being restored, both the current checkpoint and its pending chunks are
// cleared. Otherwise ErrNoRestoreInProgress is returned. The ctx argument is
// not used by this implementation.
func (rs *restorer) AbortRestore(ctx context.Context) error {
	rs.Lock()
	defer rs.Unlock()

	// Nothing to abort when no checkpoint is being restored.
	if rs.currentCheckpoint == nil {
		return ErrNoRestoreInProgress
	}

	// Drop all in-memory restore state in a single step.
	rs.currentCheckpoint, rs.pendingChunks = nil, nil

	// TODO: also properly remove leftover nodes from the database (#3241).

	return nil
}

func (rs *restorer) GetCurrentCheckpoint() *Metadata {
rs.Lock()
defer rs.Unlock()
Expand Down
Loading