go/worker/storage: Add initial sync from checkpoints
jberci committed Sep 3, 2020
1 parent 85640b0 commit 2e5bda6
Showing 14 changed files with 605 additions and 17 deletions.
4 changes: 4 additions & 0 deletions .changelog/3181.internal.md
@@ -0,0 +1,4 @@
go/worker/storage: Add initial sync from checkpoints

Instead of relying on the slow per-block root sync, the worker now tries
syncing from checkpoints, if any suitable ones are found.
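
In rough terms, the new flow looks like the sketch below. All identifiers here are hypothetical stand-ins, not the worker's actual API; the real logic lives in go/worker/storage/committee.

```go
package storagesync

import (
	"context"
	"fmt"
)

// Checkpoint is a stand-in for the real checkpoint metadata type.
type Checkpoint struct{ Round uint64 }

// syncer abstracts the two sync paths; every name here is hypothetical.
type syncer interface {
	// FindCheckpoints returns suitable checkpoints, newest first.
	FindCheckpoints(ctx context.Context) ([]Checkpoint, error)
	RestoreFromCheckpoint(ctx context.Context, cp Checkpoint) error
	SyncRootsFrom(ctx context.Context, round uint64) error
}

// initialSync tries checkpoint restore first and only falls back to the
// slow per-block root sync when no suitable checkpoint can be applied.
func initialSync(ctx context.Context, s syncer) (uint64, error) {
	if cps, err := s.FindCheckpoints(ctx); err == nil {
		for _, cp := range cps {
			if err := s.RestoreFromCheckpoint(ctx, cp); err == nil {
				// Incremental root sync can resume from the checkpoint round.
				return cp.Round, nil
			}
		}
	}
	// Fall back: sync every block root from round 0.
	if err := s.SyncRootsFrom(ctx, 0); err != nil {
		return 0, fmt.Errorf("storage: full root sync failed: %w", err)
	}
	return 0, nil
}
```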
5 changes: 5 additions & 0 deletions go/oasis-test-runner/oasis/controller.go
@@ -11,6 +11,7 @@ import (
runtimeClient "github.com/oasisprotocol/oasis-core/go/runtime/client/api"
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage/api"
)

// Controller is a network controller that connects to one of the
@@ -26,6 +27,8 @@ type Controller struct {
Storage storage.Backend
Keymanager *keymanager.KeymanagerClient

WorkerStorage workerStorage.StorageWorker

conn *grpc.ClientConn
}

@@ -56,6 +59,8 @@ func NewController(socketPath string) (*Controller, error) {
Storage: storage.NewStorageClient(conn),
Keymanager: keymanager.NewKeymanagerClient(conn),

WorkerStorage: workerStorage.NewStorageWorkerClient(conn),

conn: conn,
}, nil
}
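
The new WorkerStorage client is exercised exactly this way in the storage sync scenario further down; as a usage sketch, wrapped in a hypothetical helper (the helper name and signature are assumptions, but every call it makes appears in this commit):

```go
// waitForCheckpointSync blocks until the given node's storage worker has
// finished its initial checkpoint sync for runtimeID.
func waitForCheckpointSync(ctx context.Context, node *oasis.Storage, runtimeID common.Namespace) error {
	// Connect to the node's internal socket.
	ctrl, err := oasis.NewController(node.SocketPath())
	if err != nil {
		return fmt.Errorf("failed to connect to node: %w", err)
	}
	return ctrl.WorkerStorage.WaitCheckpointSynced(ctx,
		&workerStorage.WaitCheckpointSyncedRequest{RuntimeID: runtimeID})
}
```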
3 changes: 3 additions & 0 deletions go/oasis-test-runner/oasis/fixture.go
@@ -325,6 +325,8 @@ type StorageWorkerFixture struct { // nolint: maligned
AllowEarlyTermination bool `json:"allow_early_termination"`
AllowErrorTermination bool `json:"allow_error_termination"`

NoAutoStart bool `json:"no_auto_start,omitempty"`

DisableCertRotation bool `json:"disable_cert_rotation"`

LogWatcherHandlerFactories []log.WatcherHandlerFactory `json:"-"`
@@ -353,6 +355,7 @@ func (f *StorageWorkerFixture) Create(net *Network) (*Storage, error) {
NodeCfg: NodeCfg{
AllowEarlyTermination: f.AllowEarlyTermination,
AllowErrorTermination: f.AllowErrorTermination,
NoAutoStart: f.NoAutoStart,
LogWatcherHandlerFactories: f.LogWatcherHandlerFactories,
Consensus: f.Consensus,
},
9 changes: 8 additions & 1 deletion go/oasis-test-runner/oasis/log.go
@@ -7,6 +7,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/log"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api"
workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage/committee"
)

// LogAssertEvent returns a handler which checks whether a specific log event was
@@ -81,8 +82,14 @@ func LogEventABCIPruneDelete() log.WatcherHandlerFactory {
return LogAssertEvent(abci.LogEventABCIPruneDelete, "expected ABCI pruning to be done")
}

// LogAssertRoothashRoothashReindexing returns a handler witch checks wether roothash reindexing was
// LogAssertRoothashRoothashReindexing returns a handler which checks whether roothash reindexing was
// run based on JSON log output.
func LogAssertRoothashRoothashReindexing() log.WatcherHandlerFactory {
return LogAssertEvent(roothash.LogEventHistoryReindexing, "roothash runtime reindexing not detected")
}

// LogAssertCheckpointSync returns a handler which checks whether initial storage sync from
// a checkpoint was successful or not.
func LogAssertCheckpointSync() log.WatcherHandlerFactory {
return LogAssertEvent(workerStorage.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed")
}
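
The new factory is meant to be attached to a node fixture's LogWatcherHandlerFactories; the storage sync scenario below wires it up like this:

```go
// From the storage sync scenario: fail the test unless this worker
// logs a successful checkpoint sync.
f.StorageWorkers = append(f.StorageWorkers, oasis.StorageWorkerFixture{
	Backend:                    database.BackendNameBadgerDB,
	Entity:                     1,
	NoAutoStart:                true,
	LogWatcherHandlerFactories: []log.WatcherHandlerFactory{oasis.LogAssertCheckpointSync()},
})
```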
1 change: 1 addition & 0 deletions go/oasis-test-runner/oasis/storage.go
@@ -186,6 +186,7 @@ func (net *Network) NewStorage(cfg *StorageCfg) (*Storage, error) {
Name: storageName,
net: net,
dir: storageDir,
noAutoStart: cfg.NoAutoStart,
disableDefaultLogWatcherHandlerFactories: cfg.DisableDefaultLogWatcherHandlerFactories,
logWatcherHandlerFactories: cfg.LogWatcherHandlerFactories,
consensus: cfg.Consensus,
11 changes: 7 additions & 4 deletions go/oasis-test-runner/scenario/e2e/runtime/runtime.go
@@ -302,7 +302,7 @@ func (sc *runtimeImpl) startClient(childEnv *env.Env) (*exec.Cmd, error) {
return cmd, nil
}

func (sc *runtimeImpl) wait(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan error) error {
func (sc *runtimeImpl) waitClient(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan error) error {
var err error
select {
case err = <-sc.Net.Errors():
@@ -313,11 +313,14 @@ func (sc *runtimeImpl) wait(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan
return err
}

if err = sc.Net.CheckLogWatchers(); err != nil {
return nil
}

func (sc *runtimeImpl) wait(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan error) error {
if err := sc.waitClient(childEnv, cmd, clientErrCh); err != nil {
return err
}

return nil
return sc.Net.CheckLogWatchers()
}

func (sc *runtimeImpl) Run(childEnv *env.Env) error {
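Splitting waitClient out of wait lets a scenario keep working after the client exits (the storage sync test below submits more transactions and starts a late worker first) and defer the log-watcher check to the end of its Run.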
46 changes: 43 additions & 3 deletions go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go
@@ -3,15 +3,18 @@ package runtime
import (
"context"
"fmt"
"strings"
"time"

"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/log"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis/cli"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario"
runtimeClient "github.com/oasisprotocol/oasis-core/go/runtime/client/api"
"github.com/oasisprotocol/oasis-core/go/storage/database"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage/api"
)

// StorageSync is the storage sync scenario.
@@ -47,13 +50,22 @@ func (sc *storageSyncImpl) Fixture() (*oasis.NetworkFixture, error) {
// Configure runtime for storage checkpointing.
f.Runtimes[1].Storage.CheckpointInterval = 10
f.Runtimes[1].Storage.CheckpointNumKept = 1
f.Runtimes[1].Storage.CheckpointChunkSize = 1024 * 1024
f.Runtimes[1].Storage.CheckpointChunkSize = 1 * 1024
// Provision another storage node and make it ignore all applies.
f.StorageWorkers = append(f.StorageWorkers, oasis.StorageWorkerFixture{
Backend: database.BackendNameBadgerDB,
Entity: 1,
IgnoreApplies: true,
})

// One more storage worker for later, so it can do an initial sync with the snapshots.
f.StorageWorkers = append(f.StorageWorkers, oasis.StorageWorkerFixture{
Backend: database.BackendNameBadgerDB,
Entity: 1,
NoAutoStart: true,
LogWatcherHandlerFactories: []log.WatcherHandlerFactory{oasis.LogAssertCheckpointSync()},
})

return f, nil
}

@@ -64,7 +76,7 @@ func (sc *storageSyncImpl) Run(childEnv *env.Env) error {
}

// Wait for the client to exit.
if err = sc.wait(childEnv, cmd, clientErrCh); err != nil {
if err = sc.waitClient(childEnv, cmd, clientErrCh); err != nil {
return err
}

@@ -161,5 +173,33 @@ func (sc *storageSyncImpl) Run(childEnv *env.Env) error {
return fmt.Errorf("incorrect number of valid checkpoints (expected: >=2 got: %d)", validCps)
}

return nil
largeVal := strings.Repeat("has he his auto ", 7) // 16 bytes base string
for i := 0; i < 32; i++ {
sc.Logger.Info("submitting large transaction to runtime",
"seq", i,
)
if err = sc.submitKeyValueRuntimeInsertTx(ctx, runtimeID, fmt.Sprintf("%d key %d", i, i), fmt.Sprintf("my cp %d: ", i)+largeVal); err != nil {
return err
}
}

// Now spin up the last storage worker and check if it syncs with a checkpoint.
lateWorker := sc.Net.StorageWorkers()[3]
err = lateWorker.Start()
if err != nil {
return fmt.Errorf("can't start last storage worker: %w", err)
}
lateCtrl, err := oasis.NewController(lateWorker.SocketPath())
if err != nil {
return fmt.Errorf("failed to connect with the late storage node: %w", err)
}
err = lateCtrl.WorkerStorage.WaitCheckpointSynced(ctx, &workerStorage.WaitCheckpointSyncedRequest{RuntimeID: runtimeID})
if err != nil {
return fmt.Errorf("error waiting for late worker to sync checkpoints: %w", err)
}
// Wait a bit to give the logger in the node time to sync; the message has already been
// logged by this point, it just might not be on disk yet.
<-time.After(1 * time.Second)

return sc.Net.CheckLogWatchers()
}
3 changes: 2 additions & 1 deletion go/registry/api/runtime.go
@@ -17,6 +17,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/quantity"
"github.com/oasisprotocol/oasis-core/go/common/sgx"
"github.com/oasisprotocol/oasis-core/go/common/version"
"github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags"
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
)
@@ -198,7 +199,7 @@ func (s *StorageParameters) ValidateBasic() error {
}

// Verify storage checkpointing configuration if enabled.
if s.CheckpointInterval > 0 {
if s.CheckpointInterval > 0 && !flags.DebugDontBlameOasis() {
if s.CheckpointInterval < 10 {
return fmt.Errorf("storage CheckpointInterval parameter too small")
}
2 changes: 1 addition & 1 deletion go/storage/api/grpc.go
@@ -478,7 +478,7 @@ func (c *storageClient) GetCheckpointChunk(ctx context.Context, chunk *checkpoin

for {
var part []byte
switch stream.RecvMsg(&part) {
switch err = stream.RecvMsg(&part); err {
case nil:
case io.EOF:
return nil
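
The old code switched on RecvMsg's return value without storing it, so the default branch could not propagate the actual error. For context, a sketch of how the receive loop plausibly continues after the fix (only the two switch lines appear in the hunk; the writer plumbing below is an assumption):

```go
// Receive checkpoint chunk parts until EOF, propagating any stream error.
for {
	var part []byte
	switch err = stream.RecvMsg(&part); err {
	case nil:
		// Got another part; handle it below.
	case io.EOF:
		return nil
	default:
		return err
	}
	// Hypothetical continuation: write the received part to the chunk
	// writer provided by the caller.
	if _, err = w.Write(part); err != nil {
		return err
	}
}
```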
8 changes: 8 additions & 0 deletions go/worker/storage/api/api.go
@@ -21,6 +21,9 @@ type StorageWorker interface {

// ForceFinalize forces finalization of a specific round.
ForceFinalize(ctx context.Context, request *ForceFinalizeRequest) error

// WaitCheckpointSynced waits for the storage worker to finish syncing from checkpoints.
WaitCheckpointSynced(ctx context.Context, request *WaitCheckpointSyncedRequest) error
}

// GetLastSyncedRoundRequest is a GetLastSyncedRound request.
@@ -40,3 +43,8 @@ type ForceFinalizeRequest struct {
RuntimeID common.Namespace `json:"runtime_id"`
Round uint64 `json:"round"`
}

// WaitCheckpointSyncedRequest is a WaitCheckpointSynced request.
type WaitCheckpointSyncedRequest struct {
RuntimeID common.Namespace `json:"runtime_id"`
}
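
The worker-side implementation of WaitCheckpointSynced is not shown in this view. One plausible shape, assuming the worker keeps a per-runtime channel that it closes once initial sync (from a checkpoint or the fallback path) completes, is:

```go
// Hypothetical implementation sketch, not the actual committee worker code.
func (w *Worker) WaitCheckpointSynced(ctx context.Context, rq *WaitCheckpointSyncedRequest) error {
	node, ok := w.runtimes[rq.RuntimeID]
	if !ok {
		return fmt.Errorf("worker/storage: runtime %s not tracked", rq.RuntimeID)
	}
	select {
	case <-node.initialSyncDone: // closed when initial sync finishes
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
```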
33 changes: 33 additions & 0 deletions go/worker/storage/api/grpc.go
@@ -16,6 +16,8 @@ var (
methodGetLastSyncedRound = serviceName.NewMethod("GetLastSyncedRound", &GetLastSyncedRoundRequest{})
// methodForceFinalize is the ForceFinalize method.
methodForceFinalize = serviceName.NewMethod("ForceFinalize", &ForceFinalizeRequest{})
// methodWaitCheckpointSynced is the WaitCheckpointSynced method.
methodWaitCheckpointSynced = serviceName.NewMethod("WaitCheckpointSynced", &WaitCheckpointSyncedRequest{})

// serviceDesc is the gRPC service descriptor.
serviceDesc = grpc.ServiceDesc{
@@ -30,6 +32,10 @@
MethodName: methodForceFinalize.ShortName(),
Handler: handlerForceFinalize,
},
{
MethodName: methodWaitCheckpointSynced.ShortName(),
Handler: handlerWaitCheckpointSynced,
},
},
Streams: []grpc.StreamDesc{},
}
@@ -81,6 +87,29 @@ func handlerForceFinalize( // nolint: golint
return interceptor(ctx, rq, info, handler)
}

func handlerWaitCheckpointSynced( // nolint: golint
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
rq := new(WaitCheckpointSyncedRequest)
if err := dec(rq); err != nil {
return nil, err
}
if interceptor == nil {
return nil, srv.(StorageWorker).WaitCheckpointSynced(ctx, rq)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodWaitCheckpointSynced.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, srv.(StorageWorker).WaitCheckpointSynced(ctx, req.(*WaitCheckpointSyncedRequest))
}
return interceptor(ctx, rq, info, handler)
}

// RegisterService registers a new storage worker service with the given gRPC server.
func RegisterService(server *grpc.Server, service StorageWorker) {
server.RegisterService(&serviceDesc, service)
Expand All @@ -102,6 +131,10 @@ func (c *storageWorkerClient) ForceFinalize(ctx context.Context, req *ForceFinal
return c.conn.Invoke(ctx, methodForceFinalize.FullName(), req, nil)
}

func (c *storageWorkerClient) WaitCheckpointSynced(ctx context.Context, req *WaitCheckpointSyncedRequest) error {
return c.conn.Invoke(ctx, methodWaitCheckpointSynced.FullName(), req, nil)
}

// NewStorageWorkerClient creates a new gRPC storage worker
// client service.
func NewStorageWorkerClient(c *grpc.ClientConn) StorageWorker {