From c37135981ea7a39b678099e47bfd3190f68f985d Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Thu, 25 Jun 2020 11:10:21 +0200 Subject: [PATCH 1/3] go/worker/compute/executor: Defer fetching the batch from storage There is no need to attempt to fetch the batch immediately, we can defer it to when we actually need to start processing the batch. This makes fetching not block P2P dispatch. --- .changelog/3049.internal.1.md | 5 + go/worker/compute/executor/committee/batch.go | 48 ++++++ go/worker/compute/executor/committee/node.go | 148 ++++++++---------- go/worker/compute/executor/committee/state.go | 35 +---- 4 files changed, 120 insertions(+), 116 deletions(-) create mode 100644 .changelog/3049.internal.1.md create mode 100644 go/worker/compute/executor/committee/batch.go diff --git a/.changelog/3049.internal.1.md b/.changelog/3049.internal.1.md new file mode 100644 index 00000000000..ff4e1542751 --- /dev/null +++ b/.changelog/3049.internal.1.md @@ -0,0 +1,5 @@ +go/worker/compute/executor: Defer fetching the batch from storage + +There is no need to attempt to fetch the batch immediately, we can defer it to +when we actually need to start processing the batch. This makes fetching not +block P2P dispatch. diff --git a/go/worker/compute/executor/committee/batch.go b/go/worker/compute/executor/committee/batch.go new file mode 100644 index 00000000000..2d678aa4bdc --- /dev/null +++ b/go/worker/compute/executor/committee/batch.go @@ -0,0 +1,48 @@ +package committee + +import ( + "context" + "fmt" + + "github.com/opentracing/opentracing-go" + + "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" + "github.com/oasisprotocol/oasis-core/go/runtime/transaction" + storage "github.com/oasisprotocol/oasis-core/go/storage/api" +) + +// unresolvedBatch is a batch that may still need to be resolved (fetched from storage). +type unresolvedBatch struct { + // ioRoot is the I/O root from the transaction scheduler containing the inputs. + ioRoot storage.Root + // txnSchedSignatures is the transaction scheduler signature of the dispatched batch. + txnSchedSignature signature.Signature + // storageSignatures are the storage node signatures of storage receipts for the I/O root. + storageSignatures []signature.Signature + + batch transaction.RawBatch + spanCtx opentracing.SpanContext +} + +func (ub *unresolvedBatch) String() string { + return fmt.Sprintf("UnresolvedBatch{ioRoot: %s}", ub.ioRoot) +} + +func (ub *unresolvedBatch) resolve(ctx context.Context, storage storage.Backend) (transaction.RawBatch, error) { + if ub.batch != nil { + // In case we already have a resolved batch, just return it. + return ub.batch, nil + } + + txs := transaction.NewTree(storage, ub.ioRoot) + defer txs.Close() + + batch, err := txs.GetInputBatch(ctx) + if err != nil || len(batch) == 0 { + return nil, fmt.Errorf("failed to fetch inputs from storage: %w", err) + } + if len(batch) == 0 { + return nil, fmt.Errorf("failed to fetch inputs from storage: batch is empty") + } + return batch, nil +} diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 2cf6289fbdf..288759c1321 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -41,7 +41,6 @@ var ( errRuntimeAborted = errors.New("executor: runtime aborted batch processing") errIncompatibleHeader = p2pError.Permanent(errors.New("executor: incompatible header")) errInvalidReceipt = p2pError.Permanent(errors.New("executor: invalid storage receipt")) - errStorageFailed = p2pError.Permanent(errors.New("executor: failed to fetch from storage")) errIncorrectRole = errors.New("executor: incorrect role") errIncorrectState = errors.New("executor: incorrect state") errMsgFromNonTxnSched = errors.New("executor: received txn scheduler dispatch msg from non-txn scheduler") @@ -275,34 +274,23 @@ func (n *Node) queueBatchBlocking( return errInvalidReceipt } - // Fetch inputs from storage. - ioRoot := storage.Root{ - Namespace: hdr.Namespace, - Version: hdr.Round + 1, - Hash: ioRootHash, - } - txs := transaction.NewTree(n.commonNode.Storage, ioRoot) - defer txs.Close() - - readStartTime := time.Now() - batch, err := txs.GetInputBatch(ctx) - if err != nil || len(batch) == 0 { - n.logger.Error("failed to fetch inputs from storage", - "err", err, - "io_root", ioRoot, - ) - return errStorageFailed + // Defer fetching inputs from storage to when we actually start processing a batch. + batch := &unresolvedBatch{ + ioRoot: storage.Root{ + Namespace: hdr.Namespace, + Version: hdr.Round + 1, + Hash: ioRootHash, + }, + txnSchedSignature: txnSchedSig, + storageSignatures: storageSignatures, } - batchReadTime.With(n.getMetricLabels()).Observe(time.Since(readStartTime).Seconds()) - - var batchSpanCtx opentracing.SpanContext if batchSpan := opentracing.SpanFromContext(ctx); batchSpan != nil { - batchSpanCtx = batchSpan.Context() + batch.spanCtx = batchSpan.Context() } n.commonNode.CrossNode.Lock() defer n.commonNode.CrossNode.Unlock() - return n.handleExternalBatchLocked(committeeID, ioRootHash, batch, batchSpanCtx, hdr, txnSchedSig, storageSignatures) + return n.handleExternalBatchLocked(committeeID, batch, hdr) } // HandleBatchFromTransactionSchedulerLocked processes a batch from the transaction scheduler. @@ -321,7 +309,17 @@ func (n *Node) HandleBatchFromTransactionSchedulerLocked( return } - n.maybeStartProcessingBatchLocked(ioRoot, batch, batchSpanCtx, txnSchedSig, inputStorageSigs) + n.maybeStartProcessingBatchLocked(&unresolvedBatch{ + ioRoot: storage.Root{ + Namespace: n.commonNode.CurrentBlock.Header.Namespace, + Version: n.commonNode.CurrentBlock.Header.Round + 1, + Hash: ioRoot, + }, + txnSchedSignature: txnSchedSig, + storageSignatures: inputStorageSigs, + batch: batch, + spanCtx: batchSpanCtx, + }) } func (n *Node) bumpReselect() { @@ -396,7 +394,7 @@ func (n *Node) HandleNewBlockLocked(blk *block.Block) { // Check if this was the block we were waiting for. if header.MostlyEqual(state.header) { n.logger.Info("received block needed for batch processing") - n.maybeStartProcessingBatchLocked(state.ioRoot, state.batch, state.batchSpanCtx, state.txnSchedSig, state.inputStorageSigs) + n.maybeStartProcessingBatchLocked(state.batch) break } @@ -437,36 +435,24 @@ func (n *Node) HandleNewBlockLocked(blk *block.Block) { } // Guarded by n.commonNode.CrossNode. -func (n *Node) maybeStartProcessingBatchLocked( - ioRoot hash.Hash, - batch transaction.RawBatch, - batchSpanCtx opentracing.SpanContext, - txnSchedSig signature.Signature, - inputStorageSigs []signature.Signature, -) { +func (n *Node) maybeStartProcessingBatchLocked(batch *unresolvedBatch) { epoch := n.commonNode.Group.GetEpochSnapshot() switch { case epoch.IsExecutorWorker(): // Worker, start processing immediately. - n.startProcessingBatchLocked(ioRoot, batch, batchSpanCtx, txnSchedSig, inputStorageSigs) + n.startProcessingBatchLocked(batch) case epoch.IsExecutorBackupWorker(): // Backup worker, wait for discrepancy event. state, ok := n.state.(StateWaitingForBatch) if ok && state.pendingEvent != nil { // We have already received a discrepancy event, start processing immediately. n.logger.Info("already received a discrepancy event, start processing batch") - n.startProcessingBatchLocked(ioRoot, batch, batchSpanCtx, txnSchedSig, inputStorageSigs) + n.startProcessingBatchLocked(batch) return } - n.transitionLocked(StateWaitingForEvent{ - ioRoot: ioRoot, - batch: batch, - batchSpanCtx: batchSpanCtx, - txnSchedSig: txnSchedSig, - inputStorageSigs: inputStorageSigs, - }) + n.transitionLocked(StateWaitingForEvent{batch: batch}) default: // Currently not a member of an executor committee, log. n.logger.Warn("not an executor committee member, ignoring batch") @@ -474,13 +460,7 @@ func (n *Node) maybeStartProcessingBatchLocked( } // Guarded by n.commonNode.CrossNode. -func (n *Node) startProcessingBatchLocked( - ioRoot hash.Hash, - batch transaction.RawBatch, - batchSpanCtx opentracing.SpanContext, - txnSchedSig signature.Signature, - inputStorageSigs []signature.Signature, -) { +func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) { if n.commonNode.CurrentBlock == nil { panic("attempted to start processing batch with a nil block") } @@ -493,17 +473,8 @@ func (n *Node) startProcessingBatchLocked( ctx, cancel := context.WithCancel(n.ctx) done := make(chan *protocol.ComputedBatch, 1) - rq := &protocol.Body{ - RuntimeExecuteTxBatchRequest: &protocol.RuntimeExecuteTxBatchRequest{ - IORoot: ioRoot, - Inputs: batch, - Block: *n.commonNode.CurrentBlock, - }, - } - batchStartTime := time.Now() - batchSize.With(n.getMetricLabels()).Observe(float64(len(batch))) - n.transitionLocked(StateProcessingBatch{ioRoot, batch, batchSpanCtx, batchStartTime, cancel, done, txnSchedSig, inputStorageSigs}) + n.transitionLocked(StateProcessingBatch{batch, batchStartTime, cancel, done}) rt := n.GetHostedRuntime() if rt == nil { @@ -516,12 +487,33 @@ func (n *Node) startProcessingBatchLocked( // Request the worker host to process a batch. This is done in a separate // goroutine so that the committee node can continue processing blocks. + blk := n.commonNode.CurrentBlock go func() { defer close(done) + // Resolve the batch and dispatch it to the runtime. + readStartTime := time.Now() + resolvedBatch, err := batch.resolve(ctx, n.commonNode.Storage) + if err != nil { + n.logger.Error("failed to resolve batch", + "err", err, + "batch", batch, + ) + return + } + rq := &protocol.Body{ + RuntimeExecuteTxBatchRequest: &protocol.RuntimeExecuteTxBatchRequest{ + IORoot: batch.ioRoot.Hash, + Inputs: resolvedBatch, + Block: *blk, + }, + } + batchReadTime.With(n.getMetricLabels()).Observe(time.Since(readStartTime).Seconds()) + batchSize.With(n.getMetricLabels()).Observe(float64(len(resolvedBatch))) + span := opentracing.StartSpan("CallBatch(rq)", opentracing.Tag{Key: "rq", Value: rq}, - opentracing.ChildOf(batchSpanCtx), + opentracing.ChildOf(batch.spanCtx), ) ctx = opentracing.ContextWithSpan(ctx, span) defer span.Finish() @@ -611,16 +603,16 @@ func (n *Node) proposeBatchLocked(batch *protocol.ComputedBatch) { CommitteeID: epoch.GetExecutorCommitteeID(), Header: batch.Header, RakSig: batch.RakSig, - TxnSchedSig: state.txnSchedSig, - InputRoot: state.ioRoot, - InputStorageSigs: state.inputStorageSigs, + TxnSchedSig: state.batch.txnSchedSignature, + InputRoot: state.batch.ioRoot.Hash, + InputStorageSigs: state.batch.storageSignatures, } // Commit I/O and state write logs to storage. start := time.Now() err := func() error { span, ctx := tracing.StartSpanWithContext(n.ctx, "Apply(io, state)", - opentracing.ChildOf(state.batchSpanCtx), + opentracing.ChildOf(state.batch.spanCtx), ) defer span.Finish() @@ -634,7 +626,7 @@ func (n *Node) proposeBatchLocked(batch *protocol.ComputedBatch) { // I/O root. storage.ApplyOp{ SrcRound: lastHeader.Round + 1, - SrcRoot: state.ioRoot, + SrcRoot: state.batch.ioRoot.Hash, DstRoot: batch.Header.IORoot, WriteLog: batch.IOWriteLog, }, @@ -708,9 +700,9 @@ func (n *Node) proposeBatchLocked(batch *protocol.ComputedBatch) { // Publish commitment to merge committee. spanPublish := opentracing.StartSpan("PublishExecuteFinished(commitment)", - opentracing.ChildOf(state.batchSpanCtx), + opentracing.ChildOf(state.batch.spanCtx), ) - err = n.commonNode.Group.PublishExecuteFinished(state.batchSpanCtx, commit) + err = n.commonNode.Group.PublishExecuteFinished(state.batch.spanCtx, commit) if err != nil { spanPublish.Finish() n.logger.Error("failed to publish results to committee", @@ -736,7 +728,7 @@ func (n *Node) proposeBatchLocked(batch *protocol.ComputedBatch) { if n.mergeNode == nil { n.logger.Error("scheduler says we are a merge worker, but we are not") } else { - n.mergeNode.HandleResultsFromExecutorWorkerLocked(state.batchSpanCtx, commit) + n.mergeNode.HandleResultsFromExecutorWorkerLocked(state.batch.spanCtx, commit) } } @@ -799,7 +791,7 @@ func (n *Node) HandleNewEventLocked(ev *roothash.Event) { // Backup worker, start processing a batch. n.logger.Info("backup worker activating and processing batch") - n.startProcessingBatchLocked(state.ioRoot, state.batch, state.batchSpanCtx, state.txnSchedSig, state.inputStorageSigs) + n.startProcessingBatchLocked(state.batch) } // HandleNodeUpdateLocked implements NodeHooks. @@ -809,15 +801,7 @@ func (n *Node) HandleNodeUpdateLocked(update *runtimeCommittee.NodeUpdate, snaps } // Guarded by n.commonNode.CrossNode. -func (n *Node) handleExternalBatchLocked( - committeeID hash.Hash, - ioRoot hash.Hash, - batch transaction.RawBatch, - batchSpanCtx opentracing.SpanContext, - hdr block.Header, - txnSchedSig signature.Signature, - inputStorageSigs []signature.Signature, -) error { +func (n *Node) handleExternalBatchLocked(committeeID hash.Hash, batch *unresolvedBatch, hdr block.Header) error { // If we are not waiting for a batch, don't do anything. if _, ok := n.state.(StateWaitingForBatch); !ok { return errIncorrectState @@ -843,7 +827,7 @@ func (n *Node) handleExternalBatchLocked( // Check if we have the correct block -- in this case, start processing the batch. if n.commonNode.CurrentBlock.Header.MostlyEqual(&hdr) { - n.maybeStartProcessingBatchLocked(ioRoot, batch, batchSpanCtx, txnSchedSig, inputStorageSigs) + n.maybeStartProcessingBatchLocked(batch) return nil } @@ -860,12 +844,8 @@ func (n *Node) handleExternalBatchLocked( // Wait for the correct block to arrive. n.transitionLocked(StateWaitingForBlock{ - ioRoot: ioRoot, - batch: batch, - batchSpanCtx: batchSpanCtx, - header: &hdr, - txnSchedSig: txnSchedSig, - inputStorageSigs: inputStorageSigs, + batch: batch, + header: &hdr, }) return nil diff --git a/go/worker/compute/executor/committee/state.go b/go/worker/compute/executor/committee/state.go index 88e5714a243..f046dcb54df 100644 --- a/go/worker/compute/executor/committee/state.go +++ b/go/worker/compute/executor/committee/state.go @@ -4,14 +4,9 @@ import ( "context" "time" - "github.com/opentracing/opentracing-go" - - "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" - "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" "github.com/oasisprotocol/oasis-core/go/roothash/api/block" "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" - "github.com/oasisprotocol/oasis-core/go/runtime/transaction" ) // StateName is a symbolic state without the attached values. @@ -131,18 +126,10 @@ func (s StateWaitingForBatch) String() string { // StateWaitingForBlock is the waiting for block state. type StateWaitingForBlock struct { - // I/O root from the transaction scheduler containing the inputs. - ioRoot hash.Hash // Batch that is waiting to be processed. - batch transaction.RawBatch - // Tracing for this batch. - batchSpanCtx opentracing.SpanContext + batch *unresolvedBatch // Header of the block we are waiting for. header *block.Header - // Transaction scheduler's signature. - txnSchedSig signature.Signature - // Storage signatures for the I/O root containing the inputs. - inputStorageSigs []signature.Signature } // Name returns the name of the state. @@ -157,16 +144,8 @@ func (s StateWaitingForBlock) String() string { // StateWaitingForEvent is the waiting for event state. type StateWaitingForEvent struct { - // I/O root from the transaction scheduler containing the inputs. - ioRoot hash.Hash // Batch that is being processed. - batch transaction.RawBatch - // Tracing for this batch. - batchSpanCtx opentracing.SpanContext - // Transaction scheduler's signature. - txnSchedSig signature.Signature - // Storage signatures for the I/O root containing the inputs. - inputStorageSigs []signature.Signature + batch *unresolvedBatch } // Name returns the name of the state. @@ -181,22 +160,14 @@ func (s StateWaitingForEvent) String() string { // StateProcessingBatch is the processing batch state. type StateProcessingBatch struct { - // I/O root from the transaction scheduler containing the inputs. - ioRoot hash.Hash // Batch that is being processed. - batch transaction.RawBatch - // Tracing for this batch. - batchSpanCtx opentracing.SpanContext + batch *unresolvedBatch // Timing for this batch. batchStartTime time.Time // Function for cancelling batch processing. cancelFn context.CancelFunc // Channel which will provide the result. done chan *protocol.ComputedBatch - // Transaction scheduler's signature. - txnSchedSig signature.Signature - // Storage signatures for the I/O root containing the inputs. - inputStorageSigs []signature.Signature } // Name returns the name of the state. From 652877be6ef495ee65eb0075d61c6afc7296596f Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Thu, 25 Jun 2020 11:11:59 +0200 Subject: [PATCH 2/3] go/worker/compute/merge: Defer finalization attempt There is no need for the finalization attempt to block handling of an incoming commitment as any errors from that are not propagated. This avoids blocking P2P relaying as well. --- .changelog/3049.internal.2.md | 5 +++++ go/worker/compute/merge/committee/node.go | 19 ++++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) create mode 100644 .changelog/3049.internal.2.md diff --git a/.changelog/3049.internal.2.md b/.changelog/3049.internal.2.md new file mode 100644 index 00000000000..ccd7fcad029 --- /dev/null +++ b/.changelog/3049.internal.2.md @@ -0,0 +1,5 @@ +go/worker/compute/merge: Defer finalization attempt + +There is no need for the finalization attempt to block handling of an incoming +commitment as any errors from that are not propagated. This avoids blocking +P2P relaying as well. diff --git a/go/worker/compute/merge/committee/node.go b/go/worker/compute/merge/committee/node.go index 86a55528d2c..1bfb1e810fa 100644 --- a/go/worker/compute/merge/committee/node.go +++ b/go/worker/compute/merge/committee/node.go @@ -307,7 +307,24 @@ func (n *Node) handleResultsLocked(ctx context.Context, commit *commitment.Execu return err } - n.tryFinalizeResultsLocked(sp, false) + // Attempt finalization. We defer this part in order to not block P2P relaying. + expectedRound := n.commonNode.CurrentBlock.Header.Round + go func() { + n.commonNode.CrossNode.Lock() + defer n.commonNode.CrossNode.Unlock() + + // Ignore defered finalization attempt if state has changed. + if _, ok := n.state.(StateWaitingForResults); !ok { + return + } + + // Ignore defered finalization attempt if current block has changed. + if n.commonNode.CurrentBlock.Header.Round != expectedRound { + return + } + + n.tryFinalizeResultsLocked(sp, false) + }() return nil } From ac52c75be5b506144cddca93ae60ee4cc598a5ad Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Thu, 25 Jun 2020 11:12:29 +0200 Subject: [PATCH 3/3] go/worker/common: Treat unknown P2P message type as a permanent error --- go/worker/common/committee/node.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/worker/common/committee/node.go b/go/worker/common/committee/node.go index 674badcc480..d1c8106492b 100644 --- a/go/worker/common/committee/node.go +++ b/go/worker/common/committee/node.go @@ -18,6 +18,7 @@ import ( runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry" storage "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" + p2pError "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/error" ) var ( @@ -167,7 +168,7 @@ func (n *Node) HandlePeerMessage(ctx context.Context, message *p2p.Message) erro return nil } } - return errors.New("unknown message type") + return p2pError.Permanent(errors.New("unknown message type")) } // Guarded by n.CrossNode.