From a780b94bc1466082eb6dae9ad46c857de5b6b090 Mon Sep 17 00:00:00 2001 From: ptrus Date: Wed, 30 Sep 2020 10:47:15 +0200 Subject: [PATCH] go/executor/node: ensure correct state before publishing a message --- .changelog/3342.bugfix.md | 1 + go/worker/compute/executor/committee/node.go | 40 ++++++++++---------- 2 files changed, 21 insertions(+), 20 deletions(-) create mode 100644 .changelog/3342.bugfix.md diff --git a/.changelog/3342.bugfix.md b/.changelog/3342.bugfix.md new file mode 100644 index 00000000000..4baf344b001 --- /dev/null +++ b/.changelog/3342.bugfix.md @@ -0,0 +1 @@ +go/executor/node: ensure correct state before publishing a message diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 571b292fdcf..99250c9fac3 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -865,6 +865,26 @@ func (n *Node) Dispatch(batch transaction.RawBatch) error { return fmt.Errorf("failed to sign txn scheduler batch: %w", err) } + n.commonNode.CrossNode.Lock() + defer n.commonNode.CrossNode.Unlock() + + // If we are not waiting for a batch, don't do anything. + if _, ok := n.state.(StateWaitingForBatch); !ok { + n.logger.Error("new state since started the dispatch", + "state", n.state, + ) + return errIncorrectState + } + + // Ensure we are still in the same round as when we started the dispatch. + if lastHeader.Round != n.commonNode.CurrentBlock.Header.Round { + n.logger.Error("new round since started the dispatch", + "expected_round", lastHeader.Round, + "round", n.commonNode.CurrentBlock.Header.Round, + ) + return errSeenNewerBlock + } + n.logger.Debug("dispatching a new batch proposal", "io_root", ioRoot, "num_txs", len(batch), @@ -886,26 +906,6 @@ func (n *Node) Dispatch(batch transaction.RawBatch) error { crash.Here(crashPointBatchPublishAfter) spanPublish.Finish() - n.commonNode.CrossNode.Lock() - defer n.commonNode.CrossNode.Unlock() - - // If we are not waiting for a batch, don't do anything. - if _, ok := n.state.(StateWaitingForBatch); !ok { - n.logger.Error("new state since started the dispatch", - "state", n.state, - ) - return errIncorrectState - } - - // Ensure we are still in the same round as when we started the dispatch. - if lastHeader.Round != n.commonNode.CurrentBlock.Header.Round { - n.logger.Error("new round since started the dispatch", - "expected_round", lastHeader.Round, - "round", n.commonNode.CurrentBlock.Header.Round, - ) - return errSeenNewerBlock - } - // Also process the batch locally. n.handleInternalBatchLocked( batchSpanCtx,