diff --git a/.changelog/4107.bugfix.md b/.changelog/4107.bugfix.md
new file mode 100644
index 00000000000..302c50f0ef0
--- /dev/null
+++ b/.changelog/4107.bugfix.md
@@ -0,0 +1,4 @@
+go/worker/compute: Replicate input batch locally
+
+Previously storage commit could fail in case the node was both an executor
+and a storage node but not in the storage committee.
diff --git a/go/storage/api/mux.go b/go/storage/api/mux.go
index d3ff066f628..429a8ac7048 100644
--- a/go/storage/api/mux.go
+++ b/go/storage/api/mux.go
@@ -25,6 +25,9 @@ type MuxContinueWithError struct {
 }

 func (e MuxContinueWithError) Error() string {
+	if e.wrapped == nil {
+		return ""
+	}
 	return e.wrapped.Error()
 }

@@ -65,13 +68,18 @@ func MuxReadOpFinishEarly(next MuxController) MuxController {
 	}
 }

-// MuxIterateIgnoringErrors creates a controller that tells the muxer to continue iterating through
-// its backends even if one returns an error. The errors are stored and the last one is returned
-// at the end.
-func MuxIterateIgnoringErrors() MuxController {
+// MuxIterateIgnoringLocalErrors creates a controller that tells the muxer to continue iterating
+// through its backends even if a local one returns an error.
+func MuxIterateIgnoringLocalErrors() MuxController {
 	return func(i int, backend Backend, meth string, resp interface{}, err error) (interface{}, error) {
+		// Non-local errors are propagated as-is and abort processing.
+		if _, ok := backend.(LocalBackend); !ok {
+			return resp, err
+		}
+
+		// Error in a local backend is ignored.
 		if err != nil {
-			return resp, &MuxContinueWithError{err}
+			return resp, &MuxContinueWithError{nil}
 		}
 		return resp, nil
 	}
diff --git a/go/storage/api/mux_test.go b/go/storage/api/mux_test.go
index b8a21f10de1..fa3fee6f841 100644
--- a/go/storage/api/mux_test.go
+++ b/go/storage/api/mux_test.go
@@ -9,7 +9,7 @@ import (
 )

 type faultyBackend struct {
-	Backend
+	LocalBackend

 	calledCh chan struct{}
 	returnCh chan error
@@ -45,7 +45,7 @@ func TestStorageMux(t *testing.T) {
 	}

 	mux := NewStorageMux(
-		MuxReadOpFinishEarly(MuxIterateIgnoringErrors()),
+		MuxReadOpFinishEarly(MuxIterateIgnoringLocalErrors()),
 		faulty1, faulty2,
 	)

@@ -69,7 +69,7 @@ func TestStorageMux(t *testing.T) {
 	faulty1.returnCh <- someError
 	faulty2.returnCh <- nil
 	applyResp, err = mux.Apply(ctx, &ApplyRequest{})
-	require.EqualError(t, err, "error")
+	require.NoError(t, err)
 	require.NotNil(t, applyResp)
 	<-faulty1.calledCh
 	<-faulty2.calledCh
diff --git a/go/worker/common/committee/group.go b/go/worker/common/committee/group.go
index 34a77d299f1..8a037a9332a 100644
--- a/go/worker/common/committee/group.go
+++ b/go/worker/common/committee/group.go
@@ -250,6 +250,7 @@ type Group struct {
 	// storage is the storage backend that tracks the current committee.
 	storage       storage.Backend
 	storageClient storage.ClientBackend
+	storageLocal  storage.LocalBackend

 	logger *logging.Logger
 }
@@ -544,6 +545,12 @@ func (g *Group) Storage() storage.Backend {
 	return g.storage
 }

+// StorageLocal returns the local storage backend if the local node is also a storage node.
+// Otherwise it returns nil.
+func (g *Group) StorageLocal() storage.LocalBackend {
+	return g.storageLocal
+}
+
 // Start starts the group services.
 func (g *Group) Start() error {
 	g.Lock()
@@ -553,14 +560,13 @@ func (g *Group) Start() error {
 	// for this runtime). In this case we override the storage client's backend so that any updates
 	// don't go via gRPC but are redirected directly to the local backend instead.
 	var scOpts []storageClient.Option
-	var localStorageBackend storage.LocalBackend
 	if lsb, ok := g.runtime.Storage().(storage.LocalBackend); ok && g.runtime.HasRoles(node.RoleStorageWorker) {
 		// Make sure to unwrap the local backend as we need the raw local backend here.
 		if wrapped, ok := lsb.(storage.WrappedLocalBackend); ok {
 			lsb = wrapped.Unwrap()
 		}

-		localStorageBackend = lsb
+		g.storageLocal = lsb
 		scOpts = append(scOpts, storageClient.WithBackendOverride(g.identity.NodeSigner.Public(), lsb))
 	}

@@ -578,10 +584,10 @@ func (g *Group) Start() error {
 	g.storageClient = sc.(storage.ClientBackend)

 	// Create the storage multiplexer if we have a local storage backend.
-	if localStorageBackend != nil {
+	if g.storageLocal != nil {
 		g.storage = storage.NewStorageMux(
-			storage.MuxReadOpFinishEarly(storage.MuxIterateIgnoringErrors()),
-			localStorageBackend,
+			storage.MuxReadOpFinishEarly(storage.MuxIterateIgnoringLocalErrors()),
+			g.storageLocal,
 			g.storageClient,
 		)
 	} else {
diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go
index b53d0482464..d4d4048dc04 100644
--- a/go/worker/compute/executor/committee/node.go
+++ b/go/worker/compute/executor/committee/node.go
@@ -1072,6 +1072,123 @@ func (n *Node) maybeStartProcessingBatchLocked(batch *unresolvedBatch) {
 	}
 }

+func (n *Node) startLocalStorageReplication(
+	ctx context.Context,
+	blk *block.Block,
+	ioRootHash hash.Hash,
+	batch transaction.RawBatch,
+) <-chan error {
+	ch := make(chan error, 1)
+	lsb := n.commonNode.Group.StorageLocal()
+	if lsb == nil {
+		// In case there is no local storage backend to replicate to, finish early.
+		ch <- nil
+		close(ch)
+		return ch
+	}
+
+	ioRoot := storage.Root{
+		Namespace: blk.Header.Namespace,
+		Version:   blk.Header.Round + 1,
+		Type:      storage.RootTypeIO,
+		Hash:      ioRootHash,
+	}
+
+	// If we have a local storage node, replicate batch locally so we will be able to Apply
+	// locally later when proposing a batch. This also avoids needless replication for things
+	// that we already have.
+	replicateIO := make(chan error)
+	go func() {
+		defer close(replicateIO)
+
+		// Check if the root is already present as in this case no replication is needed.
+		if lsb.NodeDB().HasRoot(ioRoot) {
+			replicateIO <- nil
+			return
+		}
+
+		n.logger.Debug("replicating I/O root locally",
+			"io_root", ioRoot,
+		)
+
+		emptyRoot := ioRoot
+		emptyRoot.Hash.Empty()
+
+		ioTree := transaction.NewTree(nil, emptyRoot)
+		defer ioTree.Close()
+
+		for idx, tx := range batch {
+			if err := ioTree.AddTransaction(ctx, transaction.Transaction{Input: tx, BatchOrder: uint32(idx)}, nil); err != nil {
+				n.logger.Error("failed to create I/O tree",
+					"err", err,
+				)
+				replicateIO <- err
+				return
+			}
+		}
+
+		ioWriteLog, ioRootHashCheck, err := ioTree.Commit(ctx)
+		if err != nil {
+			n.logger.Error("failed to create I/O tree",
+				"err", err,
+			)
+			replicateIO <- err
+			return
+		}
+		if !ioRootHashCheck.Equal(&ioRootHash) {
+			n.logger.Error("inconsistent I/O root",
+				"io_root_hash", ioRootHashCheck,
+				"expected", ioRootHash,
+			)
+			replicateIO <- fmt.Errorf("inconsistent I/O root")
+			return
+		}
+
+		_, err = lsb.Apply(ctx, &storage.ApplyRequest{
+			Namespace: ioRoot.Namespace,
+			RootType:  ioRoot.Type,
+			SrcRound:  ioRoot.Version,
+			SrcRoot:   emptyRoot.Hash,
+			DstRound:  ioRoot.Version,
+			DstRoot:   ioRoot.Hash,
+			WriteLog:  ioWriteLog,
+		})
+		if err != nil {
+			n.logger.Error("failed to apply I/O tree locally",
+				"err", err,
+			)
+			replicateIO <- err
+			return
+		}
+
+		replicateIO <- nil
+	}()
+
+	// Wait for replication to complete.
+	go func() {
+		defer close(ch)
+
+		var combinedErr error
+		select {
+		case <-ctx.Done():
+			combinedErr = ctx.Err()
+		case err := <-replicateIO:
+			if err != nil {
+				combinedErr = fmt.Errorf("failed to replicate I/O root: %w", err)
+			}
+		}
+		// TODO: We should also wait for state replication to avoid extra fetches.
+
+		n.logger.Debug("local storage replication done",
+			"io_root", ioRoot,
+		)
+
+		ch <- combinedErr
+	}()
+
+	return ch
+}
+
 // Guarded by n.commonNode.CrossNode.
 func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) {
 	if n.commonNode.CurrentBlock == nil {
@@ -1128,6 +1245,10 @@ func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) {
 		)
 		return
 	}
+
+	// Optionally start local storage replication in parallel to batch dispatch.
+	replicateCh := n.startLocalStorageReplication(ctx, blk, batch.ioRoot.Hash, resolvedBatch)
+
 	rq := &protocol.Body{
 		RuntimeExecuteTxBatchRequest: &protocol.RuntimeExecuteTxBatchRequest{
 			ConsensusBlock: *consensusBlk,
@@ -1185,6 +1306,21 @@ func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) {
 	}
 	n.schedulerMutex.Unlock()

+	// Wait for replication to complete before proposing a batch to ensure that we can cleanly
+	// apply any updates.
+	select {
+	case <-ctx.Done():
+		return
+	case err = <-replicateCh:
+		if err != nil {
+			n.logger.Error("local storage replication failed",
+				"err", err,
+			)
+			// We can still continue as other nodes may have the storage replicated. If this is
+			// not the case, generating a commit will fail in propose.
+		}
+	}
+
 	// Submit response to the executor worker.
 	done <- &processedBatch{
 		computed: &rsp.RuntimeExecuteTxBatchResponse.Batch,
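
The behavioural core of the mux.go change: a failing local backend no longer aborts, or is reported from, the multiplexed operation, while errors from non-local backends still propagate. Below is a minimal, standalone Go sketch of that control flow; Backend, LocalBackend, controller, ignoreLocalErrors and the backends in main are simplified stand-ins for illustration, not the oasis-core storage API.

package main

import (
	"errors"
	"fmt"
)

// Backend is a stand-in for a storage backend; LocalBackend marks a node-local one.
type Backend interface{ Apply() error }

type LocalBackend interface {
	Backend
	Local()
}

// controller mirrors the MuxController idea: it observes each backend's result and may
// rewrite the error before the muxer decides whether to keep iterating.
type controller func(backend Backend, err error) error

// ignoreLocalErrors mirrors MuxIterateIgnoringLocalErrors: a failure from a LocalBackend
// is dropped, while any other failure is returned and stops the iteration below.
func ignoreLocalErrors(backend Backend, err error) error {
	if _, ok := backend.(LocalBackend); ok {
		return nil // local writes are best-effort
	}
	return err
}

type localB struct{}

func (localB) Apply() error { return errors.New("disk full") }
func (localB) Local()       {}

type remoteB struct{}

func (remoteB) Apply() error { return nil }

func main() {
	backends := []Backend{localB{}, remoteB{}}
	var ctrl controller = ignoreLocalErrors

	var finalErr error
	for _, b := range backends {
		if err := ctrl(b, b.Apply()); err != nil {
			finalErr = err
			break
		}
	}
	fmt.Println("result:", finalErr) // prints "result: <nil>": the local failure was ignored
}

Run as-is, this prints a nil result even though the local backend failed, which is the property the updated TestStorageMux asserts with require.NoError: a node that is an executor and a storage node but not in the storage committee can keep operating through the remote committee backends.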
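
The executor change follows a start-early, wait-late pattern: startLocalStorageReplication returns a buffered error channel immediately, the replication work runs in a goroutine alongside batch dispatch, and the caller only blocks on the channel (or on context cancellation) right before proposing. A standalone sketch of that pattern, with a placeholder replicate function standing in for building and applying the I/O tree:

package main

import (
	"context"
	"fmt"
	"time"
)

// startReplication returns immediately; the result arrives on the returned channel.
func startReplication(ctx context.Context, replicate func(context.Context) error) <-chan error {
	ch := make(chan error, 1) // buffered so the worker can finish even if nobody is waiting
	go func() {
		defer close(ch)
		ch <- replicate(ctx)
	}()
	return ch
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Start replication in parallel with other work (the diff starts it before dispatching
	// the batch to the runtime and only waits right before proposing).
	replicateCh := startReplication(ctx, func(ctx context.Context) error {
		time.Sleep(10 * time.Millisecond) // stand-in for replicating the I/O root locally
		return nil
	})

	// ... dispatch and process the batch here ...

	select {
	case <-ctx.Done():
		fmt.Println("aborted:", ctx.Err())
	case err := <-replicateCh:
		// As in the diff, a failure here is logged but not fatal: other nodes may already
		// have the roots replicated, and a missing root only surfaces later in propose.
		fmt.Println("replication result:", err)
	}
}

The capacity-one buffer matters: the background goroutine can always deliver its result and exit, even when the caller has already returned because its context was cancelled.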