Merge pull request #4107 from oasisprotocol/kostko/fix/executor-local-batch

go/worker/compute: Replicate input batch locally
kostko authored Jul 2, 2021
2 parents 5da7eb8 + 0dccf54 commit e85fcca
Showing 5 changed files with 167 additions and 13 deletions.
4 changes: 4 additions & 0 deletions .changelog/4107.bugfix.md
@@ -0,0 +1,4 @@
+go/worker/compute: Replicate input batch locally
+
+Previously, a storage commit could fail when the node was both an executor
+and a storage node but not in the storage committee.
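The fix routes the executor's storage operations through a mux that consults the node's own local backend first and then the remote storage client, ignoring failures from the local side so that a node outside the storage committee can still commit through the committee. Below is a minimal, self-contained sketch of that idea; the names (backend, localBackend, remoteClient, muxApply) are simplified stand-ins for illustration, not the oasis-core API.

package main

import (
	"errors"
	"fmt"
)

// backend is a simplified stand-in for the storage backend interface used by the mux.
type backend interface {
	Apply(data string) (string, error)
	Local() bool
}

// localBackend simulates a node-local storage backend that may refuse writes,
// e.g. because the node is not part of the storage committee.
type localBackend struct{ healthy bool }

func (l localBackend) Apply(data string) (string, error) {
	if !l.healthy {
		return "", errors.New("not in storage committee")
	}
	return "local:" + data, nil
}

func (l localBackend) Local() bool { return true }

// remoteClient simulates the storage client talking to the remote committee.
type remoteClient struct{}

func (remoteClient) Apply(data string) (string, error) { return "remote:" + data, nil }
func (remoteClient) Local() bool                       { return false }

// muxApply iterates over all backends: errors from local backends are ignored so a
// later backend can still satisfy the request, while errors from non-local backends
// abort processing and are returned as-is.
func muxApply(data string, backends ...backend) (string, error) {
	var last string
	for _, b := range backends {
		resp, err := b.Apply(data)
		if err != nil {
			if b.Local() {
				continue // ignore local errors, keep iterating
			}
			return "", err
		}
		last = resp
	}
	return last, nil
}

func main() {
	// The local backend fails, but the remote one succeeds, so the overall call succeeds.
	out, err := muxApply("batch", localBackend{healthy: false}, remoteClient{})
	fmt.Println(out, err) // remote:batch <nil>
}

With the local backend refusing writes, the call still succeeds through the remote client, which is the behaviour the updated mux test in this commit asserts.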
18 changes: 13 additions & 5 deletions go/storage/api/mux.go
@@ -25,6 +25,9 @@ type MuxContinueWithError struct {
 }
 
 func (e MuxContinueWithError) Error() string {
+	if e.wrapped == nil {
+		return "<nil>"
+	}
 	return e.wrapped.Error()
 }
 
@@ -65,13 +68,18 @@ func MuxReadOpFinishEarly(next MuxController) MuxController {
 	}
 }
 
-// MuxIterateIgnoringErrors creates a controller that tells the muxer to continue iterating through
-// its backends even if one returns an error. The errors are stored and the last one is returned
-// at the end.
-func MuxIterateIgnoringErrors() MuxController {
+// MuxIterateIgnoringLocalErrors creates a controller that tells the muxer to continue iterating
+// through its backends even if a local one returns an error.
+func MuxIterateIgnoringLocalErrors() MuxController {
 	return func(i int, backend Backend, meth string, resp interface{}, err error) (interface{}, error) {
+		// Non-local errors are propagated as-is and abort processing.
+		if _, ok := backend.(LocalBackend); !ok {
+			return resp, err
+		}
+
+		// Error in a local backend is ignored.
 		if err != nil {
-			return resp, &MuxContinueWithError{err}
+			return resp, &MuxContinueWithError{nil}
 		}
 		return resp, nil
 	}
6 changes: 3 additions & 3 deletions go/storage/api/mux_test.go
@@ -9,7 +9,7 @@ import (
 )
 
 type faultyBackend struct {
-	Backend
+	LocalBackend
 
 	calledCh chan struct{}
 	returnCh chan error
@@ -45,7 +45,7 @@ func TestStorageMux(t *testing.T) {
 	}
 
 	mux := NewStorageMux(
-		MuxReadOpFinishEarly(MuxIterateIgnoringErrors()),
+		MuxReadOpFinishEarly(MuxIterateIgnoringLocalErrors()),
 		faulty1,
 		faulty2,
 	)
@@ -69,7 +69,7 @@ func TestStorageMux(t *testing.T) {
 	faulty1.returnCh <- someError
 	faulty2.returnCh <- nil
 	applyResp, err = mux.Apply(ctx, &ApplyRequest{})
-	require.EqualError(t, err, "error")
+	require.NoError(t, err)
 	require.NotNil(t, applyResp)
 	<-faulty1.calledCh
 	<-faulty2.calledCh
16 changes: 11 additions & 5 deletions go/worker/common/committee/group.go
@@ -250,6 +250,7 @@ type Group struct {
 	// storage is the storage backend that tracks the current committee.
 	storage       storage.Backend
 	storageClient storage.ClientBackend
+	storageLocal  storage.LocalBackend
 
 	logger *logging.Logger
 }
@@ -544,6 +545,12 @@ func (g *Group) Storage() storage.Backend {
 	return g.storage
 }
 
+// StorageLocal returns the local storage backend if the local node is also a storage node.
+// Otherwise it returns nil.
+func (g *Group) StorageLocal() storage.LocalBackend {
+	return g.storageLocal
+}
+
 // Start starts the group services.
 func (g *Group) Start() error {
 	g.Lock()
@@ -553,14 +560,13 @@ func (g *Group) Start() error {
 	// for this runtime). In this case we override the storage client's backend so that any updates
 	// don't go via gRPC but are redirected directly to the local backend instead.
 	var scOpts []storageClient.Option
-	var localStorageBackend storage.LocalBackend
 	if lsb, ok := g.runtime.Storage().(storage.LocalBackend); ok && g.runtime.HasRoles(node.RoleStorageWorker) {
 		// Make sure to unwrap the local backend as we need the raw local backend here.
 		if wrapped, ok := lsb.(storage.WrappedLocalBackend); ok {
 			lsb = wrapped.Unwrap()
 		}
 
-		localStorageBackend = lsb
+		g.storageLocal = lsb
 		scOpts = append(scOpts, storageClient.WithBackendOverride(g.identity.NodeSigner.Public(), lsb))
 	}
 
@@ -578,10 +584,10 @@ func (g *Group) Start() error {
 	g.storageClient = sc.(storage.ClientBackend)
 
 	// Create the storage multiplexer if we have a local storage backend.
-	if localStorageBackend != nil {
+	if g.storageLocal != nil {
 		g.storage = storage.NewStorageMux(
-			storage.MuxReadOpFinishEarly(storage.MuxIterateIgnoringErrors()),
-			localStorageBackend,
+			storage.MuxReadOpFinishEarly(storage.MuxIterateIgnoringLocalErrors()),
+			g.storageLocal,
 			g.storageClient,
 		)
 	} else {
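The Group.Start changes above rely on a common Go pattern: a type assertion to detect whether the configured storage backend is actually a local one, followed by unwrapping any decorator so that the raw backend is used. A minimal sketch of that assertion-and-unwrap pattern, with illustrative types (diskBackend, metricsWrapper) rather than the real oasis-core ones:

package main

import "fmt"

// Backend and LocalBackend mirror the idea of a general interface with a more
// capable local variant; the definitions here are illustrative only.
type Backend interface {
	Name() string
}

type LocalBackend interface {
	Backend
	PathOnDisk() string
}

// WrappedLocalBackend is implemented by decorators (metrics, tracing, ...) that
// wrap a local backend but can still expose the raw backend underneath.
type WrappedLocalBackend interface {
	Unwrap() LocalBackend
}

type diskBackend struct{ path string }

func (d diskBackend) Name() string       { return "disk" }
func (d diskBackend) PathOnDisk() string { return d.path }

type metricsWrapper struct{ inner LocalBackend }

func (m metricsWrapper) Name() string         { return "metrics(" + m.inner.Name() + ")" }
func (m metricsWrapper) PathOnDisk() string   { return m.inner.PathOnDisk() }
func (m metricsWrapper) Unwrap() LocalBackend { return m.inner }

// rawLocal returns the raw local backend if b is local at all, unwrapping decorators.
func rawLocal(b Backend) (LocalBackend, bool) {
	lsb, ok := b.(LocalBackend)
	if !ok {
		return nil, false // not a local backend
	}
	if wrapped, ok := lsb.(WrappedLocalBackend); ok {
		lsb = wrapped.Unwrap()
	}
	return lsb, true
}

func main() {
	var b Backend = metricsWrapper{inner: diskBackend{path: "/var/storage"}}
	if lsb, ok := rawLocal(b); ok {
		fmt.Println(lsb.Name(), lsb.PathOnDisk()) // prints: disk /var/storage
	}
}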
136 changes: 136 additions & 0 deletions go/worker/compute/executor/committee/node.go
@@ -1072,6 +1072,123 @@ func (n *Node) maybeStartProcessingBatchLocked(batch *unresolvedBatch) {
 	}
 }
 
+func (n *Node) startLocalStorageReplication(
+	ctx context.Context,
+	blk *block.Block,
+	ioRootHash hash.Hash,
+	batch transaction.RawBatch,
+) <-chan error {
+	ch := make(chan error, 1)
+	lsb := n.commonNode.Group.StorageLocal()
+	if lsb == nil {
+		// In case there is no local storage backend to replicate to, finish early.
+		ch <- nil
+		close(ch)
+		return ch
+	}
+
+	ioRoot := storage.Root{
+		Namespace: blk.Header.Namespace,
+		Version: blk.Header.Round + 1,
+		Type: storage.RootTypeIO,
+		Hash: ioRootHash,
+	}
+
+	// If we have a local storage node, replicate batch locally so we will be able to Apply
+	// locally later when proposing a batch. This also avoids needless replication for things
+	// that we already have.
+	replicateIO := make(chan error)
+	go func() {
+		defer close(replicateIO)
+
+		// Check if the root is already present as in this case no replication is needed.
+		if lsb.NodeDB().HasRoot(ioRoot) {
+			replicateIO <- nil
+			return
+		}
+
+		n.logger.Debug("replicating I/O root locally",
+			"io_root", ioRoot,
+		)
+
+		emptyRoot := ioRoot
+		emptyRoot.Hash.Empty()
+
+		ioTree := transaction.NewTree(nil, emptyRoot)
+		defer ioTree.Close()
+
+		for idx, tx := range batch {
+			if err := ioTree.AddTransaction(ctx, transaction.Transaction{Input: tx, BatchOrder: uint32(idx)}, nil); err != nil {
+				n.logger.Error("failed to create I/O tree",
+					"err", err,
+				)
+				replicateIO <- err
+				return
+			}
+		}
+
+		ioWriteLog, ioRootHashCheck, err := ioTree.Commit(ctx)
+		if err != nil {
+			n.logger.Error("failed to create I/O tree",
+				"err", err,
+			)
+			replicateIO <- err
+			return
+		}
+		if !ioRootHashCheck.Equal(&ioRootHash) {
+			n.logger.Error("inconsistent I/O root",
+				"io_root_hash", ioRootHashCheck,
+				"expected", ioRootHash,
+			)
+			replicateIO <- fmt.Errorf("inconsistent I/O root")
+			return
+		}
+
+		_, err = lsb.Apply(ctx, &storage.ApplyRequest{
+			Namespace: ioRoot.Namespace,
+			RootType: ioRoot.Type,
+			SrcRound: ioRoot.Version,
+			SrcRoot: emptyRoot.Hash,
+			DstRound: ioRoot.Version,
+			DstRoot: ioRoot.Hash,
+			WriteLog: ioWriteLog,
+		})
+		if err != nil {
+			n.logger.Error("failed to apply I/O tree locally",
+				"err", err,
+			)
+			replicateIO <- err
+			return
+		}
+
+		replicateIO <- nil
+	}()
+
+	// Wait for replication to complete.
+	go func() {
+		defer close(ch)
+
+		var combinedErr error
+		select {
+		case <-ctx.Done():
+			combinedErr = ctx.Err()
+		case err := <-replicateIO:
+			if err != nil {
+				combinedErr = fmt.Errorf("failed to replicate I/O root: %w", err)
+			}
+		}
+		// TODO: We should also wait for state replication to avoid extra fetches.
+
+		n.logger.Debug("local storage replication done",
+			"io_root", ioRoot,
+		)
+
+		ch <- combinedErr
+	}()
+
+	return ch
+}
+
 // Guarded by n.commonNode.CrossNode.
 func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) {
 	if n.commonNode.CurrentBlock == nil {
@@ -1128,6 +1245,10 @@ func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) {
 		)
 		return
 	}
+
+	// Optionally start local storage replication in parallel to batch dispatch.
+	replicateCh := n.startLocalStorageReplication(ctx, blk, batch.ioRoot.Hash, resolvedBatch)
+
 	rq := &protocol.Body{
 		RuntimeExecuteTxBatchRequest: &protocol.RuntimeExecuteTxBatchRequest{
 			ConsensusBlock: *consensusBlk,
@@ -1185,6 +1306,21 @@ func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) {
 		}
 		n.schedulerMutex.Unlock()
 
+		// Wait for replication to complete before proposing a batch to ensure that we can cleanly
+		// apply any updates.
+		select {
+		case <-ctx.Done():
+			return
+		case err = <-replicateCh:
+			if err != nil {
+				n.logger.Error("local storage replication failed",
+					"err", err,
+				)
+				// We can still continue as other nodes may have the storage replicated. If this is
+				// not the case, generating a commit will fail in propose.
+			}
+		}
+
 		// Submit response to the executor worker.
 		done <- &processedBatch{
 			computed: &rsp.RuntimeExecuteTxBatchResponse.Batch,
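The new startLocalStorageReplication helper above follows a standard Go concurrency pattern: do the replication work in a background goroutine, deliver its single result on a buffered channel, and have the caller select on that channel together with context cancellation only at the point where the result is actually needed, so batch dispatch proceeds in parallel. A minimal, self-contained sketch of the pattern, with a simulated Apply step and hypothetical names rather than the oasis-core types:

package main

import (
	"context"
	"fmt"
	"time"
)

// startReplication launches simulated replication work in the background and returns
// a channel that will receive exactly one result. The buffer of one lets the worker
// goroutine finish even if the caller has already given up waiting.
func startReplication(ctx context.Context, alreadyHaveRoot bool) <-chan error {
	ch := make(chan error, 1)
	go func() {
		defer close(ch)
		if alreadyHaveRoot {
			ch <- nil // nothing to replicate
			return
		}
		select {
		case <-time.After(50 * time.Millisecond): // stand-in for the actual Apply call
			ch <- nil
		case <-ctx.Done():
			ch <- fmt.Errorf("replication aborted: %w", ctx.Err())
		}
	}()
	return ch
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Kick off replication, then do other work (dispatching the batch) in parallel.
	replicateCh := startReplication(ctx, false)

	// Just before the result is needed, wait for replication or cancellation.
	// A replication failure is logged but not fatal, mirroring the change above.
	select {
	case <-ctx.Done():
		return
	case err := <-replicateCh:
		if err != nil {
			fmt.Println("local replication failed:", err)
		}
	}
	fmt.Println("ready to propose")
}

The real helper layers this shape twice: an inner goroutine replicates the I/O root, and an outer one folds the result (and, per the TODO, eventually state replication as well) into the single channel that the batch-processing path waits on before proposing.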
