Merge pull request #3049 from oasisprotocol/kostko/fix/compute-p2p-dispatch

go/worker: Defer various processing
kostko authored Jun 25, 2020
2 parents 3590df4 + ac52c75 commit 695b240
Showing 7 changed files with 145 additions and 118 deletions.
5 changes: 5 additions & 0 deletions .changelog/3049.internal.1.md
@@ -0,0 +1,5 @@
go/worker/compute/executor: Defer fetching the batch from storage

There is no need to attempt to fetch the batch immediately; we can defer it to
when we actually need to start processing the batch. This way, fetching does
not block P2P dispatch.
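For illustration, a minimal self-contained sketch of this deferral pattern in Go; `lazyBatch`, `fetchFunc` and the string I/O root below are hypothetical stand-ins rather than the actual oasis-core types:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// fetchFunc stands in for the storage lookup the worker performs via the
// transaction tree; the name and signature are illustrative only.
type fetchFunc func(ctx context.Context, ioRoot string) ([][]byte, error)

// lazyBatch keeps only the batch identifier; the payload is fetched the
// first time resolve is called, not when the batch is queued.
type lazyBatch struct {
	ioRoot string
	batch  [][]byte // nil until resolved
	fetch  fetchFunc
}

func (b *lazyBatch) resolve(ctx context.Context) ([][]byte, error) {
	if b.batch != nil {
		// Already resolved (e.g. the batch arrived together with its inputs).
		return b.batch, nil
	}
	batch, err := b.fetch(ctx, b.ioRoot)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch inputs from storage: %w", err)
	}
	if len(batch) == 0 {
		return nil, errors.New("failed to fetch inputs from storage: batch is empty")
	}
	b.batch = batch
	return batch, nil
}

func main() {
	lb := &lazyBatch{
		ioRoot: "example-io-root",
		fetch: func(ctx context.Context, ioRoot string) ([][]byte, error) {
			// Simulated storage round trip; the P2P handler never waits on it.
			return [][]byte{[]byte("tx1"), []byte("tx2")}, nil
		},
	}

	// The dispatch path only records lb and returns; resolution happens
	// later, when batch processing actually starts.
	batch, err := lb.resolve(context.Background())
	fmt.Println(len(batch), err)
}
```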
5 changes: 5 additions & 0 deletions .changelog/3049.internal.2.md
@@ -0,0 +1,5 @@
go/worker/compute/merge: Defer finalization attempt

There is no need for the finalization attempt to block handling of an incoming
commitment, as any errors it produces are not propagated. This also avoids
blocking P2P relaying.
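A minimal sketch of the same idea in isolation, assuming hypothetical `mergeNode` and `commitment` types rather than the real worker: the handler hands the finalization attempt to a goroutine and returns at once, and any failure is only logged:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type commitment struct{ id int }

type mergeNode struct {
	wg sync.WaitGroup
}

// handleCommitment records the commitment and schedules the finalization
// attempt asynchronously. Its error is only logged, never returned, so the
// caller (and thus P2P relaying) does not wait for it.
func (m *mergeNode) handleCommitment(c commitment) error {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		if err := m.tryFinalize(c); err != nil {
			fmt.Println("finalization attempt failed:", err) // log only
		}
	}()
	return nil // the handler returns immediately
}

func (m *mergeNode) tryFinalize(c commitment) error {
	time.Sleep(10 * time.Millisecond) // stand-in for the real finalization work
	return nil
}

func main() {
	m := &mergeNode{}
	_ = m.handleCommitment(commitment{id: 1})
	m.wg.Wait()
}
```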
3 changes: 2 additions & 1 deletion go/worker/common/committee/node.go
@@ -18,6 +18,7 @@ import (
runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/worker/common/p2p"
p2pError "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/error"
)

var (
@@ -167,7 +168,7 @@ func (n *Node) HandlePeerMessage(ctx context.Context, message *p2p.Message) erro
return nil
}
}
return errors.New("unknown message type")
return p2pError.Permanent(errors.New("unknown message type"))
}

// Guarded by n.CrossNode.
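The functional change above wraps the "unknown message type" error as permanent so the P2P layer can stop retrying it. A minimal sketch of what such a marker could look like and how dispatch code might branch on it; this is a hypothetical stand-in, not the actual go/worker/common/p2p/error API:

```go
package main

import (
	"errors"
	"fmt"
)

// permanentError marks an error that a retry cannot fix. It is a hypothetical
// stand-in for the p2pError package referenced in the diff, whose actual API
// may differ.
type permanentError struct{ err error }

func (p permanentError) Error() string { return p.err.Error() }
func (p permanentError) Unwrap() error { return p.err }

// Permanent wraps err so dispatch code can distinguish it from transient failures.
func Permanent(err error) error { return permanentError{err: err} }

// IsPermanent reports whether err (or any error it wraps) is permanent.
func IsPermanent(err error) bool {
	var p permanentError
	return errors.As(err, &p)
}

func main() {
	err := Permanent(errors.New("unknown message type"))
	if IsPermanent(err) {
		// Drop the message instead of re-queueing it for another attempt.
		fmt.Println("not retrying:", err)
	}
}
```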
48 changes: 48 additions & 0 deletions go/worker/compute/executor/committee/batch.go
@@ -0,0 +1,48 @@
package committee

import (
"context"
"fmt"

"github.com/opentracing/opentracing-go"

"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/runtime/transaction"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
)

// unresolvedBatch is a batch that may still need to be resolved (fetched from storage).
type unresolvedBatch struct {
// ioRoot is the I/O root from the transaction scheduler containing the inputs.
ioRoot storage.Root
// txnSchedSignature is the transaction scheduler signature of the dispatched batch.
txnSchedSignature signature.Signature
// storageSignatures are the storage node signatures of storage receipts for the I/O root.
storageSignatures []signature.Signature

batch transaction.RawBatch
spanCtx opentracing.SpanContext
}

func (ub *unresolvedBatch) String() string {
return fmt.Sprintf("UnresolvedBatch{ioRoot: %s}", ub.ioRoot)
}

func (ub *unresolvedBatch) resolve(ctx context.Context, storage storage.Backend) (transaction.RawBatch, error) {
if ub.batch != nil {
// In case we already have a resolved batch, just return it.
return ub.batch, nil
}

txs := transaction.NewTree(storage, ub.ioRoot)
defer txs.Close()

batch, err := txs.GetInputBatch(ctx)
if err != nil {
return nil, fmt.Errorf("failed to fetch inputs from storage: %w", err)
}
if len(batch) == 0 {
return nil, fmt.Errorf("failed to fetch inputs from storage: batch is empty")
}
return batch, nil
}
148 changes: 64 additions & 84 deletions go/worker/compute/executor/committee/node.go
@@ -41,7 +41,6 @@ var (
errRuntimeAborted = errors.New("executor: runtime aborted batch processing")
errIncompatibleHeader = p2pError.Permanent(errors.New("executor: incompatible header"))
errInvalidReceipt = p2pError.Permanent(errors.New("executor: invalid storage receipt"))
errStorageFailed = p2pError.Permanent(errors.New("executor: failed to fetch from storage"))
errIncorrectRole = errors.New("executor: incorrect role")
errIncorrectState = errors.New("executor: incorrect state")
errMsgFromNonTxnSched = errors.New("executor: received txn scheduler dispatch msg from non-txn scheduler")
@@ -275,34 +274,23 @@ func (n *Node) queueBatchBlocking(
return errInvalidReceipt
}

// Fetch inputs from storage.
ioRoot := storage.Root{
Namespace: hdr.Namespace,
Version: hdr.Round + 1,
Hash: ioRootHash,
}
txs := transaction.NewTree(n.commonNode.Storage, ioRoot)
defer txs.Close()

readStartTime := time.Now()
batch, err := txs.GetInputBatch(ctx)
if err != nil || len(batch) == 0 {
n.logger.Error("failed to fetch inputs from storage",
"err", err,
"io_root", ioRoot,
)
return errStorageFailed
// Defer fetching inputs from storage to when we actually start processing a batch.
batch := &unresolvedBatch{
ioRoot: storage.Root{
Namespace: hdr.Namespace,
Version: hdr.Round + 1,
Hash: ioRootHash,
},
txnSchedSignature: txnSchedSig,
storageSignatures: storageSignatures,
}
batchReadTime.With(n.getMetricLabels()).Observe(time.Since(readStartTime).Seconds())

var batchSpanCtx opentracing.SpanContext
if batchSpan := opentracing.SpanFromContext(ctx); batchSpan != nil {
batchSpanCtx = batchSpan.Context()
batch.spanCtx = batchSpan.Context()
}

n.commonNode.CrossNode.Lock()
defer n.commonNode.CrossNode.Unlock()
return n.handleExternalBatchLocked(committeeID, ioRootHash, batch, batchSpanCtx, hdr, txnSchedSig, storageSignatures)
return n.handleExternalBatchLocked(committeeID, batch, hdr)
}

// HandleBatchFromTransactionSchedulerLocked processes a batch from the transaction scheduler.
@@ -321,7 +309,17 @@ func (n *Node) HandleBatchFromTransactionSchedulerLocked(
return
}

n.maybeStartProcessingBatchLocked(ioRoot, batch, batchSpanCtx, txnSchedSig, inputStorageSigs)
n.maybeStartProcessingBatchLocked(&unresolvedBatch{
ioRoot: storage.Root{
Namespace: n.commonNode.CurrentBlock.Header.Namespace,
Version: n.commonNode.CurrentBlock.Header.Round + 1,
Hash: ioRoot,
},
txnSchedSignature: txnSchedSig,
storageSignatures: inputStorageSigs,
batch: batch,
spanCtx: batchSpanCtx,
})
}

func (n *Node) bumpReselect() {
@@ -396,7 +394,7 @@ func (n *Node) HandleNewBlockLocked(blk *block.Block) {
// Check if this was the block we were waiting for.
if header.MostlyEqual(state.header) {
n.logger.Info("received block needed for batch processing")
n.maybeStartProcessingBatchLocked(state.ioRoot, state.batch, state.batchSpanCtx, state.txnSchedSig, state.inputStorageSigs)
n.maybeStartProcessingBatchLocked(state.batch)
break
}

@@ -437,50 +435,32 @@ }
}

// Guarded by n.commonNode.CrossNode.
func (n *Node) maybeStartProcessingBatchLocked(
ioRoot hash.Hash,
batch transaction.RawBatch,
batchSpanCtx opentracing.SpanContext,
txnSchedSig signature.Signature,
inputStorageSigs []signature.Signature,
) {
func (n *Node) maybeStartProcessingBatchLocked(batch *unresolvedBatch) {
epoch := n.commonNode.Group.GetEpochSnapshot()

switch {
case epoch.IsExecutorWorker():
// Worker, start processing immediately.
n.startProcessingBatchLocked(ioRoot, batch, batchSpanCtx, txnSchedSig, inputStorageSigs)
n.startProcessingBatchLocked(batch)
case epoch.IsExecutorBackupWorker():
// Backup worker, wait for discrepancy event.
state, ok := n.state.(StateWaitingForBatch)
if ok && state.pendingEvent != nil {
// We have already received a discrepancy event, start processing immediately.
n.logger.Info("already received a discrepancy event, start processing batch")
n.startProcessingBatchLocked(ioRoot, batch, batchSpanCtx, txnSchedSig, inputStorageSigs)
n.startProcessingBatchLocked(batch)
return
}

n.transitionLocked(StateWaitingForEvent{
ioRoot: ioRoot,
batch: batch,
batchSpanCtx: batchSpanCtx,
txnSchedSig: txnSchedSig,
inputStorageSigs: inputStorageSigs,
})
n.transitionLocked(StateWaitingForEvent{batch: batch})
default:
// Currently not a member of an executor committee, log.
n.logger.Warn("not an executor committee member, ignoring batch")
}
}

// Guarded by n.commonNode.CrossNode.
func (n *Node) startProcessingBatchLocked(
ioRoot hash.Hash,
batch transaction.RawBatch,
batchSpanCtx opentracing.SpanContext,
txnSchedSig signature.Signature,
inputStorageSigs []signature.Signature,
) {
func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) {
if n.commonNode.CurrentBlock == nil {
panic("attempted to start processing batch with a nil block")
}
@@ -493,17 +473,8 @@ func (n *Node) startProcessingBatchLocked(
ctx, cancel := context.WithCancel(n.ctx)
done := make(chan *protocol.ComputedBatch, 1)

rq := &protocol.Body{
RuntimeExecuteTxBatchRequest: &protocol.RuntimeExecuteTxBatchRequest{
IORoot: ioRoot,
Inputs: batch,
Block: *n.commonNode.CurrentBlock,
},
}

batchStartTime := time.Now()
batchSize.With(n.getMetricLabels()).Observe(float64(len(batch)))
n.transitionLocked(StateProcessingBatch{ioRoot, batch, batchSpanCtx, batchStartTime, cancel, done, txnSchedSig, inputStorageSigs})
n.transitionLocked(StateProcessingBatch{batch, batchStartTime, cancel, done})

rt := n.GetHostedRuntime()
if rt == nil {
@@ -516,12 +487,33 @@

// Request the worker host to process a batch. This is done in a separate
// goroutine so that the committee node can continue processing blocks.
blk := n.commonNode.CurrentBlock
go func() {
defer close(done)

// Resolve the batch and dispatch it to the runtime.
readStartTime := time.Now()
resolvedBatch, err := batch.resolve(ctx, n.commonNode.Storage)
if err != nil {
n.logger.Error("failed to resolve batch",
"err", err,
"batch", batch,
)
return
}
rq := &protocol.Body{
RuntimeExecuteTxBatchRequest: &protocol.RuntimeExecuteTxBatchRequest{
IORoot: batch.ioRoot.Hash,
Inputs: resolvedBatch,
Block: *blk,
},
}
batchReadTime.With(n.getMetricLabels()).Observe(time.Since(readStartTime).Seconds())
batchSize.With(n.getMetricLabels()).Observe(float64(len(resolvedBatch)))

span := opentracing.StartSpan("CallBatch(rq)",
opentracing.Tag{Key: "rq", Value: rq},
opentracing.ChildOf(batchSpanCtx),
opentracing.ChildOf(batch.spanCtx),
)
ctx = opentracing.ContextWithSpan(ctx, span)
defer span.Finish()
@@ -611,16 +603,16 @@ func (n *Node) proposeBatchLocked(batch *protocol.ComputedBatch) {
CommitteeID: epoch.GetExecutorCommitteeID(),
Header: batch.Header,
RakSig: batch.RakSig,
TxnSchedSig: state.txnSchedSig,
InputRoot: state.ioRoot,
InputStorageSigs: state.inputStorageSigs,
TxnSchedSig: state.batch.txnSchedSignature,
InputRoot: state.batch.ioRoot.Hash,
InputStorageSigs: state.batch.storageSignatures,
}

// Commit I/O and state write logs to storage.
start := time.Now()
err := func() error {
span, ctx := tracing.StartSpanWithContext(n.ctx, "Apply(io, state)",
opentracing.ChildOf(state.batchSpanCtx),
opentracing.ChildOf(state.batch.spanCtx),
)
defer span.Finish()

@@ -634,7 +626,7 @@ func (n *Node) proposeBatchLocked(batch *protocol.ComputedBatch) {
// I/O root.
storage.ApplyOp{
SrcRound: lastHeader.Round + 1,
SrcRoot: state.ioRoot,
SrcRoot: state.batch.ioRoot.Hash,
DstRoot: batch.Header.IORoot,
WriteLog: batch.IOWriteLog,
},
@@ -708,9 +700,9 @@ func (n *Node) proposeBatchLocked(batch *protocol.ComputedBatch) {

// Publish commitment to merge committee.
spanPublish := opentracing.StartSpan("PublishExecuteFinished(commitment)",
opentracing.ChildOf(state.batchSpanCtx),
opentracing.ChildOf(state.batch.spanCtx),
)
err = n.commonNode.Group.PublishExecuteFinished(state.batchSpanCtx, commit)
err = n.commonNode.Group.PublishExecuteFinished(state.batch.spanCtx, commit)
if err != nil {
spanPublish.Finish()
n.logger.Error("failed to publish results to committee",
@@ -736,7 +728,7 @@ func (n *Node) proposeBatchLocked(batch *protocol.ComputedBatch) {
if n.mergeNode == nil {
n.logger.Error("scheduler says we are a merge worker, but we are not")
} else {
n.mergeNode.HandleResultsFromExecutorWorkerLocked(state.batchSpanCtx, commit)
n.mergeNode.HandleResultsFromExecutorWorkerLocked(state.batch.spanCtx, commit)
}
}

@@ -799,7 +791,7 @@ func (n *Node) HandleNewEventLocked(ev *roothash.Event) {

// Backup worker, start processing a batch.
n.logger.Info("backup worker activating and processing batch")
n.startProcessingBatchLocked(state.ioRoot, state.batch, state.batchSpanCtx, state.txnSchedSig, state.inputStorageSigs)
n.startProcessingBatchLocked(state.batch)
}

// HandleNodeUpdateLocked implements NodeHooks.
@@ -809,15 +801,7 @@ func (n *Node) HandleNodeUpdateLocked(update *runtimeCommittee.NodeUpdate, snaps
}

// Guarded by n.commonNode.CrossNode.
func (n *Node) handleExternalBatchLocked(
committeeID hash.Hash,
ioRoot hash.Hash,
batch transaction.RawBatch,
batchSpanCtx opentracing.SpanContext,
hdr block.Header,
txnSchedSig signature.Signature,
inputStorageSigs []signature.Signature,
) error {
func (n *Node) handleExternalBatchLocked(committeeID hash.Hash, batch *unresolvedBatch, hdr block.Header) error {
// If we are not waiting for a batch, don't do anything.
if _, ok := n.state.(StateWaitingForBatch); !ok {
return errIncorrectState
@@ -843,7 +827,7 @@ func (n *Node) handleExternalBatchLocked(

// Check if we have the correct block -- in this case, start processing the batch.
if n.commonNode.CurrentBlock.Header.MostlyEqual(&hdr) {
n.maybeStartProcessingBatchLocked(ioRoot, batch, batchSpanCtx, txnSchedSig, inputStorageSigs)
n.maybeStartProcessingBatchLocked(batch)
return nil
}

Expand All @@ -860,12 +844,8 @@ func (n *Node) handleExternalBatchLocked(

// Wait for the correct block to arrive.
n.transitionLocked(StateWaitingForBlock{
ioRoot: ioRoot,
batch: batch,
batchSpanCtx: batchSpanCtx,
header: &hdr,
txnSchedSig: txnSchedSig,
inputStorageSigs: inputStorageSigs,
batch: batch,
header: &hdr,
})

return nil