diff --git a/.changelog/3181.internal.md b/.changelog/3181.internal.md
new file mode 100644
index 00000000000..fc0de722014
--- /dev/null
+++ b/.changelog/3181.internal.md
@@ -0,0 +1,4 @@
+go/worker/storage: Add initial sync from checkpoints
+
+Instead of relying on the slow per-block root sync, the worker now tries
+syncing from checkpoints, if any suitable ones are found.
diff --git a/go/oasis-test-runner/oasis/fixture.go b/go/oasis-test-runner/oasis/fixture.go
index 31205e920ab..f381bc15a59 100644
--- a/go/oasis-test-runner/oasis/fixture.go
+++ b/go/oasis-test-runner/oasis/fixture.go
@@ -325,6 +325,8 @@ type StorageWorkerFixture struct { // nolint: maligned
     AllowEarlyTermination bool `json:"allow_early_termination"`
     AllowErrorTermination bool `json:"allow_error_termination"`
 
+    NoAutoStart bool `json:"no_auto_start,omitempty"`
+
     DisableCertRotation bool `json:"disable_cert_rotation"`
 
     LogWatcherHandlerFactories []log.WatcherHandlerFactory `json:"-"`
@@ -353,6 +355,7 @@ func (f *StorageWorkerFixture) Create(net *Network) (*Storage, error) {
         NodeCfg: NodeCfg{
             AllowEarlyTermination:      f.AllowEarlyTermination,
             AllowErrorTermination:      f.AllowErrorTermination,
+            NoAutoStart:                f.NoAutoStart,
             LogWatcherHandlerFactories: f.LogWatcherHandlerFactories,
             Consensus:                  f.Consensus,
         },
diff --git a/go/oasis-test-runner/oasis/log.go b/go/oasis-test-runner/oasis/log.go
index 05c8fd4ca99..f7dbaeb1526 100644
--- a/go/oasis-test-runner/oasis/log.go
+++ b/go/oasis-test-runner/oasis/log.go
@@ -7,6 +7,7 @@ import (
     "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/log"
     roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
     upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api"
+    workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage/committee"
 )
 
 // LogAssertEvent returns a handler which checks whether a specific log event was
@@ -75,8 +76,14 @@ func LogEventABCIPruneDelete() log.WatcherHandlerFactory {
     return LogAssertEvent(abci.LogEventABCIPruneDelete, "expected ABCI pruning to be done")
 }
 
-// LogAssertRoothashRoothashReindexing returns a handler witch checks wether roothash reindexing was
+// LogAssertRoothashRoothashReindexing returns a handler which checks whether roothash reindexing was
 // run based on JSON log output.
 func LogAssertRoothashRoothashReindexing() log.WatcherHandlerFactory {
     return LogAssertEvent(roothash.LogEventHistoryReindexing, "roothash runtime reindexing not detected")
 }
+
+// LogAssertCheckpointSync returns a handler which checks whether initial storage sync from
+// a checkpoint was successful or not.
+func LogAssertCheckpointSync() log.WatcherHandlerFactory {
+    return LogAssertEvent(workerStorage.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed")
+}
diff --git a/go/oasis-test-runner/oasis/storage.go b/go/oasis-test-runner/oasis/storage.go
index 6d531cd50af..829dad6cef3 100644
--- a/go/oasis-test-runner/oasis/storage.go
+++ b/go/oasis-test-runner/oasis/storage.go
@@ -186,6 +186,7 @@ func (net *Network) NewStorage(cfg *StorageCfg) (*Storage, error) {
         Name: storageName,
         net:  net,
         dir:  storageDir,
+        noAutoStart: cfg.NoAutoStart,
         disableDefaultLogWatcherHandlerFactories: cfg.DisableDefaultLogWatcherHandlerFactories,
         logWatcherHandlerFactories:               cfg.LogWatcherHandlerFactories,
         consensus:                                cfg.Consensus,
diff --git a/go/oasis-test-runner/scenario/e2e/runtime/runtime.go b/go/oasis-test-runner/scenario/e2e/runtime/runtime.go
index ede328f56cd..860bb8e2668 100644
--- a/go/oasis-test-runner/scenario/e2e/runtime/runtime.go
+++ b/go/oasis-test-runner/scenario/e2e/runtime/runtime.go
@@ -301,7 +301,7 @@ func (sc *runtimeImpl) startClient(childEnv *env.Env) (*exec.Cmd, error) {
     return cmd, nil
 }
 
-func (sc *runtimeImpl) wait(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan error) error {
+func (sc *runtimeImpl) waitClient(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan error) error {
     var err error
     select {
     case err = <-sc.Net.Errors():
@@ -312,11 +312,14 @@ func (sc *runtimeImpl) wait(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan
         return err
     }
 
-    if err = sc.Net.CheckLogWatchers(); err != nil {
+    return nil
+}
+
+func (sc *runtimeImpl) wait(childEnv *env.Env, cmd *exec.Cmd, clientErrCh <-chan error) error {
+    if err := sc.waitClient(childEnv, cmd, clientErrCh); err != nil {
         return err
     }
-
-    return nil
+    return sc.Net.CheckLogWatchers()
 }
 
 func (sc *runtimeImpl) Run(childEnv *env.Env) error {
diff --git a/go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go b/go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go
index c7b07141397..580930e6535 100644
--- a/go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go
+++ b/go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go
@@ -3,9 +3,11 @@ package runtime
 import (
     "context"
     "fmt"
+    "strings"
     "time"
 
     "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env"
+    "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/log"
     "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis"
     "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis/cli"
     "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario"
@@ -47,13 +49,22 @@ func (sc *storageSyncImpl) Fixture() (*oasis.NetworkFixture, error) {
     // Configure runtime for storage checkpointing.
     f.Runtimes[1].Storage.CheckpointInterval = 10
     f.Runtimes[1].Storage.CheckpointNumKept = 1
-    f.Runtimes[1].Storage.CheckpointChunkSize = 1024 * 1024
+    f.Runtimes[1].Storage.CheckpointChunkSize = 1 * 1024
 
     // Provision another storage node and make it ignore all applies.
     f.StorageWorkers = append(f.StorageWorkers, oasis.StorageWorkerFixture{
         Backend:       database.BackendNameBadgerDB,
         Entity:        1,
         IgnoreApplies: true,
     })
+
+    // One more storage worker for later, so it can do an initial sync from the checkpoints.
+    f.StorageWorkers = append(f.StorageWorkers, oasis.StorageWorkerFixture{
+        Backend:                    database.BackendNameBadgerDB,
+        Entity:                     1,
+        NoAutoStart:                true,
+        LogWatcherHandlerFactories: []log.WatcherHandlerFactory{oasis.LogAssertCheckpointSync()},
+    })
+
     return f, nil
 }
@@ -64,7 +75,7 @@ func (sc *storageSyncImpl) Run(childEnv *env.Env) error {
     }
 
     // Wait for the client to exit.
-    if err = sc.wait(childEnv, cmd, clientErrCh); err != nil {
+    if err = sc.waitClient(childEnv, cmd, clientErrCh); err != nil {
         return err
     }
 
@@ -161,5 +172,29 @@ func (sc *storageSyncImpl) Run(childEnv *env.Env) error {
         return fmt.Errorf("incorrect number of valid checkpoints (expected: >=2 got: %d)", validCps)
     }
 
-    return nil
+    largeVal := strings.Repeat("has he his auto ", 7) // 16 bytes base string
+    for i := 0; i < 32; i++ {
+        sc.Logger.Info("submitting large transaction to runtime",
+            "seq", i,
+        )
+        if err = sc.submitKeyValueRuntimeInsertTx(ctx, runtimeID, fmt.Sprintf("%d key %d", i, i), fmt.Sprintf("my cp %d: ", i)+largeVal); err != nil {
+            return err
+        }
+    }
+
+    // Now spin up the last storage worker and check if it syncs with a checkpoint.
+    lateWorker := sc.Net.StorageWorkers()[3]
+    err = lateWorker.Start()
+    if err != nil {
+        return fmt.Errorf("can't start last storage worker: %w", err)
+    }
+    err = lateWorker.WaitReady(ctx)
+    if err != nil {
+        return fmt.Errorf("error waiting for last storage worker to become ready: %w", err)
+    }
+    // Wait a bit to give the logger in the node time to sync; the message has already been
+    // logged by this point, it just might not be on disk yet.
+    <-time.After(1 * time.Second)
+
+    return sc.Net.CheckLogWatchers()
 }
diff --git a/go/registry/api/runtime.go b/go/registry/api/runtime.go
index 64b8f5702d8..27380e20e8b 100644
--- a/go/registry/api/runtime.go
+++ b/go/registry/api/runtime.go
@@ -17,6 +17,7 @@ import (
     "github.com/oasisprotocol/oasis-core/go/common/quantity"
     "github.com/oasisprotocol/oasis-core/go/common/sgx"
     "github.com/oasisprotocol/oasis-core/go/common/version"
+    "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags"
     staking "github.com/oasisprotocol/oasis-core/go/staking/api"
     storage "github.com/oasisprotocol/oasis-core/go/storage/api"
 )
@@ -190,7 +191,7 @@ func (s *StorageParameters) ValidateBasic() error {
     }
 
     // Verify storage checkpointing configuration if enabled.
-    if s.CheckpointInterval > 0 {
+    if s.CheckpointInterval > 0 && !flags.DebugDontBlameOasis() {
         if s.CheckpointInterval < 10 {
             return fmt.Errorf("storage CheckpointInterval parameter too small")
         }
diff --git a/go/worker/storage/committee/checkpoint_sync.go b/go/worker/storage/committee/checkpoint_sync.go
new file mode 100644
index 00000000000..b8a6c0df046
--- /dev/null
+++ b/go/worker/storage/committee/checkpoint_sync.go
@@ -0,0 +1,420 @@
+package committee
+
+import (
+    "bytes"
+    "container/heap"
+    "context"
+    "errors"
+    "fmt"
+    "io"
+    "sort"
+    "sync"
+    "time"
+
+    "github.com/cenkalti/backoff/v4"
+
+    "github.com/oasisprotocol/oasis-core/go/runtime/committee"
+    schedulerApi "github.com/oasisprotocol/oasis-core/go/scheduler/api"
+    storageApi "github.com/oasisprotocol/oasis-core/go/storage/api"
+    storageClient "github.com/oasisprotocol/oasis-core/go/storage/client"
+    "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
+)
+
+const (
+    retryInterval = 1 * time.Second
+    maxRetries    = 30
+
+    checkpointStatusDone = 0
+    checkpointStatusNext = 1
+    checkpointStatusBail = 2
+
+    // LogEventCheckpointSyncSuccess is a log event value that signals that checkpoint sync was successful.
+    LogEventCheckpointSyncSuccess = "worker/storage/checkpoint-sync-success"
+)
+
+// ErrNoUsableCheckpoints is the error returned when none of the checkpoints could be synced.
+var ErrNoUsableCheckpoints = errors.New("storage: no checkpoint could be synced")
+
+type restoreResult struct {
+    done bool
+    err  error
+}
+
+type chunkHeap struct {
+    array  []*checkpoint.ChunkMetadata
+    length int
+}
+
+func (h chunkHeap) Len() int           { return h.length }
+func (h chunkHeap) Less(i, j int) bool { return h.array[i].Index < h.array[j].Index }
+func (h chunkHeap) Swap(i, j int)      { h.array[i], h.array[j] = h.array[j], h.array[i] }
+
+func (h *chunkHeap) Push(x interface{}) {
+    h.array[h.length] = x.(*checkpoint.ChunkMetadata)
+    h.length++
+}
+
+func (h *chunkHeap) Pop() interface{} {
+    h.length--
+    ret := h.array[h.length]
+    h.array[h.length] = nil
+    return ret
+}
+
+// goWithCommittee runs the given operation with all the connections to the storage committee.
+func (n *Node) goWithCommittee(
+    committeeClient committee.Client,
+    fn func(context.Context, *committee.ClientConnWithMeta) error,
+) (
+    context.CancelFunc,
+    chan interface{},
+    error,
+) {
+    connCh := make(chan []*committee.ClientConnWithMeta)
+    connGetter := func() error {
+        conns := committeeClient.GetConnectionsWithMeta()
+        if len(conns) == 0 {
+            return storageClient.ErrStorageNotAvailable
+        }
+        connCh <- conns
+        return nil
+    }
+    go func() {
+        sched := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryInterval), maxRetries)
+        _ = backoff.Retry(connGetter, backoff.WithContext(sched, n.ctx))
+        close(connCh)
+    }()
+    conns, ok := <-connCh
+    if !ok || len(conns) == 0 {
+        return nil, nil, storageClient.ErrStorageNotAvailable
+    }
+
+    workerCtx, workerCancel := context.WithCancel(n.ctx)
+    var workerGroup sync.WaitGroup
+    doneCh := make(chan interface{})
+
+    for _, conn := range conns {
+        workerGroup.Add(1)
+        go func(conn *committee.ClientConnWithMeta) {
+            defer workerGroup.Done()
+            op := func() error {
+                return fn(workerCtx, conn)
+            }
+            sched := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryInterval), maxRetries)
+            _ = backoff.Retry(op, backoff.WithContext(sched, workerCtx))
+        }(conn)
+    }
+    go func() {
+        defer close(doneCh)
+        workerGroup.Wait()
+    }()
+
+    return workerCancel, doneCh, nil
+}
+
+func (n *Node) nodeWorker(
+    ctx context.Context,
+    conn *committee.ClientConnWithMeta,
+    chunkDispatchCh chan *checkpoint.ChunkMetadata,
+    chunkReturnCh chan *checkpoint.ChunkMetadata,
+    errorCh chan int,
+) error {
+    api := storageApi.NewStorageClient(conn.ClientConn)
+    for {
+        var chunk *checkpoint.ChunkMetadata
+        var ok bool
+        select {
+        case <-ctx.Done():
+            return backoff.Permanent(ctx.Err())
+        case chunk, ok = <-chunkDispatchCh:
+            if !ok {
+                return nil
+            }
+        }
+
+        restoreCh := make(chan *restoreResult)
+        rd, wr := io.Pipe()
+        go func() {
+            done, err := n.localStorage.Checkpointer().RestoreChunk(ctx, chunk.Index, rd)
+            restoreCh <- &restoreResult{
+                done: done,
+                err:  err,
+            }
+        }()
+        err := api.GetCheckpointChunk(ctx, chunk, wr)
+        wr.Close()
+        result := <-restoreCh
+
+        // GetCheckpointChunk errors.
+        switch {
+        case err == nil:
+        case errors.Is(err, checkpoint.ErrChunkNotFound):
+            chunkReturnCh <- chunk
+            return backoff.Permanent(err)
+        default:
+            n.logger.Error("can't fetch chunk from storage node", "node", conn.Node.ID, "chunk", chunk.Index, "err", err)
+            return err
+        }
+
+        // RestoreChunk errors.
+        switch {
+        case result.done:
+            // Signal to the toplevel handler that we're done.
+            chunkReturnCh <- nil
+            return nil
+        case result.err == nil:
+            // Chunk restored successfully, continue with the next one.
+        case errors.Is(result.err, checkpoint.ErrChunkCorrupted):
+            // The chunk arrived corrupted; return it to the dispatcher so it can be retried,
+            // possibly from another node.
+            n.logger.Error("chunk restoration failed", "node", conn.Node.ID, "chunk", chunk.Index, "err", result.err)
+            chunkReturnCh <- chunk
+            return result.err
+        case errors.Is(result.err, checkpoint.ErrChunkProofVerificationFailed):
+            // The chunk's proof failed to verify, so the whole checkpoint is likely bad;
+            // signal the handler to move on to the next one.
+            n.logger.Error("chunk restoration failed", "node", conn.Node.ID, "chunk", chunk.Index, "err", result.err)
+            errorCh <- checkpointStatusNext
+            return backoff.Permanent(result.err)
+        default:
+            // Any other restoration error is unrecoverable.
+            n.logger.Error("chunk restoration failed", "node", conn.Node.ID, "chunk", chunk.Index, "err", result.err)
+            errorCh <- checkpointStatusBail
+            return backoff.Permanent(result.err)
+        }
+    }
+}
+
+func (n *Node) handleCheckpoint(check *checkpoint.Metadata, committeeClient committee.Client, groupSize uint64) (int, error) {
+    chunkDispatchCh := make(chan *checkpoint.ChunkMetadata)
+    defer close(chunkDispatchCh)
+
+    chunkReturnCh := make(chan *checkpoint.ChunkMetadata, groupSize)
+    errorCh := make(chan int, groupSize)
+
+    worker := func(ctx context.Context, conn *committee.ClientConnWithMeta) error {
+        return n.nodeWorker(ctx, conn, chunkDispatchCh, chunkReturnCh, errorCh)
+    }
+
+    cancel, doneCh, err := n.goWithCommittee(committeeClient, worker)
+    if err != nil {
+        return checkpointStatusBail, fmt.Errorf("can't fetch chunks from committee nodes: %w", err)
+    }
+    defer cancel()
+
+    if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check); err != nil {
+        return checkpointStatusNext, fmt.Errorf("can't start checkpoint restore: %w", err)
+    }
+
+    // Prepare the heap of chunks.
+    chunks := &chunkHeap{
+        array:  make([]*checkpoint.ChunkMetadata, len(check.Chunks)),
+        length: 0,
+    }
+    heap.Init(chunks)
+
+    for i, c := range check.Chunks {
+        heap.Push(chunks, &checkpoint.ChunkMetadata{
+            Version: 1,
+            Index:   uint64(i),
+            Digest:  c,
+            Root:    check.Root,
+        })
+    }
+    n.logger.Debug("checkpoint chunks prepared for dispatch",
+        "chunks", len(check.Chunks),
+        "checkpoint_root", check.Root,
+    )
+
+    // Feed the workers with chunks.
+    var next *checkpoint.ChunkMetadata
+    var outChan chan *checkpoint.ChunkMetadata
+
+    for {
+        if chunks.length == 0 {
+            next = nil
+            outChan = nil
+        } else {
+            next = heap.Pop(chunks).(*checkpoint.ChunkMetadata)
+            outChan = chunkDispatchCh
+        }
+
+        select {
+        case <-n.ctx.Done():
+            return checkpointStatusBail, n.ctx.Err()
+
+        case returned := <-chunkReturnCh:
+            if returned == nil {
+                // Restoration completed, no more chunks.
+                return checkpointStatusDone, nil
+            }
+            heap.Push(chunks, returned)
+
+        case status := <-errorCh:
+            return status, nil
+
+        // If there's no chunk to send, outChan will be nil here, blocking forever. We still need to wait
+        // for other events even if there's no chunk to dispatch, since they may simply all be in processing.
+        case outChan <- next:
+            next = nil
+
+        case <-doneCh:
+            // No usable committee connections left, move on to the next checkpoint.
+            return checkpointStatusNext, storageClient.ErrStorageNotAvailable
+        }
+
+        if next != nil {
+            heap.Push(chunks, next)
+        }
+    }
+}
+
+func (n *Node) getCheckpointList(committeeClient committee.Client) ([]*checkpoint.Metadata, error) {
+    // Get checkpoint list from all current committee members.
+    listCh := make(chan []*checkpoint.Metadata)
+    req := &checkpoint.GetCheckpointsRequest{
+        Version:   1,
+        Namespace: n.commonNode.Runtime.ID(),
+    }
+    getter := func(ctx context.Context, conn *committee.ClientConnWithMeta) error {
+        api := storageApi.NewStorageClient(conn.ClientConn)
+        meta, err := api.GetCheckpoints(ctx, req)
+        if err != nil {
+            n.logger.Error("error calling GetCheckpoints",
+                "err", err,
+                "node", conn.Node.ID,
+                "this_node", n.commonNode.Identity.NodeSigner.Public,
+            )
+            return err
+        }
+        n.logger.Debug("got checkpoint list from a node",
+            "length", len(meta),
+            "node", conn.Node.ID,
+        )
+        listCh <- meta
+        return nil
+    }
+
+    cancel, doneCh, err := n.goWithCommittee(committeeClient, getter)
+    if err != nil {
+        return nil, err
+    }
+    defer cancel()
+
+    var list []*checkpoint.Metadata
+resultLoop:
+    for {
+        select {
+        case <-doneCh:
+            break resultLoop
+        case <-n.ctx.Done():
+            return nil, n.ctx.Err()
+        case meta := <-listCh:
+            list = append(list, meta...)
+        }
+    }
+
+    // Prepare the list: sort and deduplicate.
+    sort.Slice(list, func(i, j int) bool {
+        // Descending!
+        if list[j].Root.Version == list[i].Root.Version {
+            return bytes.Compare(list[j].Root.Hash[:], list[i].Root.Hash[:]) < 0
+        }
+        return list[j].Root.Version < list[i].Root.Version
+    })
+    retList := make([]*checkpoint.Metadata, len(list))
+    var prevCheckpoint *checkpoint.Metadata
+    cursor := 0
+    for i := 0; i < len(list); i++ {
+        if prevCheckpoint == nil || !list[i].Root.Equal(&prevCheckpoint.Root) {
+            retList[cursor] = list[i]
+            prevCheckpoint = list[i]
+            cursor++
+        }
+    }
+
+    return retList[:cursor], nil
+}
+
+func (n *Node) checkCheckpointUsable(cp *checkpoint.Metadata, remainingMask outstandingMask) outstandingMask {
+    namespace := n.commonNode.Runtime.ID()
+    if !namespace.Equal(&cp.Root.Namespace) {
+        // Not for the right runtime.
+        return maskNone
+    }
+    blk, err := n.commonNode.Runtime.History().GetBlock(n.ctx, cp.Root.Version)
+    if err != nil {
+        n.logger.Error("can't get block information for checkpoint, skipping", "err", err, "root", cp.Root)
+        return maskNone
+    }
+    _, lastIORoot, lastStateRoot := n.GetLastSynced()
+    if namespace.Equal(&blk.Header.Namespace) {
+        if blk.Header.IORoot.Equal(&cp.Root.Hash) {
+            // Do we already have this root?
+            if lastIORoot.Version < cp.Root.Version && remainingMask&maskIO != maskNone {
+                return maskIO
+            }
+            return maskNone
+        }
+        if blk.Header.StateRoot.Equal(&cp.Root.Hash) {
+            // Do we already have this root?
+            if lastStateRoot.Version < cp.Root.Version && remainingMask&maskState != maskNone {
+                return maskState
+            }
+            return maskNone
+        }
+    }
+    n.logger.Info("checkpoint for unknown root skipped", "root", cp.Root)
+    return maskNone
+}
+
+func (n *Node) syncCheckpoints() error {
+    // Start following the storage committee.
+    committeeWatcher, err := committee.NewWatcher(
+        n.ctx,
+        n.commonNode.Consensus.Scheduler(),
+        n.commonNode.Consensus.Registry(),
+        n.commonNode.Runtime.ID(),
+        schedulerApi.KindStorage,
+        committee.WithAutomaticEpochTransitions(),
+    )
+    if err != nil {
+        return fmt.Errorf("can't establish storage committee watcher: %w", err)
+    }
+    committeeClient, err := committee.NewClient(n.ctx, committeeWatcher.Nodes(), committee.WithClientAuthentication(n.commonNode.Identity))
+    if err != nil {
+        return fmt.Errorf("can't create committee client: %w", err)
+    }
+
+    descriptor, err := n.commonNode.Runtime.RegistryDescriptor(n.ctx)
+    if err != nil {
+        return fmt.Errorf("can't get runtime descriptor: %w", err)
+    }
+
+    // Fetch metadata from the current committee.
+    metadata, err := n.getCheckpointList(committeeClient)
+    if err != nil {
+        return fmt.Errorf("can't get checkpoint list from storage committee: %w", err)
+    }
+
+    // Try all the checkpoints now, from most recent backwards.
+    remainingRoots := maskAll
+    var mask outstandingMask
+    for _, check := range metadata {
+        mask = n.checkCheckpointUsable(check, remainingRoots)
+        if mask == maskNone {
+            continue
+        }
+
+        status, err := n.handleCheckpoint(check, committeeClient, descriptor.Storage.GroupSize)
+        switch status {
+        case checkpointStatusDone:
+            n.logger.Info("successfully restored from checkpoint", "root", check.Root, "mask", mask)
+            remainingRoots &= ^mask
+            if remainingRoots == maskNone {
+                return nil
+            }
+            continue
+        case checkpointStatusNext:
+            n.logger.Info("error trying to restore from checkpoint, trying next most recent", "root", check.Root, "err", err)
+            continue
+        case checkpointStatusBail:
+            n.logger.Error("error trying to restore from checkpoint, unrecoverable", "root", check.Root, "err", err)
+            return fmt.Errorf("error restoring from checkpoints: %w", err)
+        }
+    }
+
+    return ErrNoUsableCheckpoints
+}
diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go
index b01e35467ba..abb41649f5d 100644
--- a/go/worker/storage/committee/node.go
+++ b/go/worker/storage/committee/node.go
@@ -649,14 +649,37 @@ func (n *Node) worker() { // nolint: gocyclo
 
     heap.Init(outOfOrderDiffs)
 
-    close(n.initCh)
-
     // We are now ready to service requests.
-    n.roleProvider.SetAvailable(func(nd *node.Node) error {
+    registeredCh := make(chan interface{})
+    n.roleProvider.SetAvailableWithCallback(func(nd *node.Node) error {
         nd.AddOrUpdateRuntime(n.commonNode.Runtime.ID())
         return nil
+    }, func(ctx context.Context) error {
+        close(registeredCh)
+        return nil
     })
 
+    // Wait for the registration to finish, because we'll need to ask
+    // questions immediately.
+    n.logger.Debug("waiting for node registration to finish")
+    select {
+    case <-registeredCh:
+    case <-n.ctx.Done():
+        return
+    }
+
+    // Try to perform initial sync from state and io checkpoints.
+    if err = n.syncCheckpoints(); err != nil {
+        n.logger.Info("checkpoint sync failed", "err", err)
+    } else {
+        n.logger.Info("checkpoint sync succeeded",
+            logging.LogEvent, LogEventCheckpointSyncSuccess,
+        )
+    }
+
+    // Once the initial sync is done, we're officially done with initialization.
+    close(n.initCh)
+
     // Main processing loop. When a new block comes in, its state and io roots are inspected and their
     // writelogs fetched from remote storage nodes in case we don't have them locally yet. Fetches are
     // asynchronous and, once complete, trigger local Apply operations. These are serialized
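In `nodeWorker` above, each chunk is streamed straight from the remote `GetCheckpointChunk` call into the local restorer through an `io.Pipe`, so a chunk never has to be buffered in memory as a whole. The following is a minimal, self-contained sketch of that pattern; `fetchChunk` and `restoreChunk` are hypothetical stand-ins for the storage API calls, not part of oasis-core:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// fetchChunk stands in for the remote GetCheckpointChunk call: it writes the
// chunk bytes into the supplied writer.
func fetchChunk(w io.Writer) error {
	_, err := io.Copy(w, strings.NewReader("chunk bytes"))
	return err
}

// restoreChunk stands in for the local RestoreChunk call: it consumes the
// chunk bytes from the supplied reader until EOF.
func restoreChunk(r io.Reader) error {
	data, err := io.ReadAll(r)
	if err != nil {
		return err
	}
	fmt.Printf("restored %d bytes\n", len(data))
	return nil
}

func main() {
	rd, wr := io.Pipe()

	// Restore concurrently from the read end while the fetch writes into the
	// write end; the pipe applies backpressure between the two sides.
	restoreErr := make(chan error, 1)
	go func() {
		restoreErr <- restoreChunk(rd)
	}()

	err := fetchChunk(wr)
	wr.Close() // closing the write end delivers EOF to the restorer
	if rerr := <-restoreErr; rerr != nil {
		err = rerr
	}
	fmt.Println("done, err:", err)
}
```

Closing the write end is what unblocks the restorer, which is why the worker closes the pipe before waiting on the restore result channel.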
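The dispatch loop in `handleCheckpoint` relies on the fact that a send on a nil channel blocks forever: when the chunk heap is empty, `outChan` is set to nil so the send case is effectively disabled while the other cases stay live. A small standalone sketch of that select pattern, with plain integers standing in for chunk metadata, might look like this:

```go
package main

import (
	"container/heap"
	"fmt"
)

// intHeap is a simple min-heap of ints, standing in for the chunk heap.
type intHeap []int

func (h intHeap) Len() int            { return len(h) }
func (h intHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h intHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *intHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	pending := &intHeap{3, 1, 2}
	heap.Init(pending)

	dispatchCh := make(chan int)
	doneCh := make(chan struct{})

	// A single worker that "processes" dispatched items.
	go func() {
		defer close(doneCh)
		for i := 0; i < 3; i++ {
			fmt.Println("processed", <-dispatchCh)
		}
	}()

	for {
		// Disable the send case with a nil channel when there is nothing to dispatch.
		var next int
		var outChan chan int
		if pending.Len() > 0 {
			next = heap.Pop(pending).(int)
			outChan = dispatchCh
		}

		select {
		case outChan <- next:
			// Item handed to the worker.
		case <-doneCh:
			fmt.Println("worker finished")
			return
		}
	}
}
```

The real loop additionally returns undelivered and failed chunks to the heap so they can be dispatched again, possibly to a different committee node.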
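Both `goWithCommittee` and `nodeWorker` distinguish retryable failures from fatal ones by wrapping the latter in `backoff.Permanent`, which makes `backoff.Retry` stop immediately instead of rescheduling the operation. A tiny illustrative sketch using the same `cenkalti/backoff/v4` package (the failure thresholds here are made up):

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	attempts := 0
	op := func() error {
		attempts++
		if attempts < 3 {
			// A transient failure: returned as-is, so Retry will try again.
			return errors.New("transient failure")
		}
		// A fatal failure: wrapping it in Permanent stops the retry loop immediately,
		// even though the retry budget is not exhausted.
		return backoff.Permanent(errors.New("fatal failure"))
	}

	sched := backoff.WithMaxRetries(backoff.NewConstantBackOff(10*time.Millisecond), 10)
	err := backoff.Retry(op, sched)
	fmt.Println("attempts:", attempts, "err:", err)
}
```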