diff --git a/go/oasis-test-runner/oasis/fixture.go b/go/oasis-test-runner/oasis/fixture.go index 31205e920ab..f381bc15a59 100644 --- a/go/oasis-test-runner/oasis/fixture.go +++ b/go/oasis-test-runner/oasis/fixture.go @@ -325,6 +325,8 @@ type StorageWorkerFixture struct { // nolint: maligned AllowEarlyTermination bool `json:"allow_early_termination"` AllowErrorTermination bool `json:"allow_error_termination"` + NoAutoStart bool `json:"no_auto_start,omitempty"` + DisableCertRotation bool `json:"disable_cert_rotation"` LogWatcherHandlerFactories []log.WatcherHandlerFactory `json:"-"` @@ -353,6 +355,7 @@ func (f *StorageWorkerFixture) Create(net *Network) (*Storage, error) { NodeCfg: NodeCfg{ AllowEarlyTermination: f.AllowEarlyTermination, AllowErrorTermination: f.AllowErrorTermination, + NoAutoStart: f.NoAutoStart, LogWatcherHandlerFactories: f.LogWatcherHandlerFactories, Consensus: f.Consensus, }, diff --git a/go/oasis-test-runner/oasis/log.go b/go/oasis-test-runner/oasis/log.go index 05c8fd4ca99..99d359b50fb 100644 --- a/go/oasis-test-runner/oasis/log.go +++ b/go/oasis-test-runner/oasis/log.go @@ -6,6 +6,7 @@ import ( tendermint "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/api" "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/log" roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" + workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage/committee" upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api" ) @@ -75,8 +76,20 @@ func LogEventABCIPruneDelete() log.WatcherHandlerFactory { return LogAssertEvent(abci.LogEventABCIPruneDelete, "expected ABCI pruning to be done") } -// LogAssertRoothashRoothashReindexing returns a handler witch checks wether roothash reindexing was +// LogAssertRoothashRoothashReindexing returns a handler which checks whether roothash reindexing was // run based on JSON log output. func LogAssertRoothashRoothashReindexing() log.WatcherHandlerFactory { return LogAssertEvent(roothash.LogEventHistoryReindexing, "roothash runtime reindexing not detected") } + +// LogAssertCheckpointSyncSuccess returns a handler which checks whether initial storage sync from +// a checkpoint was successful or not. +func LogAssertCheckpointSyncSuccess() log.WatcherHandlerFactory { + return LogAssertEvent(workerStorage.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed") +} + +// LogAssertCheckpointSync returns a handler which checks whether initial storage sync from +// a checkpoint failed or not. +func LogAssertCheckpointSync() log.WatcherHandlerFactory { + return LogAssertNotEvent(workerStorage.LogEventCheckpointSyncFailed, "checkpoint sync failed") +} diff --git a/go/oasis-test-runner/oasis/storage.go b/go/oasis-test-runner/oasis/storage.go index 6d531cd50af..829dad6cef3 100644 --- a/go/oasis-test-runner/oasis/storage.go +++ b/go/oasis-test-runner/oasis/storage.go @@ -186,6 +186,7 @@ func (net *Network) NewStorage(cfg *StorageCfg) (*Storage, error) { Name: storageName, net: net, dir: storageDir, + noAutoStart: cfg.NoAutoStart, disableDefaultLogWatcherHandlerFactories: cfg.DisableDefaultLogWatcherHandlerFactories, logWatcherHandlerFactories: cfg.LogWatcherHandlerFactories, consensus: cfg.Consensus, diff --git a/go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go b/go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go index c7b07141397..463038f917b 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go @@ -54,6 +54,18 @@ func (sc *storageSyncImpl) Fixture() (*oasis.NetworkFixture, error) { Entity: 1, IgnoreApplies: true, }) + + // One more storage worker for later, so it can do an initial sync with the snapshots. + f.StorageWorkers = append(f.StorageWorkers, oasis.StorageWorkerFixture{ + Backend: database.BackendNameBadgerDB, + Entity: 1, + NoAutoStart: true, + }) + lateWorker := len(f.StorageWorkers)-1 + + // Add log assertion for the late worker. + f.StorageWorkers[lateWorker].LogWatcherHandlerFactories = append(f.StorageWorkers[lateWorker].LogWatcherHandlerFactories, oasis.LogAssertCheckpointSync()) + return f, nil } @@ -161,5 +173,16 @@ func (sc *storageSyncImpl) Run(childEnv *env.Env) error { return fmt.Errorf("incorrect number of valid checkpoints (expected: >=2 got: %d)", validCps) } + // Now spin up the last storage worker and check if it syncs with a checkpoint. + lateWorker := sc.Net.StorageWorkers()[3] + err = lateWorker.Start() + if err != nil { + return fmt.Errorf("can't start last storage worker: %w", err) + } + err = lateWorker.WaitReady(ctx) + if err != nil { + return fmt.Errorf("error waiting for last storage worker to become ready: %w", err) + } + return nil } diff --git a/go/worker/storage/committee/checkpoint_sync.go b/go/worker/storage/committee/checkpoint_sync.go new file mode 100644 index 00000000000..b8d40897bb5 --- /dev/null +++ b/go/worker/storage/committee/checkpoint_sync.go @@ -0,0 +1,412 @@ +package committee + +import ( + "bytes" + "container/heap" + "context" + "errors" + "io" + "fmt" + "sort" + "sync" + "time" + + "github.com/cenkalti/backoff/v4" + + "github.com/oasisprotocol/oasis-core/go/runtime/committee" + schedulerApi "github.com/oasisprotocol/oasis-core/go/scheduler/api" + storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" + storageClient "github.com/oasisprotocol/oasis-core/go/storage/client" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" +) + +const ( + retryInterval = 1 * time.Second + maxRetries = 15 + + checkpointStatusDone = 0 + checkpointStatusNext = 1 + checkpointStatusBail = 2 + + // LogEventCheckpointSyncSuccess is a log event value that signals that checkpoint sync was successful. + LogEventCheckpointSyncSuccess = "worker/storage/checkpoint-sync-success" + // LogEventCheckpointSyncFailed is a log event value that signals that checkpoint sync failed. + LogEventCheckpointSyncFailed = "worker/storage/checkpoint-sync-failed" +) + +var ( + // ErrNoUsableCheckpoints is the error returned when none of the checkpoints could be synced. + ErrNoUsableCheckpoints = errors.New("storage: no checkpoint could be synced") +) + +type restoreResult struct { + done bool + err error +} + +type chunkHeap struct { + array []*checkpoint.ChunkMetadata + length int +} + +func (h chunkHeap) Len() int { return h.length } +func (h chunkHeap) Less(i, j int) bool { return h.array[i].Index < h.array[j].Index } +func (h chunkHeap) Swap(i, j int) { h.array[i], h.array[j] = h.array[j], h.array[i] } + +func (h *chunkHeap) Push(x interface{}) { + h.array[h.length] = x.(*checkpoint.ChunkMetadata) + h.length++ +} + +func (h *chunkHeap) Pop() interface{} { + h.length-- + ret := h.array[h.length] + h.array[h.length] = nil + return ret +} + +// goWithCommittee runs the given operation with all the connections to the storage committee. +func (n *Node) goWithCommittee( + committeeClient committee.Client, + fn func(context.Context, *committee.ClientConnWithMeta)error, +) ( + context.CancelFunc, + chan interface{}, + error, +) { + connCh := make(chan []*committee.ClientConnWithMeta) + connGetter := func() error { + conns := committeeClient.GetConnectionsWithMeta() + if len(conns) == 0 { + return storageClient.ErrStorageNotAvailable + } + connCh <- conns + return nil + } + go func() { + sched := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryInterval), maxRetries) + backoff.Retry(connGetter, backoff.WithContext(sched, n.ctx)) + close(connCh) + }() + conns, ok := <-connCh + if !ok || len(conns) == 0 { + return nil, nil, storageClient.ErrStorageNotAvailable + } + + workerCtx, workerCancel := context.WithCancel(n.ctx) + var workerGroup sync.WaitGroup + doneCh := make(chan interface{}) + + go func() { + defer close(doneCh) + workerGroup.Wait() + }() + + for _, conn := range conns { + workerGroup.Add(1) + go func(conn *committee.ClientConnWithMeta) { + defer workerGroup.Done() + op := func() error { + return fn(workerCtx, conn) + } + sched := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryInterval), maxRetries) + backoff.Retry(op, backoff.WithContext(sched, workerCtx)) + }(conn) + } + + return workerCancel, doneCh, nil +} + +func (n *Node) nodeWorker( + ctx context.Context, + conn *committee.ClientConnWithMeta, + chunkDispatchCh chan *checkpoint.ChunkMetadata, + chunkReturnCh chan *checkpoint.ChunkMetadata, + errorCh chan int, +) error { + api := storageApi.NewStorageClient(conn.ClientConn) + for { + var chunk *checkpoint.ChunkMetadata + var ok bool + select { + case <-ctx.Done(): + return backoff.Permanent(ctx.Err()) + case chunk, ok = <-chunkDispatchCh: + if !ok { + return nil + } + } + + restoreCh := make(chan *restoreResult) + rd, wr := io.Pipe() + go func() { + done, err := n.localStorage.Checkpointer().RestoreChunk(ctx, chunk.Index, rd) + restoreCh <- &restoreResult{ + done: done, + err: err, + } + }() + err := api.GetCheckpointChunk(ctx, chunk, wr) + wr.Close() + result := <-restoreCh + + // GetCheckpointChunk errors. + switch { + case err == nil: + case errors.Is(err, checkpoint.ErrChunkNotFound): + chunkReturnCh <- chunk + return backoff.Permanent(err) + default: + n.logger.Error("can't fetch chunk from storage node", "node", conn.Node.ID, "chunk", chunk.Index) + return err + } + + // RestoreChunk errors. + switch { + case result.done: + // Signal to the toplevel handler that we're done. + chunkReturnCh <- nil + return nil + case errors.Is(result.err, checkpoint.ErrChunkCorrupted): + chunkReturnCh <- chunk + return nil + case errors.Is(result.err, checkpoint.ErrChunkProofVerificationFailed): + errorCh <- checkpointStatusNext + return backoff.Permanent(result.err) + case result.err != nil: + n.logger.Error("chunk restoration failed", "node", conn.Node.ID, "chunk", chunk.Index, "err", result.err) + errorCh <- checkpointStatusBail + return backoff.Permanent(result.err) + } + } +} + +func (n *Node) handleCheckpoint(check *checkpoint.Metadata, committeeClient committee.Client, groupSize uint64) (int, error) { + chunkDispatchCh := make(chan *checkpoint.ChunkMetadata) + defer close(chunkDispatchCh) + + chunkReturnCh := make(chan *checkpoint.ChunkMetadata, groupSize) + errorCh := make(chan int, groupSize) + + worker := func(ctx context.Context, conn *committee.ClientConnWithMeta) error { + return n.nodeWorker(ctx, conn, chunkDispatchCh, chunkReturnCh, errorCh) + } + + cancel, doneCh, err := n.goWithCommittee(committeeClient, worker) + if err != nil { + return checkpointStatusBail, fmt.Errorf("can't fetch chunks from committee nodes: %w", err) + } + defer cancel() + + if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check); err != nil { + return checkpointStatusNext, fmt.Errorf("can't start checkpoint restore: %w", err) + } + + // Prepare the heap of chunks. + chunks := &chunkHeap{ + array: make([]*checkpoint.ChunkMetadata, len(check.Chunks)), + length: 0, + } + heap.Init(chunks) + + for i, c := range check.Chunks { + heap.Push(chunks, &checkpoint.ChunkMetadata{ + Version: 1, + Index: uint64(i), + Digest: c, + Root: check.Root, + }) + } + + // Feed the workers with chunks. + var next *checkpoint.ChunkMetadata + var outChan chan *checkpoint.ChunkMetadata + + for { + if chunks.length == 0 { + next = nil + outChan = nil + } else { + next = heap.Pop(chunks).(*checkpoint.ChunkMetadata) + outChan = chunkDispatchCh + } + + select { + case <-n.ctx.Done(): + return checkpointStatusBail, n.ctx.Err() + + case returned := <-chunkReturnCh: + if returned == nil { + // Restoration completed, no more chunks. + return checkpointStatusDone, nil + } + heap.Push(chunks, returned) + + case status := <-errorCh: + return status, nil + + // If there's no chunk to send, outChan will be nil here, blocking forever. We still need to wait + // for other events even if there's no chunk to dispatch, since they may simply all be in processing. + case outChan <- next: + next = nil + + case <-doneCh: + // No usable committee connections left, move on to the next checkpoint. + return checkpointStatusNext, storageClient.ErrStorageNotAvailable + } + + if next != nil { + heap.Push(chunks, next) + } + } +} + +func (n *Node) getCheckpointList(committeeClient committee.Client) ([]*checkpoint.Metadata, error) { + // Get checkpoint list from all current committee members. + listCh := make(chan []*checkpoint.Metadata) + req := &checkpoint.GetCheckpointsRequest{ + Version: 1, + Namespace: n.commonNode.Runtime.ID(), + } + getter := func(ctx context.Context, conn *committee.ClientConnWithMeta) error { + api := storageApi.NewStorageClient(conn.ClientConn) + meta, err := api.GetCheckpoints(ctx, req) + if err != nil { + n.logger.Error("error calling GetCheckpoints", + "err", err, + "node", conn.Node.ID, + "this_node", n.commonNode.Identity.NodeSigner.Public, + ) + return err + } + n.logger.Debug("got checkpoint list from a node", + "length", len(meta), + "node", conn.Node.ID, + ) + listCh <- meta + return nil + } + + cancel, doneCh, err := n.goWithCommittee(committeeClient, getter) + if err != nil { + return nil, err + } + defer cancel() + + var list []*checkpoint.Metadata +resultLoop: + for { + select { + case <-doneCh: + break resultLoop + case <-n.ctx.Done(): + return nil, n.ctx.Err() + case meta := <-listCh: + list = append(list, meta...) + } + } + + // Prepare the list: sort and deduplicate. + sort.Slice(list, func(i, j int) bool { + // Descending! + if list[j].Root.Version == list[i].Root.Version { + return bytes.Compare(list[j].Root.Hash[:], list[i].Root.Hash[:]) < 0 + } + return list[j].Root.Version < list[i].Root.Version + }) + retList := make([]*checkpoint.Metadata, len(list)) + var prevCheckpoint *checkpoint.Metadata + cursor := 0 + for i := 0; i < len(list); i++ { + if prevCheckpoint == nil || !list[i].Root.Equal(&prevCheckpoint.Root) { + retList[cursor] = list[i] + cursor++ + } + } + + return retList[:cursor], nil +} + +func (n *Node) checkCheckpointUsable(cp *checkpoint.Metadata) bool { + namespace := n.commonNode.Runtime.ID() + if !namespace.Equal(&cp.Root.Namespace) { + // Not for the right runtime. + return false + } + blk, err := n.commonNode.Runtime.History().GetBlock(n.ctx, cp.Root.Version) + if err != nil { + n.logger.Error("can't get block information for checkpoint, skipping", "err", err, "root", cp.Root) + return false + } + _, lastIORoot, lastStateRoot := n.GetLastSynced() + if namespace.Equal(&blk.Header.Namespace) { + if blk.Header.IORoot.Equal(&cp.Root.Hash) { + if lastIORoot.Version >= cp.Root.Version { + // We already have this root. + return false + } + return true + } + if blk.Header.StateRoot.Equal(&cp.Root.Hash) { + if lastStateRoot.Version >= cp.Root.Version { + // We already have this root. + return false + } + return true + } + } + n.logger.Info("checkpoint for unknown root skipped", "root", cp.Root) + return false +} + +func (n *Node) syncCheckpoint() error { + // Start following the storage committee. + committeeWatcher, err := committee.NewWatcher( + n.ctx, + n.commonNode.Consensus.Scheduler(), + n.commonNode.Consensus.Registry(), + n.commonNode.Runtime.ID(), + schedulerApi.KindStorage, + committee.WithAutomaticEpochTransitions(), + ) + if err != nil { + return fmt.Errorf("can't establish storage committee watcher: %w", err) + } + committeeClient, err := committee.NewClient(n.ctx, committeeWatcher.Nodes(), committee.WithClientAuthentication(n.commonNode.Identity)) + if err != nil { + return fmt.Errorf("can't create committee client: %w", err) + } + + descriptor, err := n.commonNode.Runtime.RegistryDescriptor(n.ctx) + if err != nil { + return fmt.Errorf("can't get runtime descriptor: %w", err) + } + + // Fetch metadata from the current committee. + metadata, err := n.getCheckpointList(committeeClient) + if err != nil { + return fmt.Errorf("can't get checkpoint list from storage committee: %w", err) + } + + // Try all the checkpoints now, from most recent backwards. + for _, check := range metadata { + if !n.checkCheckpointUsable(check) { + continue + } + + status, err := n.handleCheckpoint(check, committeeClient, descriptor.Storage.GroupSize) + switch status { + case checkpointStatusDone: + n.logger.Info("successfully restored from checkpoint", "root", check.Root) + return nil + case checkpointStatusNext: + n.logger.Info("error trying to restore from checkpoint, trying next most recent", "root", check.Root, "err", err) + continue + case checkpointStatusBail: + n.logger.Error("error trying to restore from checkpoint, unrecoverable", "root", check.Root, "err", err) + return fmt.Errorf("error restoring from checkpoints: %w", err) + } + } + + return ErrNoUsableCheckpoints +} diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index b01e35467ba..53afa3cc4d2 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -649,14 +649,40 @@ func (n *Node) worker() { // nolint: gocyclo heap.Init(outOfOrderDiffs) - close(n.initCh) - // We are now ready to service requests. - n.roleProvider.SetAvailable(func(nd *node.Node) error { + registeredCh := make(chan interface{}) + n.roleProvider.SetAvailableWithCallback(func(nd *node.Node) error { nd.AddOrUpdateRuntime(n.commonNode.Runtime.ID()) return nil + }, func(ctx context.Context) error { + close(registeredCh) + return nil }) + // Wait for the registration to finish, because we'll need to ask + // questions immediately. + n.logger.Debug("waiting for node registration to finish") + select { + case <-registeredCh: + case <-n.ctx.Done(): + return + } + + // Try to perform initial sync from a checkpoint. + if err := n.syncCheckpoint(); err != nil { + n.logger.Info("checkpoint sync failed", + "err", err, + logging.LogEvent, LogEventCheckpointSyncFailed, + ) + } else { + n.logger.Info("checkpoint sync succeeded", + logging.LogEvent, LogEventCheckpointSyncSuccess, + ) + } + + // Once the initial sync is done, we're officially done with initialization. + close(n.initCh) + // Main processing loop. When a new block comes in, its state and io roots are inspected and their // writelogs fetched from remote storage nodes in case we don't have them locally yet. Fetches are // asynchronous and, once complete, trigger local Apply operations. These are serialized