From 60491d10371d6575e64de522aa656cf1be72e11c Mon Sep 17 00:00:00 2001 From: ptrus Date: Thu, 8 Oct 2020 17:04:39 +0200 Subject: [PATCH] go/worker/executor: Submit failure indicating commitment in case of storage failure --- go/worker/compute/executor/committee/node.go | 26 +++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 033a3506f06..649f61cc7e8 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -1094,7 +1094,7 @@ func (n *Node) proposeBatchLocked(processedBatch *processedBatch) { // Commit I/O and state write logs to storage. start := time.Now() - err := func() error { + storageErr := func() error { span, ctx := tracing.StartSpanWithContext(n.ctx, "Apply(io, state)", opentracing.ChildOf(state.batch.spanCtx), ) @@ -1167,9 +1167,11 @@ func (n *Node) proposeBatchLocked(processedBatch *processedBatch) { }() storageCommitLatency.With(n.getMetricLabels()).Observe(time.Since(start).Seconds()) - if err != nil { - n.abortBatchLocked(err) - return + if storageErr != nil { + n.logger.Error("storage failure, sibmitting failure indicating commitment", + "err", storageErr, + ) + proposedResults.Failure = commitment.FailureStorageUnavailable } // Sign the commitment and submit. @@ -1197,11 +1199,17 @@ func (n *Node) proposeBatchLocked(processedBatch *processedBatch) { }() // TODO: Add crash point. - n.transitionLocked(StateWaitingForFinalize{ - batchStartTime: state.batchStartTime, - raw: processedBatch.raw, - proposedIORoot: proposedResults.Header.IORoot, - }) + + switch storageErr { + case nil: + n.transitionLocked(StateWaitingForFinalize{ + batchStartTime: state.batchStartTime, + raw: processedBatch.raw, + proposedIORoot: proposedResults.Header.IORoot, + }) + default: + n.abortBatchLocked(err) + } crash.Here(crashPointBatchProposeAfter) }