Merge pull request #3801 from oasisprotocol/ptrus/fix/executor-node
go/worker/executor: avoid holding lock while applying processed batch to storage
ptrus authored Mar 18, 2021
2 parents 1e16eed + 2a61257 commit 653d1aa
Showing 2 changed files with 82 additions and 61 deletions.
4 changes: 4 additions & 0 deletions .changelog/3801.bugfix.md
@@ -0,0 +1,4 @@
go/worker/executor: avoid holding lock while applying processed batch

Before, the executor worker would hold the `CrossNode` lock while making
requests to storage.
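
The fix follows a standard lock-discipline pattern: snapshot everything the slow operation needs while holding the lock, release the lock for the storage round-trip, then re-acquire it and re-validate the state and round before committing the result. Below is a minimal sketch of that shape, assuming a hypothetical `node` type with `applyToStorage` as a stand-in for the storage `Apply` call; the real code is in the `node.go` diff that follows.

```go
// Sketch only: shows the lock discipline this commit adopts, not the actual
// oasis-core API. node, applyToStorage, and currentRound are hypothetical.
package executor

import (
	"context"
	"errors"
	"sync"
)

type node struct {
	mu           sync.Mutex      // plays the role of the CrossNode lock
	roundCtx     context.Context // replaced whenever a new round starts
	currentRound uint64
}

// applyToStorage stands in for the slow storage Apply request; it must not
// be called with n.mu held.
func (n *node) applyToStorage(ctx context.Context, round uint64) error {
	return nil // network I/O elided
}

func (n *node) handleBatch() error {
	// 1. Snapshot what we need while holding the lock.
	n.mu.Lock()
	roundCtx := n.roundCtx
	round := n.currentRound
	n.mu.Unlock()

	// 2. Perform the slow storage request with the lock released, so other
	//    work (e.g. round transitions) is not blocked behind network I/O.
	if err := n.applyToStorage(roundCtx, round); err != nil {
		return err
	}

	// 3. Re-acquire the lock and re-validate: the round may have advanced
	//    while we were talking to storage, making the result stale.
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.currentRound != round {
		return errors.New("round advanced during storage apply; dropping result")
	}
	// Still the same round: safe to act on the processed batch.
	return nil
}
```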
139 changes: 78 additions & 61 deletions go/worker/compute/executor/committee/node.go
@@ -658,7 +658,7 @@ func (n *Node) removeTxBatch(batch [][]byte) error {
return nil
}

func (n *Node) proposeTimeoutLocked() error {
func (n *Node) proposeTimeoutLocked(roundCtx context.Context) error {
// Do not propose a timeout if we are already proposing it.
// The flag will get cleared on the next round or if the propose timeout
// tx fails.
@@ -669,7 +669,7 @@ func (n *Node) proposeTimeoutLocked() error {
if n.commonNode.CurrentBlock == nil {
return fmt.Errorf("executor: propose timeout error, nil block")
}
rt, err := n.commonNode.Runtime.ActiveDescriptor(n.roundCtx)
rt, err := n.commonNode.Runtime.ActiveDescriptor(roundCtx)
if err != nil {
return err
}
@@ -701,11 +701,11 @@ func (n *Node) proposeTimeoutLocked() error {
// scheduler node proposing a batch.
select {
case <-time.After(proposeTimeoutDelay):
case <-n.roundCtx.Done():
case <-roundCtx.Done():
return
}

err := consensus.SignAndSubmitTx(n.roundCtx, n.commonNode.Consensus, n.commonNode.Identity.NodeSigner, tx)
err := consensus.SignAndSubmitTx(roundCtx, n.commonNode.Consensus, n.commonNode.Identity.NodeSigner, tx)
switch err {
case nil:
n.logger.Info("executor timeout request finalized",
@@ -750,30 +750,31 @@ func (n *Node) getRtStateAndRoundResults(ctx context.Context, height int64) (*ro
}

func (n *Node) handleScheduleBatch(force bool) {
epoch, lastHeader, roundResults, err := func() (*committee.EpochSnapshot, *block.Header, *roothash.RoundResults, error) {
roundCtx, epoch, lastHeader, roundResults, err := func() (context.Context, *committee.EpochSnapshot, *block.Header, *roothash.RoundResults, error) {
n.commonNode.CrossNode.Lock()
defer n.commonNode.CrossNode.Unlock()
roundCtx := n.roundCtx

// If we are not waiting for a batch, don't do anything.
if _, ok := n.state.(StateWaitingForBatch); !ok {
return nil, nil, nil, errIncorrectState
return roundCtx, nil, nil, nil, errIncorrectState
}
if n.commonNode.CurrentBlock == nil {
return nil, nil, nil, errNoBlocks
return roundCtx, nil, nil, nil, errNoBlocks
}
header := n.commonNode.CurrentBlock.Header
epoch := n.commonNode.Group.GetEpochSnapshot()

// If we are not an executor worker in this epoch, we don't need to do anything.
if !epoch.IsExecutorWorker() {
return nil, nil, nil, errNotTxnScheduler
return roundCtx, nil, nil, nil, errNotTxnScheduler
}

_, roundResults, err := n.getRtStateAndRoundResults(n.roundCtx, n.commonNode.CurrentBlockHeight)
_, roundResults, err := n.getRtStateAndRoundResults(roundCtx, n.commonNode.CurrentBlockHeight)
if err != nil {
return nil, nil, nil, err
return roundCtx, nil, nil, nil, err
}
return epoch, &header, roundResults, nil
return roundCtx, epoch, &header, roundResults, nil
}()
if err != nil {
n.logger.Debug("not scheduling a batch",
@@ -804,7 +805,7 @@ func (n *Node) handleScheduleBatch(force bool) {
if _, ok := n.state.(StateWaitingForBatch); !ok || lastHeader.Round != n.commonNode.CurrentBlock.Header.Round {
return errIncorrectState
}
return n.proposeTimeoutLocked()
return n.proposeTimeoutLocked(roundCtx)
}()
switch err {
case nil:
@@ -844,15 +845,15 @@ func (n *Node) handleScheduleBatch(force bool) {
defer ioTree.Close()

for idx, tx := range batch {
if err = ioTree.AddTransaction(n.ctx, transaction.Transaction{Input: tx, BatchOrder: uint32(idx)}, nil); err != nil {
if err = ioTree.AddTransaction(roundCtx, transaction.Transaction{Input: tx, BatchOrder: uint32(idx)}, nil); err != nil {
n.logger.Error("failed to create I/O tree",
"err", err,
)
return
}
}

ioWriteLog, ioRoot, err := ioTree.Commit(n.ctx)
ioWriteLog, ioRoot, err := ioTree.Commit(roundCtx)
if err != nil {
n.logger.Error("failed to create I/O tree",
"err", err,
@@ -861,11 +862,11 @@ func (n *Node) handleScheduleBatch(force bool) {
}

// Commit I/O tree to storage and obtain receipts.
spanInsert, ctx := tracing.StartSpanWithContext(n.ctx, "Apply(ioWriteLog)",
spanInsert, roundCtx := tracing.StartSpanWithContext(roundCtx, "Apply(ioWriteLog)",
opentracing.ChildOf(batchSpanCtx),
)

ioReceipts, err := n.commonNode.Group.Storage().Apply(ctx, &storage.ApplyRequest{
ioReceipts, err := n.commonNode.Group.Storage().Apply(roundCtx, &storage.ApplyRequest{
Namespace: lastHeader.Namespace,
RootType: storage.RootTypeIO,
SrcRound: lastHeader.Round + 1,
@@ -910,17 +911,10 @@ func (n *Node) handleScheduleBatch(force bool) {
n.commonNode.CrossNode.Lock()
defer n.commonNode.CrossNode.Unlock()

// If we are not waiting for a batch, don't do anything.
if _, ok := n.state.(StateWaitingForBatch); !ok {
n.logger.Error("new state since started the dispatch",
// Make sure we are still in the right state/round.
if _, ok := n.state.(StateWaitingForBatch); !ok || lastHeader.Round != n.commonNode.CurrentBlock.Header.Round {
n.logger.Error("new state or round since started the dispatch",
"state", n.state,
)
return
}

// Ensure we are still in the same round as when we started the dispatch.
if lastHeader.Round != n.commonNode.CurrentBlock.Header.Round {
n.logger.Error("new round since started the dispatch",
"expected_round", lastHeader.Round,
"round", n.commonNode.CurrentBlock.Header.Round,
)
@@ -994,7 +988,7 @@ func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) {
)

// Create batch processing context and channel for receiving the response.
ctx, cancel := context.WithCancel(n.ctx)
ctx, cancel := context.WithCancel(n.roundCtx)
done := make(chan *processedBatch, 1)

batchStartTime := time.Now()
@@ -1124,54 +1118,55 @@ func (n *Node) abortBatchLocked(reason error) {
})
}

// Guarded by n.commonNode.CrossNode.
func (n *Node) proposeBatchLocked(processedBatch *processedBatch) {
batch := processedBatch.computed
// We must be in ProcessingBatch state if we are here.
state := n.state.(StateProcessingBatch)

func (n *Node) proposeBatch(
roundCtx context.Context,
lastHeader *block.Header,
unresolved *unresolvedBatch,
processed *processedBatch,
) {
crash.Here(crashPointBatchProposeBefore)

batch := processed.computed
epoch := n.commonNode.Group.GetEpochSnapshot()

n.logger.Debug("proposing batch",
"batch", batch,
)

epoch := n.commonNode.Group.GetEpochSnapshot()

// Generate proposed compute results.
rakSig := batch.RakSig
proposedResults := &commitment.ComputeBody{
Header: batch.Header,
RakSig: &rakSig,
TxnSchedSig: state.batch.txnSchedSignature,
InputRoot: state.batch.ioRoot.Hash,
InputStorageSigs: state.batch.storageSignatures,
TxnSchedSig: unresolved.txnSchedSignature,
InputRoot: unresolved.ioRoot.Hash,
InputStorageSigs: unresolved.storageSignatures,
}
// If we are the transaction scheduler also include all the emitted messages.
if epoch.IsTransactionScheduler(n.commonNode.CurrentBlock.Header.Round) {
if epoch.IsTransactionScheduler(lastHeader.Round) {
proposedResults.Messages = batch.Messages
}

// Commit I/O and state write logs to storage.
start := time.Now()
storageErr := func() error {
span, ctx := tracing.StartSpanWithContext(n.ctx, "Apply(io, state)",
opentracing.ChildOf(state.batch.spanCtx),
start := time.Now()
defer storageCommitLatency.With(n.getMetricLabels()).Observe(time.Since(start).Seconds())

span, ctx := tracing.StartSpanWithContext(roundCtx, "Apply(io, state)",
opentracing.ChildOf(unresolved.spanCtx),
)
defer span.Finish()

ctx, cancel := context.WithTimeout(ctx, n.commonCfg.StorageCommitTimeout)
defer cancel()

lastHeader := n.commonNode.CurrentBlock.Header

// NOTE: Order is important for verifying the receipt.
applyOps := []storage.ApplyOp{
// I/O root.
{
RootType: storage.RootTypeIO,
SrcRound: lastHeader.Round + 1,
SrcRoot: state.batch.ioRoot.Hash,
SrcRoot: unresolved.ioRoot.Hash,
DstRoot: *batch.Header.IORoot,
WriteLog: batch.IOWriteLog,
},
@@ -1227,16 +1222,29 @@ func (n *Node) proposeBatchLocked(processedBatch *processedBatch) {

return nil
}()
storageCommitLatency.With(n.getMetricLabels()).Observe(time.Since(start).Seconds())

if storageErr != nil {
n.logger.Error("storage failure, submitting failure indicating commitment",
"err", storageErr,
)
proposedResults.SetFailure(commitment.FailureStorageUnavailable)
}

if err := n.signAndSubmitCommitment(proposedResults); err != nil {
// Submit commitment.
n.commonNode.Group.Lock()
defer n.commonNode.Group.Unlock()

// Make sure we are still in the right state/round.
state, ok := n.state.(StateProcessingBatch)
if !ok || lastHeader.Round != n.commonNode.CurrentBlock.Header.Round {
n.logger.Error("new state or round since started proposing batch",
"state", state,
"round", n.commonNode.CurrentBlock.Header.Round,
"expected_round", lastHeader.Round,
)
return
}

if err := n.signAndSubmitCommitment(roundCtx, proposedResults); err != nil {
n.logger.Error("failed to sign and submit the commitment",
"commit", proposedResults,
"err", err,
@@ -1249,7 +1257,7 @@ func (n *Node) proposeBatchLocked(processedBatch *processedBatch) {
case nil:
n.transitionLocked(StateWaitingForFinalize{
batchStartTime: state.batchStartTime,
raw: processedBatch.raw,
raw: processed.raw,
proposedIORoot: *proposedResults.Header.IORoot,
})
default:
Expand All @@ -1259,7 +1267,7 @@ func (n *Node) proposeBatchLocked(processedBatch *processedBatch) {
crash.Here(crashPointBatchProposeAfter)
}

func (n *Node) signAndSubmitCommitment(body *commitment.ComputeBody) error {
func (n *Node) signAndSubmitCommitment(roundCtx context.Context, body *commitment.ComputeBody) error {
commit, err := commitment.SignExecutorCommitment(n.commonNode.Identity.NodeSigner, n.commonNode.Runtime.ID(), body)
if err != nil {
n.logger.Error("failed to sign commitment",
@@ -1271,7 +1279,7 @@ func (n *Node) signAndSubmitCommitment(body *commitment.ComputeBody) error {

tx := roothash.NewExecutorCommitTx(0, nil, n.commonNode.Runtime.ID(), []commitment.ExecutorCommitment{*commit})
go func() {
commitErr := consensus.SignAndSubmitTx(n.roundCtx, n.commonNode.Consensus, n.commonNode.Identity.NodeSigner, tx)
commitErr := consensus.SignAndSubmitTx(roundCtx, n.commonNode.Consensus, n.commonNode.Identity.NodeSigner, tx)
switch commitErr {
case nil:
n.logger.Info("executor commit finalized")
@@ -1408,38 +1416,47 @@ func (n *Node) handleRuntimeHostEvent(ev *host.Event) {

func (n *Node) handleProcessedBatch(batch *processedBatch, processingCh chan *processedBatch) {
n.commonNode.CrossNode.Lock()
defer n.commonNode.CrossNode.Unlock()

// To avoid stale events, check if the stored state is still valid.
// XXX: processingCh not changing ensures we are in the same state and not
// in a "new" processing batch state. This also ensures that the round did
// not change.
state, ok := n.state.(StateProcessingBatch)
if !ok || state.done != processingCh {
n.commonNode.CrossNode.Unlock()
return
}
roundCtx := n.roundCtx
lastHeader := n.commonNode.CurrentBlock.Header

// Successfully processed a batch.
if batch != nil && batch.computed != nil {
stateBatch := state.batch
n.commonNode.CrossNode.Unlock()
n.logger.Info("worker has finished processing a batch")
n.proposeBatchLocked(batch)

n.proposeBatch(roundCtx, &lastHeader, stateBatch, batch)
return
}

n.logger.Warn("worker has aborted batch processing")
defer n.commonNode.CrossNode.Unlock()

// Submit a failure indicating commitment.
n.logger.Debug("submitting failure indicating commitment")
header := n.commonNode.CurrentBlock.Header
// Unsuccessful batch processing.
n.logger.Warn("worker has aborted batch processing")
commit := &commitment.ComputeBody{
Header: commitment.ComputeResultsHeader{
Round: header.Round + 1,
PreviousHash: header.EncodedHash(),
Round: lastHeader.Round + 1,
PreviousHash: lastHeader.EncodedHash(),
},
TxnSchedSig: state.batch.txnSchedSignature,
InputRoot: state.batch.ioRoot.Hash,
InputStorageSigs: state.batch.storageSignatures,
}
commit.SetFailure(commitment.FailureUnknown)

if err := n.signAndSubmitCommitment(commit); err != nil {
n.logger.Debug("submitting failure indicating commitment",
"commitment", commit,
)
if err := n.signAndSubmitCommitment(roundCtx, commit); err != nil {
n.logger.Error("failed to sign and submit the commitment",
"commit", commit,
"err", err,
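A second thread running through the diff: functions such as `proposeTimeoutLocked` and `signAndSubmitCommitment` now take `roundCtx context.Context` as an explicit parameter, captured once under the lock, instead of reading `n.roundCtx` later from goroutines. That pins asynchronous work to the round that spawned it. A rough, hypothetical illustration of why that matters (`submitTx` stands in for `consensus.SignAndSubmitTx`):

```go
// Sketch only, hypothetical names; not the actual oasis-core API.
package executor

import (
	"context"
	"sync"
	"time"
)

type timeoutNode struct {
	mu       sync.Mutex      // plays the role of the CrossNode lock
	roundCtx context.Context // canceled and replaced on every new round
}

// submitTx stands in for consensus.SignAndSubmitTx.
func submitTx(ctx context.Context) error { return ctx.Err() }

func (n *timeoutNode) proposeTimeout(delay time.Duration) {
	n.mu.Lock()
	roundCtx := n.roundCtx // capture once, while the lock is held
	n.mu.Unlock()

	go func() {
		select {
		case <-time.After(delay):
		case <-roundCtx.Done():
			return // round ended; nothing to propose
		}
		// Using the captured context means a stale goroutine gets
		// canceled along with its own round, instead of piggybacking on
		// whatever n.roundCtx points to by the time it runs.
		_ = submitTx(roundCtx)
	}()
}
```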
