diff --git a/go/roothash/api/commitment/executor.go b/go/roothash/api/commitment/executor.go
index 516dfe0fa4e..0205996b3ab 100644
--- a/go/roothash/api/commitment/executor.go
+++ b/go/roothash/api/commitment/executor.go
@@ -126,7 +126,7 @@ func (m *ComputeBody) RootsForStorageReceipt() []hash.Hash {
 	}
 }
 
-// ValidateBasic performs basic compute body commitment parameters checks.
+// ValidateBasic performs basic executor commitment validity checks.
 func (m *ComputeBody) ValidateBasic() error {
 	header := &m.Header
 	switch m.Failure {
diff --git a/go/roothash/api/commitment/pool.go b/go/roothash/api/commitment/pool.go
index 9edbb1c6423..a471556f815 100644
--- a/go/roothash/api/commitment/pool.go
+++ b/go/roothash/api/commitment/pool.go
@@ -227,6 +227,14 @@ func (p *Pool) addOpenExecutorCommitment(
 		return ErrNotBasedOnCorrectBlock
 	}
 
+	if err := body.ValidateBasic(); err != nil {
+		logger.Debug("executor commitment validate basic error",
+			"body", body,
+			"err", err,
+		)
+		return ErrBadExecutorCommitment
+	}
+
 	if err := sv.VerifyTxnSchedulerSignature(body.TxnSchedSig, blk.Header.Round); err != nil {
 		logger.Debug("executor commitment has bad transaction scheduler signer",
 			"node_id", id,
@@ -239,15 +247,6 @@
 		return ErrTxnSchedSigInvalid
 	}
 
-	// Validate basic.
-	if err := body.ValidateBasic(); err != nil {
-		logger.Debug("executor commitment validate basic error",
-			"body", body,
-			"err", err,
-		)
-		return ErrBadExecutorCommitment
-	}
-
 	switch openCom.IsIndicatingFailure() {
 	case true:
 	default:
diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go
index 25449af9e16..a9fb2b71ef9 100644
--- a/go/worker/compute/executor/committee/node.go
+++ b/go/worker/compute/executor/committee/node.go
@@ -26,6 +26,7 @@ import (
 	"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
 	"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
 	runtimeCommittee "github.com/oasisprotocol/oasis-core/go/runtime/committee"
+	"github.com/oasisprotocol/oasis-core/go/runtime/host"
 	"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
 	"github.com/oasisprotocol/oasis-core/go/runtime/scheduling"
 	schedulingAPI "github.com/oasisprotocol/oasis-core/go/runtime/scheduling/api"
@@ -134,8 +135,9 @@ var (
 type Node struct { // nolint: maligned
 	*commonWorker.RuntimeHostNode
 
-	lastScheduledCache *lru.Cache
+	runtimeVersion version.Version
 
+	lastScheduledCache *lru.Cache
 	scheduleCheckTxEnabled bool
 	scheduleMaxQueueSize   uint64
 
@@ -1175,17 +1177,39 @@ func (n *Node) proposeBatchLocked(processedBatch *processedBatch) {
 		proposedResults.SetFailure(commitment.FailureStorageUnavailable)
 	}
 
-	// Sign the commitment and submit.
-	commit, err := commitment.SignExecutorCommitment(n.commonNode.Identity.NodeSigner, proposedResults)
-	if err != nil {
-		n.logger.Error("failed to sign commitment",
+	if err := n.signAndSubmitCommitment(proposedResults); err != nil {
+		n.logger.Error("failed to sign and submit the commitment",
+			"commit", proposedResults,
 			"err", err,
 		)
 		n.abortBatchLocked(err)
 		return
 	}
 
-	// Publish commitment to the consensus layer.
+	switch storageErr {
+	case nil:
+		n.transitionLocked(StateWaitingForFinalize{
+			batchStartTime: state.batchStartTime,
+			raw:            processedBatch.raw,
+			proposedIORoot: *proposedResults.Header.IORoot,
+		})
+	default:
+		n.abortBatchLocked(storageErr)
+	}
+
+	crash.Here(crashPointBatchProposeAfter)
+}
+
+func (n *Node) signAndSubmitCommitment(body *commitment.ComputeBody) error {
+	commit, err := commitment.SignExecutorCommitment(n.commonNode.Identity.NodeSigner, body)
+	if err != nil {
+		n.logger.Error("failed to sign commitment",
+			"commit", body,
+			"err", err,
+		)
+		return err
+	}
+
 	tx := roothash.NewExecutorCommitTx(0, nil, n.commonNode.Runtime.ID(), []commitment.ExecutorCommitment{*commit})
 	go func() {
 		commitErr := consensus.SignAndSubmitTx(n.roundCtx, n.commonNode.Consensus, n.commonNode.Identity.NodeSigner, tx)
@@ -1194,25 +1218,13 @@ func (n *Node) proposeBatchLocked(processedBatch *processedBatch) {
 			n.logger.Info("executor commit finalized")
 		default:
 			n.logger.Error("failed to submit executor commit",
+				"commit", body,
 				"err", commitErr,
 			)
 		}
 	}()
 
-	// TODO: Add crash point.
-
-	switch storageErr {
-	case nil:
-		n.transitionLocked(StateWaitingForFinalize{
-			batchStartTime: state.batchStartTime,
-			raw:            processedBatch.raw,
-			proposedIORoot: *proposedResults.Header.IORoot,
-		})
-	default:
-		n.abortBatchLocked(err)
-	}
-
-	crash.Here(crashPointBatchProposeAfter)
+	return nil
 }
 
 // HandleNewEventLocked implements NodeHooks.
@@ -1300,6 +1312,80 @@ func (n *Node) handleExternalBatchLocked(batch *unresolvedBatch, hdr block.Heade
 	return nil
 }
 
+func (n *Node) handleRuntimeHostEvent(ev *host.Event) {
+	switch {
+	case ev.Started != nil:
+		// We are now able to service requests for this runtime.
+		n.runtimeVersion = ev.Started.Version
+
+		n.roleProvider.SetAvailable(func(nd *node.Node) error {
+			rt := nd.AddOrUpdateRuntime(n.commonNode.Runtime.ID())
+			rt.Version = n.runtimeVersion
+			rt.Capabilities.TEE = ev.Started.CapabilityTEE
+			return nil
+		})
+	case ev.Updated != nil:
+		// Update runtime capabilities.
+		n.roleProvider.SetAvailable(func(nd *node.Node) error {
+			rt := nd.AddOrUpdateRuntime(n.commonNode.Runtime.ID())
+			rt.Version = n.runtimeVersion
+			rt.Capabilities.TEE = ev.Updated.CapabilityTEE
+			return nil
+		})
+	case ev.FailedToStart != nil, ev.Stopped != nil:
+		// Runtime failed to start or was stopped -- we can no longer service requests.
+		n.roleProvider.SetUnavailable()
+	default:
+		// Unknown event.
+		n.logger.Warn("unknown worker event",
+			"ev", ev,
+		)
+	}
+}
+
+func (n *Node) handleProcessedBatch(batch *processedBatch, processingCh chan *processedBatch) {
+	n.commonNode.CrossNode.Lock()
+	defer n.commonNode.CrossNode.Unlock()
+
+	// To avoid stale events, check if the stored state is still valid.
+	state, ok := n.state.(StateProcessingBatch)
+	if !ok || state.done != processingCh {
+		return
+	}
+
+	if batch != nil && batch.computed != nil {
+		n.logger.Info("worker has finished processing a batch")
+		n.proposeBatchLocked(batch)
+
+		return
+	}
+
+	n.logger.Warn("worker has aborted batch processing")
+
+	// Submit a failure indicating commitment.
+	n.logger.Debug("submitting failure indicating commitment")
+	header := n.commonNode.CurrentBlock.Header
+	commit := &commitment.ComputeBody{
+		Header: commitment.ComputeResultsHeader{
+			Round:        header.Round + 1,
+			PreviousHash: header.EncodedHash(),
+		},
+		TxnSchedSig:      state.batch.txnSchedSignature,
+		InputRoot:        state.batch.ioRoot.Hash,
+		InputStorageSigs: state.batch.storageSignatures,
+	}
+	commit.SetFailure(commitment.FailureUnknown)
+
+	if err := n.signAndSubmitCommitment(commit); err != nil {
+		n.logger.Error("failed to sign and submit the commitment",
+			"commit", commit,
+			"err", err,
+		)
+	}
+
+	n.abortBatchLocked(errRuntimeAborted)
+}
+
 func (n *Node) worker() {
 	defer close(n.quitCh)
 	defer (n.cancelCtx)()
@@ -1393,7 +1479,6 @@ func (n *Node) worker() {
 	// We are initialized.
 	close(n.initCh)
 
-	var runtimeVersion version.Version
 	for {
 		// Check if we are currently processing a batch. In this case, we also
 		// need to select over the result channel.
@@ -1413,63 +1498,10 @@
 			n.logger.Info("termination requested")
 			return
 		case ev := <-hrtEventCh:
-			switch {
-			case ev.Started != nil:
-				// We are now able to service requests for this runtime.
-				runtimeVersion = ev.Started.Version
-
-				n.roleProvider.SetAvailable(func(nd *node.Node) error {
-					rt := nd.AddOrUpdateRuntime(n.commonNode.Runtime.ID())
-					rt.Version = runtimeVersion
-					rt.Capabilities.TEE = ev.Started.CapabilityTEE
-					return nil
-				})
-			case ev.Updated != nil:
-				// Update runtime capabilities.
-				n.roleProvider.SetAvailable(func(nd *node.Node) error {
-					rt := nd.AddOrUpdateRuntime(n.commonNode.Runtime.ID())
-					rt.Version = runtimeVersion
-					rt.Capabilities.TEE = ev.Updated.CapabilityTEE
-					return nil
-				})
-			case ev.FailedToStart != nil, ev.Stopped != nil:
-				// Runtime failed to start or was stopped -- we can no longer service requests.
-				n.roleProvider.SetUnavailable()
-			default:
-				// Unknown event.
-				n.logger.Warn("unknown worker event",
-					"ev", ev,
-				)
-			}
+			n.handleRuntimeHostEvent(ev)
 		case batch := <-processingDoneCh:
 			// Batch processing has finished.
-			if batch == nil || batch.computed == nil {
-				n.logger.Warn("worker has aborted batch processing")
-				func() {
-					n.commonNode.CrossNode.Lock()
-					defer n.commonNode.CrossNode.Unlock()
-
-					// To avoid stale events, check if the stored state is still valid.
-					if state, ok := n.state.(StateProcessingBatch); !ok || state.done != processingDoneCh {
-						return
-					}
-					n.abortBatchLocked(errRuntimeAborted)
-				}()
-				break
-			}
-
-			n.logger.Info("worker has finished processing a batch")
-
-			func() {
-				n.commonNode.CrossNode.Lock()
-				defer n.commonNode.CrossNode.Unlock()
-
-				// To avoid stale events, check if the stored state is still valid.
-				if state, ok := n.state.(StateProcessingBatch); !ok || state.done != processingDoneCh {
-					return
-				}
-				n.proposeBatchLocked(batch)
-			}()
+			n.handleProcessedBatch(batch, processingDoneCh)
 		case runtime := <-rtCh:
 			// XXX: Once there is more than one scheduling algorithm available
 			// this might need to recreate the scheduler and reinsert