go/worker/executor: Extract parts of the main worker function
ptrus committed Oct 15, 2020
Commit 66063cc, 1 parent f18cdae
Showing 3 changed files with 117 additions and 86 deletions.
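
For orientation, the sketch below is an illustrative, self-contained simplification (not code from this commit) of the event-loop delegation pattern the refactoring introduces: the main worker loop hands runtime host events and finished batches off to the newly extracted handleRuntimeHostEvent and handleProcessedBatch helpers, while commitment signing and submission moves into signAndSubmitCommitment. All types and channels in the sketch are toy stand-ins; only the handler names mirror the diff below.

package main

import "fmt"

// Toy stand-ins for the runtime host event and processed batch types.
type hostEvent struct{ kind string }
type processedBatch struct{ ok bool }

type node struct{}

// Mirrors the extracted helper: react to runtime start/update/stop events
// outside of the main select statement.
func (n *node) handleRuntimeHostEvent(ev *hostEvent) {
	fmt.Println("runtime host event:", ev.kind)
}

// Mirrors the extracted helper: decide whether to propose the batch or to
// submit a failure-indicating commitment. The done argument stands in for the
// processingCh channel the real helper compares against to discard stale results.
func (n *node) handleProcessedBatch(b *processedBatch, done chan *processedBatch) {
	fmt.Println("batch processed, ok:", b.ok)
}

func main() {
	n := &node{}
	stopCh := make(chan struct{})
	hrtEventCh := make(chan *hostEvent, 1)
	processingDoneCh := make(chan *processedBatch, 1)

	hrtEventCh <- &hostEvent{kind: "started"}
	processingDoneCh <- &processedBatch{ok: true}

	handled := 0
	for {
		select {
		case <-stopCh:
			return
		case ev := <-hrtEventCh:
			n.handleRuntimeHostEvent(ev)
			handled++
		case batch := <-processingDoneCh:
			n.handleProcessedBatch(batch, processingDoneCh)
			handled++
		}
		if handled == 2 {
			close(stopCh) // stop once the demo events are drained
		}
	}
}
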
go/roothash/api/commitment/executor.go (2 changes: 1 addition & 1 deletion)
@@ -126,7 +126,7 @@ func (m *ComputeBody) RootsForStorageReceipt() []hash.Hash {
}
}

// ValidateBasic performs basic compute body commitment parameters checks.
// ValidateBasic performs basic executor commitment validity checks.
func (m *ComputeBody) ValidateBasic() error {
header := &m.Header
switch m.Failure {
go/roothash/api/commitment/pool.go (17 changes: 8 additions & 9 deletions)
@@ -227,6 +227,14 @@ func (p *Pool) addOpenExecutorCommitment(
return ErrNotBasedOnCorrectBlock
}

if err := body.ValidateBasic(); err != nil {
logger.Debug("executor commitment validate basic error",
"body", body,
"err", err,
)
return ErrBadExecutorCommitment
}

if err := sv.VerifyTxnSchedulerSignature(body.TxnSchedSig, blk.Header.Round); err != nil {
logger.Debug("executor commitment has bad transaction scheduler signer",
"node_id", id,
@@ -239,15 +247,6 @@ func (p *Pool) addOpenExecutorCommitment(
return ErrTxnSchedSigInvalid
}

// Validate basic.
if err := body.ValidateBasic(); err != nil {
logger.Debug("executor commitment validate basic error",
"body", body,
"err", err,
)
return ErrBadExecutorCommitment
}

switch openCom.IsIndicatingFailure() {
case true:
default:
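
The net effect of the two pool.go hunks above is a reordering: ValidateBasic now runs before the transaction scheduler signature check, so a structurally invalid commitment is rejected with ErrBadExecutorCommitment before any signature verification is attempted. Below is a minimal, self-contained sketch of that ordering; the types and error values are toy stand-ins for the real ones in go/roothash/api/commitment.

package main

import (
	"errors"
	"fmt"
)

var (
	errBadExecutorCommitment = errors.New("bad executor commitment")
	errTxnSchedSigInvalid    = errors.New("invalid txn scheduler signature")
)

type body struct{ wellFormed, sigOK bool }

func addOpenExecutorCommitment(b body) error {
	// Order after this commit: basic validity checks come first...
	if !b.wellFormed {
		return errBadExecutorCommitment
	}
	// ...and the transaction scheduler signature is verified afterwards.
	if !b.sigOK {
		return errTxnSchedSigInvalid
	}
	return nil
}

func main() {
	fmt.Println(addOpenExecutorCommitment(body{wellFormed: false, sigOK: true})) // bad executor commitment
	fmt.Println(addOpenExecutorCommitment(body{wellFormed: true, sigOK: true}))  // <nil>
}
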
go/worker/compute/executor/committee/node.go (184 changes: 108 additions & 76 deletions)
@@ -26,6 +26,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
runtimeCommittee "github.com/oasisprotocol/oasis-core/go/runtime/committee"
"github.com/oasisprotocol/oasis-core/go/runtime/host"
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
"github.com/oasisprotocol/oasis-core/go/runtime/scheduling"
schedulingAPI "github.com/oasisprotocol/oasis-core/go/runtime/scheduling/api"
@@ -134,8 +135,9 @@ var (
type Node struct { // nolint: maligned
*commonWorker.RuntimeHostNode

lastScheduledCache *lru.Cache
runtimeVersion version.Version

lastScheduledCache *lru.Cache
scheduleCheckTxEnabled bool
scheduleMaxQueueSize uint64

@@ -1175,17 +1177,39 @@ func (n *Node) proposeBatchLocked(processedBatch *processedBatch) {
proposedResults.SetFailure(commitment.FailureStorageUnavailable)
}

// Sign the commitment and submit.
commit, err := commitment.SignExecutorCommitment(n.commonNode.Identity.NodeSigner, proposedResults)
if err != nil {
n.logger.Error("failed to sign commitment",
if err := n.signAndSubmitCommitment(proposedResults); err != nil {
n.logger.Error("failed to sign and submit the commitment",
"commit", proposedResults,
"err", err,
)
n.abortBatchLocked(err)
return
}

// Publish commitment to the consensus layer.
switch storageErr {
case nil:
n.transitionLocked(StateWaitingForFinalize{
batchStartTime: state.batchStartTime,
raw: processedBatch.raw,
proposedIORoot: *proposedResults.Header.IORoot,
})
default:
n.abortBatchLocked(storageErr)
}

crash.Here(crashPointBatchProposeAfter)
}

func (n *Node) signAndSubmitCommitment(body *commitment.ComputeBody) error {
commit, err := commitment.SignExecutorCommitment(n.commonNode.Identity.NodeSigner, body)
if err != nil {
n.logger.Error("failed to sign commitment",
"commit", body,
"err", err,
)
return err
}

tx := roothash.NewExecutorCommitTx(0, nil, n.commonNode.Runtime.ID(), []commitment.ExecutorCommitment{*commit})
go func() {
commitErr := consensus.SignAndSubmitTx(n.roundCtx, n.commonNode.Consensus, n.commonNode.Identity.NodeSigner, tx)
@@ -1194,25 +1218,13 @@ func (n *Node) proposeBatchLocked(processedBatch *processedBatch) {
n.logger.Info("executor commit finalized")
default:
n.logger.Error("failed to submit executor commit",
"commit", body,
"err", commitErr,
)
}
}()

// TODO: Add crash point.

switch storageErr {
case nil:
n.transitionLocked(StateWaitingForFinalize{
batchStartTime: state.batchStartTime,
raw: processedBatch.raw,
proposedIORoot: *proposedResults.Header.IORoot,
})
default:
n.abortBatchLocked(err)
}

crash.Here(crashPointBatchProposeAfter)
return nil
}

// HandleNewEventLocked implements NodeHooks.
@@ -1300,6 +1312,80 @@ func (n *Node) handleExternalBatchLocked(batch *unresolvedBatch, hdr block.Header
return nil
}

func (n *Node) handleRuntimeHostEvent(ev *host.Event) {
switch {
case ev.Started != nil:
// We are now able to service requests for this runtime.
n.runtimeVersion = ev.Started.Version

n.roleProvider.SetAvailable(func(nd *node.Node) error {
rt := nd.AddOrUpdateRuntime(n.commonNode.Runtime.ID())
rt.Version = n.runtimeVersion
rt.Capabilities.TEE = ev.Started.CapabilityTEE
return nil
})
case ev.Updated != nil:
// Update runtime capabilities.
n.roleProvider.SetAvailable(func(nd *node.Node) error {
rt := nd.AddOrUpdateRuntime(n.commonNode.Runtime.ID())
rt.Version = n.runtimeVersion
rt.Capabilities.TEE = ev.Updated.CapabilityTEE
return nil
})
case ev.FailedToStart != nil, ev.Stopped != nil:
// Runtime failed to start or was stopped -- we can no longer service requests.
n.roleProvider.SetUnavailable()
default:
// Unknown event.
n.logger.Warn("unknown worker event",
"ev", ev,
)
}
}

func (n *Node) handleProcessedBatch(batch *processedBatch, processingCh chan *processedBatch) {
n.commonNode.CrossNode.Lock()
defer n.commonNode.CrossNode.Unlock()

// To avoid stale events, check if the stored state is still valid.
state, ok := n.state.(StateProcessingBatch)
if !ok || state.done != processingCh {
return
}

if batch != nil && batch.computed != nil {
n.logger.Info("worker has finished processing a batch")
n.proposeBatchLocked(batch)

return
}

n.logger.Warn("worker has aborted batch processing")

// Submit a failure indicating commitment.
n.logger.Debug("submitting failure indicating commitment")
header := n.commonNode.CurrentBlock.Header
commit := &commitment.ComputeBody{
Header: commitment.ComputeResultsHeader{
Round: header.Round + 1,
PreviousHash: header.EncodedHash(),
},
TxnSchedSig: state.batch.txnSchedSignature,
InputRoot: state.batch.ioRoot.Hash,
InputStorageSigs: state.batch.storageSignatures,
}
commit.SetFailure(commitment.FailureUnknown)

if err := n.signAndSubmitCommitment(commit); err != nil {
n.logger.Error("failed to sign and submit the commitment",
"commit", commit,
"err", err,
)
}

n.abortBatchLocked(errRuntimeAborted)
}

func (n *Node) worker() {
defer close(n.quitCh)
defer (n.cancelCtx)()
@@ -1393,7 +1479,6 @@ func (n *Node) worker() {
// We are initialized.
close(n.initCh)

var runtimeVersion version.Version
for {
// Check if we are currently processing a batch. In this case, we also
// need to select over the result channel.
@@ -1413,63 +1498,10 @@ func (n *Node) worker() {
n.logger.Info("termination requested")
return
case ev := <-hrtEventCh:
switch {
case ev.Started != nil:
// We are now able to service requests for this runtime.
runtimeVersion = ev.Started.Version

n.roleProvider.SetAvailable(func(nd *node.Node) error {
rt := nd.AddOrUpdateRuntime(n.commonNode.Runtime.ID())
rt.Version = runtimeVersion
rt.Capabilities.TEE = ev.Started.CapabilityTEE
return nil
})
case ev.Updated != nil:
// Update runtime capabilities.
n.roleProvider.SetAvailable(func(nd *node.Node) error {
rt := nd.AddOrUpdateRuntime(n.commonNode.Runtime.ID())
rt.Version = runtimeVersion
rt.Capabilities.TEE = ev.Updated.CapabilityTEE
return nil
})
case ev.FailedToStart != nil, ev.Stopped != nil:
// Runtime failed to start or was stopped -- we can no longer service requests.
n.roleProvider.SetUnavailable()
default:
// Unknown event.
n.logger.Warn("unknown worker event",
"ev", ev,
)
}
n.handleRuntimeHostEvent(ev)
case batch := <-processingDoneCh:
// Batch processing has finished.
if batch == nil || batch.computed == nil {
n.logger.Warn("worker has aborted batch processing")
func() {
n.commonNode.CrossNode.Lock()
defer n.commonNode.CrossNode.Unlock()

// To avoid stale events, check if the stored state is still valid.
if state, ok := n.state.(StateProcessingBatch); !ok || state.done != processingDoneCh {
return
}
n.abortBatchLocked(errRuntimeAborted)
}()
break
}

n.logger.Info("worker has finished processing a batch")

func() {
n.commonNode.CrossNode.Lock()
defer n.commonNode.CrossNode.Unlock()

// To avoid stale events, check if the stored state is still valid.
if state, ok := n.state.(StateProcessingBatch); !ok || state.done != processingDoneCh {
return
}
n.proposeBatchLocked(batch)
}()
n.handleProcessedBatch(batch, processingDoneCh)
case runtime := <-rtCh:
// XXX: Once there is more than one scheduling algorithm available
// this might need to recreate the scheduler and reinsert
