go/worker/compute: Replicate input batch locally
kostko committed Jul 2, 2021
1 parent 5da7eb8 · commit d51699d
Showing 2 changed files with 147 additions and 5 deletions.
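In outline, the new code path checks whether the I/O root is already in the local node database and, if not, rebuilds the I/O tree from the raw input batch, verifies that the resulting root matches the expected one, and applies the write log to the local backend. A condensed sketch of that flow, using the same calls as the diff below (imports, logging and the surrounding goroutine omitted):

func replicateIOLocally(ctx context.Context, lsb storage.LocalBackend, ioRoot storage.Root, batch transaction.RawBatch) error {
	// Nothing to do if the root is already present locally.
	if lsb.NodeDB().HasRoot(ioRoot) {
		return nil
	}

	// Rebuild the I/O tree from the raw transactions, starting from an empty root.
	emptyRoot := ioRoot
	emptyRoot.Hash.Empty()

	ioTree := transaction.NewTree(nil, emptyRoot)
	defer ioTree.Close()

	for idx, tx := range batch {
		if err := ioTree.AddTransaction(ctx, transaction.Transaction{Input: tx, BatchOrder: uint32(idx)}, nil); err != nil {
			return err
		}
	}

	ioWriteLog, ioRootHash, err := ioTree.Commit(ctx)
	if err != nil {
		return err
	}
	if !ioRootHash.Equal(&ioRoot.Hash) {
		return fmt.Errorf("inconsistent I/O root")
	}

	// Apply the write log to the local storage backend.
	_, err = lsb.Apply(ctx, &storage.ApplyRequest{
		Namespace: ioRoot.Namespace,
		RootType:  ioRoot.Type,
		SrcRound:  ioRoot.Version,
		SrcRoot:   emptyRoot.Hash,
		DstRound:  ioRoot.Version,
		DstRoot:   ioRoot.Hash,
		WriteLog:  ioWriteLog,
	})
	return err
}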
14 changes: 10 additions & 4 deletions go/worker/common/committee/group.go
@@ -250,6 +250,7 @@ type Group struct {
// storage is the storage backend that tracks the current committee.
storage storage.Backend
storageClient storage.ClientBackend
storageLocal storage.LocalBackend

logger *logging.Logger
}
@@ -544,6 +545,12 @@ func (g *Group) Storage() storage.Backend {
return g.storage
}

// StorageLocal returns the local storage backend if the local node is also a storage node.
// Otherwise it returns nil.
func (g *Group) StorageLocal() storage.LocalBackend {
return g.storageLocal
}

// Start starts the group services.
func (g *Group) Start() error {
g.Lock()
@@ -553,14 +560,13 @@ func (g *Group) Start() error {
// for this runtime). In this case we override the storage client's backend so that any updates
// don't go via gRPC but are redirected directly to the local backend instead.
var scOpts []storageClient.Option
- var localStorageBackend storage.LocalBackend
if lsb, ok := g.runtime.Storage().(storage.LocalBackend); ok && g.runtime.HasRoles(node.RoleStorageWorker) {
// Make sure to unwrap the local backend as we need the raw local backend here.
if wrapped, ok := lsb.(storage.WrappedLocalBackend); ok {
lsb = wrapped.Unwrap()
}

- localStorageBackend = lsb
+ g.storageLocal = lsb
scOpts = append(scOpts, storageClient.WithBackendOverride(g.identity.NodeSigner.Public(), lsb))
}

@@ -578,10 +584,10 @@ func (g *Group) Start() error {
g.storageClient = sc.(storage.ClientBackend)

// Create the storage multiplexer if we have a local storage backend.
- if localStorageBackend != nil {
+ if g.storageLocal != nil {
g.storage = storage.NewStorageMux(
storage.MuxReadOpFinishEarly(storage.MuxIterateIgnoringErrors()),
- localStorageBackend,
+ g.storageLocal,
g.storageClient,
)
} else {
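The mux options above configure read operations to finish early (stop at the first backend that returns a result) while iterating over backends ignoring errors, with the local backend listed before the storage client. A toy illustration of that read policy; this is a hypothetical helper, not the oasis-core mux implementation:

package main

import (
	"context"
	"fmt"
)

// readFn stands in for a single backend's read operation.
type readFn func(ctx context.Context) ([]byte, error)

// readFinishEarlyIgnoringErrors tries the backends in order: the first success
// wins, failing backends are skipped, and an error is returned only when every
// backend has failed.
func readFinishEarlyIgnoringErrors(ctx context.Context, backends ...readFn) ([]byte, error) {
	var lastErr error
	for _, read := range backends {
		v, err := read(ctx)
		if err == nil {
			return v, nil
		}
		lastErr = err
	}
	return nil, fmt.Errorf("read failed on all backends: %w", lastErr)
}

func main() {
	local := func(ctx context.Context) ([]byte, error) { return nil, fmt.Errorf("not found locally") }
	remote := func(ctx context.Context) ([]byte, error) { return []byte("value"), nil }

	// Local backend first, storage client second, mirroring the mux ordering above.
	v, err := readFinishEarlyIgnoringErrors(context.Background(), local, remote)
	fmt.Println(string(v), err)
}

The option names suggest the early-finish behaviour applies to read operations only; the practical effect here is that reads can be served by the local backend without a gRPC round trip whenever it already has the data.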
138 changes: 137 additions & 1 deletion go/worker/compute/executor/committee/node.go
@@ -1072,6 +1072,123 @@ func (n *Node) maybeStartProcessingBatchLocked(batch *unresolvedBatch) {
}
}

func (n *Node) startLocalStorageReplication(
ctx context.Context,
blk *block.Block,
ioRootHash hash.Hash,
batch transaction.RawBatch,
) <-chan error {
ch := make(chan error, 1)
lsb := n.commonNode.Group.StorageLocal()
if lsb == nil {
// In case there is no local storage backend to replicate to, finish early.
ch <- nil
close(ch)
return ch
}

ioRoot := storage.Root{
Namespace: blk.Header.Namespace,
Version: blk.Header.Round + 1,
Type: storage.RootTypeIO,
Hash: ioRootHash,
}

// If we have a local storage node, replicate batch locally so we will be able to Apply
// locally later when proposing a batch. This also avoids needless replication for things
// that we already have.
replicateIO := make(chan error)
go func() {
defer close(replicateIO)

// Check if the root is already present as in this case no replication is needed.
if lsb.NodeDB().HasRoot(ioRoot) {
replicateIO <- nil
return
}

n.logger.Debug("replicating I/O root locally",
"io_root", ioRoot,
)

emptyRoot := ioRoot
emptyRoot.Hash.Empty()

ioTree := transaction.NewTree(nil, emptyRoot)
defer ioTree.Close()

for idx, tx := range batch {
if err := ioTree.AddTransaction(ctx, transaction.Transaction{Input: tx, BatchOrder: uint32(idx)}, nil); err != nil {
n.logger.Error("failed to create I/O tree",
"err", err,
)
replicateIO <- err
return
}
}

ioWriteLog, ioRootHashCheck, err := ioTree.Commit(ctx)
if err != nil {
n.logger.Error("failed to create I/O tree",
"err", err,
)
replicateIO <- err
return
}
if !ioRootHashCheck.Equal(&ioRootHash) {
n.logger.Error("inconsistent I/O root",
"io_root_hash", ioRootHashCheck,
"expected", ioRootHash,
)
replicateIO <- fmt.Errorf("inconsistent I/O root")
return
}

_, err = lsb.Apply(ctx, &storage.ApplyRequest{
Namespace: ioRoot.Namespace,
RootType: ioRoot.Type,
SrcRound: ioRoot.Version,
SrcRoot: emptyRoot.Hash,
DstRound: ioRoot.Version,
DstRoot: ioRoot.Hash,
WriteLog: ioWriteLog,
})
if err != nil {
n.logger.Error("failed to apply I/O tree locally",
"err", err,
)
replicateIO <- err
return
}

replicateIO <- nil
}()

// Wait for replication to complete.
go func() {
defer close(ch)

var combinedErr error
select {
case <-ctx.Done():
combinedErr = ctx.Err()
case err := <-replicateIO:
if err != nil {
combinedErr = fmt.Errorf("failed to replicate I/O root: %w", err)
}
}
// TODO: We should also wait for state replication to avoid extra fetches.

n.logger.Debug("local storage replication done",
"io_root", ioRoot,
)

ch <- combinedErr
}()

return ch
}

// Guarded by n.commonNode.CrossNode.
func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) {
if n.commonNode.CurrentBlock == nil {
@@ -1128,6 +1245,10 @@ func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) {
)
return
}

// Optionally start local storage replication in parallel to batch dispatch.
replicateCh := n.startLocalStorageReplication(ctx, blk, batch.ioRoot.Hash, resolvedBatch)

rq := &protocol.Body{
RuntimeExecuteTxBatchRequest: &protocol.RuntimeExecuteTxBatchRequest{
ConsensusBlock: *consensusBlk,
@@ -1185,6 +1306,21 @@ func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) {
}
n.schedulerMutex.Unlock()

// Wait for replication to complete before proposing a batch to ensure that we can cleanly
// apply any updates.
select {
case <-ctx.Done():
return
case err = <-replicateCh:
if err != nil {
n.logger.Error("local storage replication failed",
"err", err,
)
// We can still continue as other nodes may have the storage replicated. If this is
// not the case, generating a commit will fail in propose.
}
}

// Submit response to the executor worker.
done <- &processedBatch{
computed: &rsp.RuntimeExecuteTxBatchResponse.Batch,
@@ -1280,7 +1416,7 @@ func (n *Node) proposeBatch(
DstRound: lastHeader.Round + 1,
Ops: applyOps,
})
- if err != nil {
+ if err != nil && receipts == nil { // Receipts can be non-nil in case only local apply failed.
n.logger.Error("failed to apply to storage",
"err", err,
)
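The replication helper added above follows a start-now, join-later pattern: it returns a buffered error channel, so the replication goroutine can always deliver its result even if the caller has already given up, and startProcessingBatchLocked joins on that channel (or on context cancellation) only right before proposing. A stripped-down sketch of the pattern, with hypothetical names:

package main

import (
	"context"
	"fmt"
	"time"
)

// startAsync runs work in the background and returns a channel that delivers its
// result exactly once. The buffer of one means the goroutine never blocks on send,
// even if the caller is cancelled and never reads the result.
func startAsync(ctx context.Context, work func(context.Context) error) <-chan error {
	ch := make(chan error, 1)
	go func() {
		defer close(ch)
		ch <- work(ctx)
	}()
	return ch
}

func main() {
	ctx := context.Background()

	// Kick off the slow side work (standing in for local storage replication).
	done := startAsync(ctx, func(ctx context.Context) error {
		time.Sleep(50 * time.Millisecond)
		return nil
	})

	// ... main work happens here (standing in for dispatching the batch) ...

	// Join before the step that depends on the side work (standing in for propose).
	select {
	case <-ctx.Done():
		return
	case err := <-done:
		if err != nil {
			// As in the diff, a failure is logged but not fatal: other nodes may
			// already have the data replicated.
			fmt.Println("replication sketch failed:", err)
		}
	}
}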
