diff --git a/.changelog/2502.feature.md b/.changelog/2502.feature.md new file mode 100644 index 00000000000..e7a62b8f35e --- /dev/null +++ b/.changelog/2502.feature.md @@ -0,0 +1,6 @@ +go/worker/txnscheduler: Check transactions before queuing them + +The transaction scheduler can now optionally run runtimes and +check transactions before scheduling them (see issue #1963). +This functionality is disabled by default, enable it with +`worker.txn_scheduler.check_tx.enabled`. diff --git a/go/oasis-node/node_test.go b/go/oasis-node/node_test.go index 5fe9bdc12c8..a0c20c443a4 100644 --- a/go/oasis-node/node_test.go +++ b/go/oasis-node/node_test.go @@ -73,6 +73,7 @@ var ( {workerCommon.CfgClientPort, workerClientPort}, {storageWorker.CfgWorkerEnabled, true}, {txnscheduler.CfgWorkerEnabled, true}, + {txnscheduler.CfgCheckTxEnabled, false}, {mergeWorker.CfgWorkerEnabled, true}, {supplementarysanity.CfgEnabled, true}, {supplementarysanity.CfgInterval, 1}, diff --git a/go/oasis-test-runner/oasis/args.go b/go/oasis-test-runner/oasis/args.go index c9c8c735971..14b8083ee28 100644 --- a/go/oasis-test-runner/oasis/args.go +++ b/go/oasis-test-runner/oasis/args.go @@ -273,6 +273,11 @@ func (args *argBuilder) workerTxnschedulerEnabled() *argBuilder { return args } +func (args *argBuilder) workerTxnschedulerCheckTxEnabled() *argBuilder { + args.vec = append(args.vec, "--"+txnscheduler.CfgCheckTxEnabled) + return args +} + func (args *argBuilder) iasUseGenesis() *argBuilder { args.vec = append(args.vec, "--ias.use_genesis") return args diff --git a/go/oasis-test-runner/oasis/compute.go b/go/oasis-test-runner/oasis/compute.go index 1dec1ef2fd7..7386d3c6f14 100644 --- a/go/oasis-test-runner/oasis/compute.go +++ b/go/oasis-test-runner/oasis/compute.go @@ -77,6 +77,7 @@ func (worker *Compute) startNode() error { workerRuntimeLoader(worker.net.cfg.RuntimeLoaderBinary). workerMergeEnabled(). workerTxnschedulerEnabled(). + workerTxnschedulerCheckTxEnabled(). appendNetwork(worker.net). appendEntity(worker.entity) for _, v := range worker.net.runtimes { diff --git a/go/worker/common/runtime_host.go b/go/worker/common/runtime_host.go index 95fad257c3e..17a4a1e820c 100644 --- a/go/worker/common/runtime_host.go +++ b/go/worker/common/runtime_host.go @@ -15,6 +15,7 @@ import ( keymanagerApi "github.com/oasislabs/oasis-core/go/keymanager/api" keymanagerClient "github.com/oasislabs/oasis-core/go/keymanager/client" registry "github.com/oasislabs/oasis-core/go/registry/api" + roothash "github.com/oasislabs/oasis-core/go/roothash/api/block" "github.com/oasislabs/oasis-core/go/runtime/localstorage" storage "github.com/oasislabs/oasis-core/go/storage/api" "github.com/oasislabs/oasis-core/go/worker/common/committee" @@ -268,6 +269,14 @@ func (n *RuntimeHostNode) GetWorkerHostLocked() host.Host { return n.workerHost } +// GetCurrentBlock returns the current roothash block from the underlying common node. +func (n *RuntimeHostNode) GetCurrentBlock() roothash.Block { + n.commonNode.CrossNode.Lock() + defer n.commonNode.CrossNode.Unlock() + + return *n.commonNode.CurrentBlock +} + // NewRuntimeHostNode creates a new runtime host node. func NewRuntimeHostNode(commonNode *committee.Node, workerHostFactory host.Factory) *RuntimeHostNode { return &RuntimeHostNode{ diff --git a/go/worker/txnscheduler/api/api.go b/go/worker/txnscheduler/api/api.go index 19f1f9f2356..6ccb11c1784 100644 --- a/go/worker/txnscheduler/api/api.go +++ b/go/worker/txnscheduler/api/api.go @@ -23,6 +23,9 @@ var ( // ErrNotReady is the error returned when the transaction scheduler is not // yet ready to process transactions. ErrNotReady = errors.New(ModuleName, 3, "txnscheduler: not ready") + + // ErrCheckTxFailed is the error returned when CheckTx fails. + ErrCheckTxFailed = errors.New(ModuleName, 4, "txnscheduler: CheckTx failed") ) // TransactionScheduler is the transaction scheduler API interface. diff --git a/go/worker/txnscheduler/committee/node.go b/go/worker/txnscheduler/committee/node.go index 7763da932f3..c9a49f38182 100644 --- a/go/worker/txnscheduler/committee/node.go +++ b/go/worker/txnscheduler/committee/node.go @@ -51,9 +51,11 @@ var ( ) // Node is a committee node. -type Node struct { +type Node struct { // nolint: maligned *commonWorker.RuntimeHostNode + checkTxEnabled bool + commonNode *committee.Node computeNode *computeCommittee.Node @@ -389,14 +391,16 @@ func (n *Node) worker() { n.logger.Info("starting committee node") - // Initialize worker host for the new runtime. - if err := n.InitializeRuntimeWorkerHost(n.ctx); err != nil { - n.logger.Error("failed to initialize worker host", - "err", err, - ) - return + if n.checkTxEnabled { + // Initialize worker host for the new runtime. + if err := n.InitializeRuntimeWorkerHost(n.ctx); err != nil { + n.logger.Error("failed to initialize worker host", + "err", err, + ) + return + } + defer n.StopRuntimeWorkerHost() } - defer n.StopRuntimeWorkerHost() // Initialize transaction scheduler's algorithm. runtime, err := n.commonNode.Runtime.RegistryDescriptor(n.ctx) @@ -451,6 +455,7 @@ func NewNode( commonNode *committee.Node, computeNode *computeCommittee.Node, workerHostFactory host.Factory, + checkTxEnabled bool, ) (*Node, error) { metricsOnce.Do(func() { prometheus.MustRegister(nodeCollectors...) @@ -460,6 +465,7 @@ func NewNode( n := &Node{ RuntimeHostNode: commonWorker.NewRuntimeHostNode(commonNode, workerHostFactory), + checkTxEnabled: checkTxEnabled, commonNode: commonNode, computeNode: computeNode, ctx: ctx, diff --git a/go/worker/txnscheduler/init.go b/go/worker/txnscheduler/init.go index 7a8d1c73995..ce77ef14ab4 100644 --- a/go/worker/txnscheduler/init.go +++ b/go/worker/txnscheduler/init.go @@ -13,6 +13,8 @@ import ( const ( // CfgWorkerEnabled enables the tx scheduler worker. CfgWorkerEnabled = "worker.txn_scheduler.enabled" + // CfgCheckTxEnabled enables checking each transaction before scheduling it. + CfgCheckTxEnabled = "worker.txn_scheduler.check_tx.enabled" ) // Flags has the configuration flags. @@ -23,17 +25,23 @@ func Enabled() bool { return viper.GetBool(CfgWorkerEnabled) } +// CheckTxEnabled reads our CheckTx enabled flag from viper. +func CheckTxEnabled() bool { + return viper.GetBool(CfgCheckTxEnabled) +} + // New creates a new worker. func New( commonWorker *workerCommon.Worker, compute *compute.Worker, registration *registration.Worker, ) (*Worker, error) { - return newWorker(Enabled(), commonWorker, compute, registration) + return newWorker(Enabled(), commonWorker, compute, registration, CheckTxEnabled()) } func init() { Flags.Bool(CfgWorkerEnabled, false, "Enable transaction scheduler process") + Flags.Bool(CfgCheckTxEnabled, false, "Enable checking transactions before scheduling them") _ = viper.BindPFlags(Flags) diff --git a/go/worker/txnscheduler/service.go b/go/worker/txnscheduler/service.go index ac4187cff89..9be349ed4f7 100644 --- a/go/worker/txnscheduler/service.go +++ b/go/worker/txnscheduler/service.go @@ -2,7 +2,11 @@ package txnscheduler import ( "context" + "fmt" + "github.com/oasislabs/oasis-core/go/common/cbor" + "github.com/oasislabs/oasis-core/go/runtime/transaction" + "github.com/oasislabs/oasis-core/go/worker/common/host/protocol" "github.com/oasislabs/oasis-core/go/worker/txnscheduler/api" ) @@ -15,6 +19,55 @@ func (w *Worker) SubmitTx(ctx context.Context, rq *api.SubmitTxRequest) (*api.Su return nil, api.ErrUnknownRuntime } + if w.checkTxEnabled { + // Check transaction before queuing it. + checkRq := &protocol.Body{ + WorkerCheckTxBatchRequest: &protocol.WorkerCheckTxBatchRequest{ + Inputs: transaction.RawBatch{rq.Data}, + Block: runtime.GetCurrentBlock(), + }, + } + workerHost := runtime.GetWorkerHost() + if workerHost == nil { + w.logger.Error("worker host not initialized") + return nil, api.ErrNotReady + } + resp, err := workerHost.Call(ctx, checkRq) + if err != nil { + w.logger.Error("worker host CheckTx call error", + "err", err, + ) + return nil, err + } + if resp == nil { + w.logger.Error("worker host CheckTx reponse is nil") + return nil, api.ErrCheckTxFailed + } + if resp.WorkerCheckTxBatchResponse.Results == nil { + w.logger.Error("worker host CheckTx response contains no results") + return nil, api.ErrCheckTxFailed + } + if len(resp.WorkerCheckTxBatchResponse.Results) != 1 { + w.logger.Error("worker host CheckTx response doesn't contain exactly one result", + "num_results", len(resp.WorkerCheckTxBatchResponse.Results), + ) + return nil, api.ErrCheckTxFailed + } + + // Interpret CheckTx result. + resultRaw := resp.WorkerCheckTxBatchResponse.Results[0] + var result transaction.TxnOutput + cbor.MustUnmarshal(resultRaw, &result) + if result.Error != nil { + w.logger.Error("worker CheckTx failed with error", + "err", result.Error, + ) + return nil, fmt.Errorf("%w: %s", api.ErrCheckTxFailed, *result.Error) + } + + w.logger.Debug("worker CheckTx successful, queuing transaction") + } + if err := runtime.QueueCall(ctx, rq.Data); err != nil { return nil, err } diff --git a/go/worker/txnscheduler/worker.go b/go/worker/txnscheduler/worker.go index 8a7992c3b14..32bdf78121c 100644 --- a/go/worker/txnscheduler/worker.go +++ b/go/worker/txnscheduler/worker.go @@ -18,7 +18,8 @@ import ( type Worker struct { *workerCommon.RuntimeHostWorker - enabled bool + enabled bool + checkTxEnabled bool commonWorker *workerCommon.Worker registration *registration.Worker @@ -152,7 +153,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error { } // Create committee node for the given runtime. - node, err := committee.NewNode(commonNode, computeNode, workerHostFactory) + node, err := committee.NewNode(commonNode, computeNode, workerHostFactory, w.checkTxEnabled) if err != nil { return err } @@ -172,20 +173,22 @@ func newWorker( commonWorker *workerCommon.Worker, compute *compute.Worker, registration *registration.Worker, + checkTxEnabled bool, ) (*Worker, error) { ctx, cancelCtx := context.WithCancel(context.Background()) w := &Worker{ - enabled: enabled, - commonWorker: commonWorker, - registration: registration, - compute: compute, - runtimes: make(map[common.Namespace]*committee.Node), - ctx: ctx, - cancelCtx: cancelCtx, - quitCh: make(chan struct{}), - initCh: make(chan struct{}), - logger: logging.GetLogger("worker/txnscheduler"), + enabled: enabled, + checkTxEnabled: checkTxEnabled, + commonWorker: commonWorker, + registration: registration, + compute: compute, + runtimes: make(map[common.Namespace]*committee.Node), + ctx: ctx, + cancelCtx: cancelCtx, + quitCh: make(chan struct{}), + initCh: make(chan struct{}), + logger: logging.GetLogger("worker/txnscheduler"), } if enabled { @@ -193,8 +196,9 @@ func newWorker( panic("common worker should have been enabled for transaction scheduler") } - // Create the runtime host worker. var err error + + // Create the runtime host worker. w.RuntimeHostWorker, err = workerCommon.NewRuntimeHostWorker(commonWorker) if err != nil { return nil, err @@ -212,48 +216,14 @@ func newWorker( // Register transaction scheduler worker role. if err = w.registration.RegisterRole(node.RoleTransactionScheduler, func(n *node.Node) error { - // Wait until all the runtimes are initialized. - for _, rt := range w.runtimes { - select { - case <-rt.Initialized(): - case <-w.ctx.Done(): - return w.ctx.Err() - } - } - - for _, rt := range n.Runtimes { - var grr error - - workerRT := w.runtimes[rt.ID] - if workerRT == nil { - continue - } - - workerHost := workerRT.GetWorkerHost() - if workerHost == nil { - w.logger.Debug("runtime has shut down", - "runtime", rt.ID, - ) - continue - } - if rt.Capabilities.TEE, grr = workerHost.WaitForCapabilityTEE(w.ctx); grr != nil { - w.logger.Error("failed to obtain CapabilityTEE", - "err", grr, - "runtime", rt.ID, - ) - continue - } - - runtimeVersion, grr := workerHost.WaitForRuntimeVersion(w.ctx) - if grr == nil && runtimeVersion != nil { - rt.Version = *runtimeVersion - } else { - w.logger.Error("failed to obtain RuntimeVersion", - "err", grr, - "runtime", rt.ID, - "runtime_version", runtimeVersion, - ) - continue + if w.checkTxEnabled { + // Wait until all the runtimes are initialized. + for _, rt := range w.runtimes { + select { + case <-rt.Initialized(): + case <-w.ctx.Done(): + return w.ctx.Err() + } } }