Skip to content

Commit

Permalink
go/worker/executor: don't hold the lock during storage requests
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Sep 24, 2020
1 parent c2355a8 commit 13b2407
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 31 deletions.
1 change: 1 addition & 0 deletions .changelog/3320.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/executor: don't unnecessarily hold the lock during storage requests
69 changes: 38 additions & 31 deletions go/worker/compute/executor/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,37 +752,43 @@ func (n *Node) proposeTimeoutLocked() error {

// Dispatch dispatches a batch to the executor committee.
func (n *Node) Dispatch(batch transaction.RawBatch) error {
n.commonNode.CrossNode.Lock()
defer n.commonNode.CrossNode.Unlock()
lastHeader, err := func() (*block.Header, error) {
n.commonNode.CrossNode.Lock()
defer n.commonNode.CrossNode.Unlock()

// If we are not waiting for a batch, don't do anything.
if _, ok := n.state.(StateWaitingForBatch); !ok {
return errIncorrectState
}
if n.commonNode.CurrentBlock == nil {
return errNoBlocks
}
// If we are not waiting for a batch, don't do anything.
if _, ok := n.state.(StateWaitingForBatch); !ok {
return nil, errIncorrectState
}
if n.commonNode.CurrentBlock == nil {
return nil, errNoBlocks
}
header := n.commonNode.CurrentBlock.Header
round := header.Round
epoch := n.commonNode.Group.GetEpochSnapshot()

epoch := n.commonNode.Group.GetEpochSnapshot()
round := n.commonNode.CurrentBlock.Header.Round
switch {
case epoch.IsTransactionScheduler(round):
// Continues below.
case epoch.IsExecutorWorker():
// If we are an executor and not a scheduler try proposing a timeout.
err := n.proposeTimeoutLocked()
if err != nil {
n.logger.Error("error proposing a timeout",
"err", err,
)
}

switch {
case epoch.IsTransactionScheduler(round):
// Continues below.
case epoch.IsExecutorWorker():
// If we are an executor and not a scheduler try proposing a timeout.
err := n.proposeTimeoutLocked()
if err != nil {
n.logger.Error("error proposing a timeout",
"err", err,
)
fallthrough
default:
// XXX: Always return an error here in case we are not a txn scheduler,
// so that the batch is reinserted back into the queue.
return nil, errNotTxnScheduler
}

fallthrough
default:
// XXX: Always return an error here in case we are not a txn scheduler,
// so that the batch is reinserted back into the queue.
return errNotTxnScheduler
return &header, nil
}()
if err != nil {
return err
}

// Scheduler node opens a new parent span for batch processing.
Expand All @@ -792,7 +798,6 @@ func (n *Node) Dispatch(batch transaction.RawBatch) error {

// Generate the initial I/O root containing only the inputs (outputs and
// tags will be added later by the executor nodes).
lastHeader := n.commonNode.CurrentBlock.Header
emptyRoot := storage.Root{
Namespace: lastHeader.Namespace,
Version: lastHeader.Round + 1,
Expand All @@ -803,7 +808,7 @@ func (n *Node) Dispatch(batch transaction.RawBatch) error {
defer ioTree.Close()

for idx, tx := range batch {
if err := ioTree.AddTransaction(n.ctx, transaction.Transaction{Input: tx, BatchOrder: uint32(idx)}, nil); err != nil {
if err = ioTree.AddTransaction(n.ctx, transaction.Transaction{Input: tx, BatchOrder: uint32(idx)}, nil); err != nil {
n.logger.Error("failed to create I/O tree",
"err", err,
)
Expand Down Expand Up @@ -844,7 +849,7 @@ func (n *Node) Dispatch(batch transaction.RawBatch) error {
// Dispatch batch to group.
spanPublish := opentracing.StartSpan("PublishScheduledBatch(batchHash, header)",
opentracing.Tag{Key: "ioRoot", Value: ioRoot},
opentracing.Tag{Key: "header", Value: n.commonNode.CurrentBlock.Header},
opentracing.Tag{Key: "header", Value: lastHeader},
opentracing.ChildOf(batchSpanCtx),
)
ioReceiptSignatures := []signature.Signature{}
Expand All @@ -855,7 +860,7 @@ func (n *Node) Dispatch(batch transaction.RawBatch) error {
dispatchMsg := &commitment.ProposedBatch{
IORoot: ioRoot,
StorageSignatures: ioReceiptSignatures,
Header: n.commonNode.CurrentBlock.Header,
Header: *lastHeader,
}
signedDispatchMsg, err := commitment.SignProposedBatch(n.commonNode.Identity.NodeSigner, dispatchMsg)
if err != nil {
Expand Down Expand Up @@ -886,6 +891,8 @@ func (n *Node) Dispatch(batch transaction.RawBatch) error {
crash.Here(crashPointBatchPublishAfter)
spanPublish.Finish()

n.commonNode.CrossNode.Lock()
defer n.commonNode.CrossNode.Unlock()
// Also process the batch locally.
n.handleInternalBatchLocked(
batchSpanCtx,
Expand Down

0 comments on commit 13b2407

Please sign in to comment.