From bc1fe5727ddcb24385019b817053e6441ee76188 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Buko=C5=A1ek?= Date: Fri, 20 Dec 2019 17:14:09 +0100 Subject: [PATCH 1/2] go/worker/txnscheduler: Run runtimes --- go/registry/api/api.go | 11 ++-- go/worker/txnscheduler/committee/node.go | 15 +++++ go/worker/txnscheduler/worker.go | 82 ++++++++++++++++++++++-- 3 files changed, 97 insertions(+), 11 deletions(-) diff --git a/go/registry/api/api.go b/go/registry/api/api.go index b86756d3db3..0ac02b3b692 100644 --- a/go/registry/api/api.go +++ b/go/registry/api/api.go @@ -143,7 +143,8 @@ var ( // RuntimesRequiredRoles are the Node roles that require runtimes. RuntimesRequiredRoles = node.RoleComputeWorker | - node.RoleKeyManager + node.RoleKeyManager | + node.RoleTransactionScheduler // ConsensusAddressRequiredRoles are the Node roles that require Consensus Address. ConsensusAddressRequiredRoles = node.RoleValidator @@ -1228,12 +1229,12 @@ func SanityCheckNodes(nodes []*node.SignedNode, seenEntities map[signature.Publi return fmt.Errorf("registry: sanity check failed: key manager node must have runtime(s)") } - if n.HasRoles(node.RoleStorageWorker) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 { - return fmt.Errorf("registry: sanity check failed: storage worker node shouldn't have any runtimes") + if n.HasRoles(node.RoleTransactionScheduler) && len(n.Runtimes) == 0 { + return fmt.Errorf("registry: sanity check failed: transaction scheduler node must have runtime(s)") } - if n.HasRoles(node.RoleTransactionScheduler) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 { - return fmt.Errorf("registry: sanity check failed: transaction scheduler node shouldn't have any runtimes") + if n.HasRoles(node.RoleStorageWorker) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 { + return fmt.Errorf("registry: sanity check failed: storage worker node shouldn't have any runtimes") } if n.HasRoles(node.RoleMergeWorker) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 { diff --git a/go/worker/txnscheduler/committee/node.go b/go/worker/txnscheduler/committee/node.go index a434939887f..7763da932f3 100644 --- a/go/worker/txnscheduler/committee/node.go +++ b/go/worker/txnscheduler/committee/node.go @@ -20,7 +20,9 @@ import ( "github.com/oasislabs/oasis-core/go/roothash/api/block" "github.com/oasislabs/oasis-core/go/runtime/transaction" storage "github.com/oasislabs/oasis-core/go/storage/api" + commonWorker "github.com/oasislabs/oasis-core/go/worker/common" "github.com/oasislabs/oasis-core/go/worker/common/committee" + "github.com/oasislabs/oasis-core/go/worker/common/host" "github.com/oasislabs/oasis-core/go/worker/common/p2p" computeCommittee "github.com/oasislabs/oasis-core/go/worker/compute/committee" txnSchedulerAlgorithm "github.com/oasislabs/oasis-core/go/worker/txnscheduler/algorithm" @@ -50,6 +52,8 @@ var ( // Node is a committee node. type Node struct { + *commonWorker.RuntimeHostNode + commonNode *committee.Node computeNode *computeCommittee.Node @@ -385,6 +389,15 @@ func (n *Node) worker() { n.logger.Info("starting committee node") + // Initialize worker host for the new runtime. + if err := n.InitializeRuntimeWorkerHost(n.ctx); err != nil { + n.logger.Error("failed to initialize worker host", + "err", err, + ) + return + } + defer n.StopRuntimeWorkerHost() + // Initialize transaction scheduler's algorithm. runtime, err := n.commonNode.Runtime.RegistryDescriptor(n.ctx) if err != nil { @@ -437,6 +450,7 @@ func (n *Node) worker() { func NewNode( commonNode *committee.Node, computeNode *computeCommittee.Node, + workerHostFactory host.Factory, ) (*Node, error) { metricsOnce.Do(func() { prometheus.MustRegister(nodeCollectors...) @@ -445,6 +459,7 @@ func NewNode( ctx, cancel := context.WithCancel(context.Background()) n := &Node{ + RuntimeHostNode: commonWorker.NewRuntimeHostNode(commonNode, workerHostFactory), commonNode: commonNode, computeNode: computeNode, ctx: ctx, diff --git a/go/worker/txnscheduler/worker.go b/go/worker/txnscheduler/worker.go index e8cf728ae8f..8a7992c3b14 100644 --- a/go/worker/txnscheduler/worker.go +++ b/go/worker/txnscheduler/worker.go @@ -1,6 +1,8 @@ package txnscheduler import ( + "context" + "github.com/oasislabs/oasis-core/go/common" "github.com/oasislabs/oasis-core/go/common/logging" "github.com/oasislabs/oasis-core/go/common/node" @@ -14,6 +16,8 @@ import ( // Worker is a transaction scheduler handling many runtimes. type Worker struct { + *workerCommon.RuntimeHostWorker + enabled bool commonWorker *workerCommon.Worker @@ -22,8 +26,10 @@ type Worker struct { runtimes map[common.Namespace]*committee.Node - quitCh chan struct{} - initCh chan struct{} + ctx context.Context + cancelCtx context.CancelFunc + quitCh chan struct{} + initCh chan struct{} logger *logging.Logger } @@ -139,7 +145,14 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error { // Get other nodes from this runtime. computeNode := w.compute.GetRuntime(id) - node, err := committee.NewNode(commonNode, computeNode) + // Create worker host for the given runtime. + workerHostFactory, err := w.NewRuntimeWorkerHostFactory(node.RoleTransactionScheduler, id) + if err != nil { + return err + } + + // Create committee node for the given runtime. + node, err := committee.NewNode(commonNode, computeNode, workerHostFactory) if err != nil { return err } @@ -160,12 +173,16 @@ func newWorker( compute *compute.Worker, registration *registration.Worker, ) (*Worker, error) { + ctx, cancelCtx := context.WithCancel(context.Background()) + w := &Worker{ enabled: enabled, commonWorker: commonWorker, registration: registration, compute: compute, runtimes: make(map[common.Namespace]*committee.Node), + ctx: ctx, + cancelCtx: cancelCtx, quitCh: make(chan struct{}), initCh: make(chan struct{}), logger: logging.GetLogger("worker/txnscheduler"), @@ -176,19 +193,72 @@ func newWorker( panic("common worker should have been enabled for transaction scheduler") } + // Create the runtime host worker. + var err error + w.RuntimeHostWorker, err = workerCommon.NewRuntimeHostWorker(commonWorker) + if err != nil { + return nil, err + } + // Use existing gRPC server passed from the node. api.RegisterService(commonWorker.Grpc.Server(), w) // Register all configured runtimes. for _, rt := range commonWorker.GetRuntimes() { - if err := w.registerRuntime(rt); err != nil { + if err = w.registerRuntime(rt); err != nil { return nil, err } } // Register transaction scheduler worker role. - if err := w.registration.RegisterRole(node.RoleTransactionScheduler, - func(n *node.Node) error { return nil }); err != nil { + if err = w.registration.RegisterRole(node.RoleTransactionScheduler, func(n *node.Node) error { + // Wait until all the runtimes are initialized. + for _, rt := range w.runtimes { + select { + case <-rt.Initialized(): + case <-w.ctx.Done(): + return w.ctx.Err() + } + } + + for _, rt := range n.Runtimes { + var grr error + + workerRT := w.runtimes[rt.ID] + if workerRT == nil { + continue + } + + workerHost := workerRT.GetWorkerHost() + if workerHost == nil { + w.logger.Debug("runtime has shut down", + "runtime", rt.ID, + ) + continue + } + if rt.Capabilities.TEE, grr = workerHost.WaitForCapabilityTEE(w.ctx); grr != nil { + w.logger.Error("failed to obtain CapabilityTEE", + "err", grr, + "runtime", rt.ID, + ) + continue + } + + runtimeVersion, grr := workerHost.WaitForRuntimeVersion(w.ctx) + if grr == nil && runtimeVersion != nil { + rt.Version = *runtimeVersion + } else { + w.logger.Error("failed to obtain RuntimeVersion", + "err", grr, + "runtime", rt.ID, + "runtime_version", runtimeVersion, + ) + continue + } + } + + return nil + }); err != nil { return nil, err } } From 25e1f505fe16491c1d960f5d49f58bf5edb955e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Buko=C5=A1ek?= Date: Mon, 23 Dec 2019 14:27:53 +0100 Subject: [PATCH 2/2] go/worker/txnscheduler: Check txns before queuing them --- .changelog/2502.feature.md | 6 ++ go/oasis-node/node_test.go | 1 + go/oasis-test-runner/oasis/args.go | 5 ++ go/oasis-test-runner/oasis/compute.go | 1 + go/worker/txnscheduler/api/api.go | 3 + go/worker/txnscheduler/committee/node.go | 94 ++++++++++++++++++++++-- go/worker/txnscheduler/init.go | 10 ++- go/worker/txnscheduler/worker.go | 88 +++++++--------------- 8 files changed, 139 insertions(+), 69 deletions(-) create mode 100644 .changelog/2502.feature.md diff --git a/.changelog/2502.feature.md b/.changelog/2502.feature.md new file mode 100644 index 00000000000..e7a62b8f35e --- /dev/null +++ b/.changelog/2502.feature.md @@ -0,0 +1,6 @@ +go/worker/txnscheduler: Check transactions before queuing them + +The transaction scheduler can now optionally run runtimes and +check transactions before scheduling them (see issue #1963). +This functionality is disabled by default, enable it with +`worker.txn_scheduler.check_tx.enabled`. diff --git a/go/oasis-node/node_test.go b/go/oasis-node/node_test.go index 5fe9bdc12c8..a0c20c443a4 100644 --- a/go/oasis-node/node_test.go +++ b/go/oasis-node/node_test.go @@ -73,6 +73,7 @@ var ( {workerCommon.CfgClientPort, workerClientPort}, {storageWorker.CfgWorkerEnabled, true}, {txnscheduler.CfgWorkerEnabled, true}, + {txnscheduler.CfgCheckTxEnabled, false}, {mergeWorker.CfgWorkerEnabled, true}, {supplementarysanity.CfgEnabled, true}, {supplementarysanity.CfgInterval, 1}, diff --git a/go/oasis-test-runner/oasis/args.go b/go/oasis-test-runner/oasis/args.go index c9c8c735971..14b8083ee28 100644 --- a/go/oasis-test-runner/oasis/args.go +++ b/go/oasis-test-runner/oasis/args.go @@ -273,6 +273,11 @@ func (args *argBuilder) workerTxnschedulerEnabled() *argBuilder { return args } +func (args *argBuilder) workerTxnschedulerCheckTxEnabled() *argBuilder { + args.vec = append(args.vec, "--"+txnscheduler.CfgCheckTxEnabled) + return args +} + func (args *argBuilder) iasUseGenesis() *argBuilder { args.vec = append(args.vec, "--ias.use_genesis") return args diff --git a/go/oasis-test-runner/oasis/compute.go b/go/oasis-test-runner/oasis/compute.go index 1dec1ef2fd7..7386d3c6f14 100644 --- a/go/oasis-test-runner/oasis/compute.go +++ b/go/oasis-test-runner/oasis/compute.go @@ -77,6 +77,7 @@ func (worker *Compute) startNode() error { workerRuntimeLoader(worker.net.cfg.RuntimeLoaderBinary). workerMergeEnabled(). workerTxnschedulerEnabled(). + workerTxnschedulerCheckTxEnabled(). appendNetwork(worker.net). appendEntity(worker.entity) for _, v := range worker.net.runtimes { diff --git a/go/worker/txnscheduler/api/api.go b/go/worker/txnscheduler/api/api.go index 19f1f9f2356..6ccb11c1784 100644 --- a/go/worker/txnscheduler/api/api.go +++ b/go/worker/txnscheduler/api/api.go @@ -23,6 +23,9 @@ var ( // ErrNotReady is the error returned when the transaction scheduler is not // yet ready to process transactions. ErrNotReady = errors.New(ModuleName, 3, "txnscheduler: not ready") + + // ErrCheckTxFailed is the error returned when CheckTx fails. + ErrCheckTxFailed = errors.New(ModuleName, 4, "txnscheduler: CheckTx failed") ) // TransactionScheduler is the transaction scheduler API interface. diff --git a/go/worker/txnscheduler/committee/node.go b/go/worker/txnscheduler/committee/node.go index 7763da932f3..130ac033673 100644 --- a/go/worker/txnscheduler/committee/node.go +++ b/go/worker/txnscheduler/committee/node.go @@ -10,6 +10,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" + "github.com/oasislabs/oasis-core/go/common/cbor" "github.com/oasislabs/oasis-core/go/common/crash" "github.com/oasislabs/oasis-core/go/common/crypto/hash" "github.com/oasislabs/oasis-core/go/common/crypto/signature" @@ -23,6 +24,7 @@ import ( commonWorker "github.com/oasislabs/oasis-core/go/worker/common" "github.com/oasislabs/oasis-core/go/worker/common/committee" "github.com/oasislabs/oasis-core/go/worker/common/host" + "github.com/oasislabs/oasis-core/go/worker/common/host/protocol" "github.com/oasislabs/oasis-core/go/worker/common/p2p" computeCommittee "github.com/oasislabs/oasis-core/go/worker/compute/committee" txnSchedulerAlgorithm "github.com/oasislabs/oasis-core/go/worker/txnscheduler/algorithm" @@ -51,9 +53,11 @@ var ( ) // Node is a committee node. -type Node struct { +type Node struct { // nolint: maligned *commonWorker.RuntimeHostNode + checkTxEnabled bool + commonNode *committee.Node computeNode *computeCommittee.Node @@ -132,6 +136,68 @@ func (n *Node) HandlePeerMessage(ctx context.Context, message *p2p.Message) (boo return false, nil } +// CheckTx checks the given call in the node's runtime. +func (n *Node) CheckTx(ctx context.Context, call []byte) error { + n.commonNode.CrossNode.Lock() + currentBlock := n.commonNode.CurrentBlock + n.commonNode.CrossNode.Unlock() + + if currentBlock == nil { + return api.ErrNotReady + } + + checkRq := &protocol.Body{ + WorkerCheckTxBatchRequest: &protocol.WorkerCheckTxBatchRequest{ + Inputs: transaction.RawBatch{call}, + Block: *currentBlock, + }, + } + workerHost := n.GetWorkerHost() + if workerHost == nil { + n.logger.Error("worker host not initialized") + return api.ErrNotReady + } + resp, err := workerHost.Call(ctx, checkRq) + if err != nil { + n.logger.Error("worker host CheckTx call error", + "err", err, + ) + return err + } + if resp == nil { + n.logger.Error("worker host CheckTx reponse is nil") + return api.ErrCheckTxFailed + } + if resp.WorkerCheckTxBatchResponse.Results == nil { + n.logger.Error("worker host CheckTx response contains no results") + return api.ErrCheckTxFailed + } + if len(resp.WorkerCheckTxBatchResponse.Results) != 1 { + n.logger.Error("worker host CheckTx response doesn't contain exactly one result", + "num_results", len(resp.WorkerCheckTxBatchResponse.Results), + ) + return api.ErrCheckTxFailed + } + + // Interpret CheckTx result. + resultRaw := resp.WorkerCheckTxBatchResponse.Results[0] + var result transaction.TxnOutput + if err = cbor.Unmarshal(resultRaw, &result); err != nil { + n.logger.Error("worker host CheckTx response failed to deserialize", + "err", err, + ) + return api.ErrCheckTxFailed + } + if result.Error != nil { + n.logger.Error("worker CheckTx failed with error", + "err", result.Error, + ) + return fmt.Errorf("%w: %s", api.ErrCheckTxFailed, *result.Error) + } + + return nil +} + // QueueCall queues a call for processing by this node. func (n *Node) QueueCall(ctx context.Context, call []byte) error { // Check if we are a leader. Note that we may be in the middle of a @@ -140,6 +206,14 @@ func (n *Node) QueueCall(ctx context.Context, call []byte) error { return api.ErrNotLeader } + if n.checkTxEnabled { + // Check transaction before queuing it. + if err := n.CheckTx(ctx, call); err != nil { + return err + } + n.logger.Debug("worker CheckTx successful, queuing transaction") + } + n.algorithmMutex.RLock() defer n.algorithmMutex.RUnlock() @@ -389,14 +463,16 @@ func (n *Node) worker() { n.logger.Info("starting committee node") - // Initialize worker host for the new runtime. - if err := n.InitializeRuntimeWorkerHost(n.ctx); err != nil { - n.logger.Error("failed to initialize worker host", - "err", err, - ) - return + if n.checkTxEnabled { + // Initialize worker host for the new runtime. + if err := n.InitializeRuntimeWorkerHost(n.ctx); err != nil { + n.logger.Error("failed to initialize worker host", + "err", err, + ) + return + } + defer n.StopRuntimeWorkerHost() } - defer n.StopRuntimeWorkerHost() // Initialize transaction scheduler's algorithm. runtime, err := n.commonNode.Runtime.RegistryDescriptor(n.ctx) @@ -451,6 +527,7 @@ func NewNode( commonNode *committee.Node, computeNode *computeCommittee.Node, workerHostFactory host.Factory, + checkTxEnabled bool, ) (*Node, error) { metricsOnce.Do(func() { prometheus.MustRegister(nodeCollectors...) @@ -460,6 +537,7 @@ func NewNode( n := &Node{ RuntimeHostNode: commonWorker.NewRuntimeHostNode(commonNode, workerHostFactory), + checkTxEnabled: checkTxEnabled, commonNode: commonNode, computeNode: computeNode, ctx: ctx, diff --git a/go/worker/txnscheduler/init.go b/go/worker/txnscheduler/init.go index 7a8d1c73995..ce77ef14ab4 100644 --- a/go/worker/txnscheduler/init.go +++ b/go/worker/txnscheduler/init.go @@ -13,6 +13,8 @@ import ( const ( // CfgWorkerEnabled enables the tx scheduler worker. CfgWorkerEnabled = "worker.txn_scheduler.enabled" + // CfgCheckTxEnabled enables checking each transaction before scheduling it. + CfgCheckTxEnabled = "worker.txn_scheduler.check_tx.enabled" ) // Flags has the configuration flags. @@ -23,17 +25,23 @@ func Enabled() bool { return viper.GetBool(CfgWorkerEnabled) } +// CheckTxEnabled reads our CheckTx enabled flag from viper. +func CheckTxEnabled() bool { + return viper.GetBool(CfgCheckTxEnabled) +} + // New creates a new worker. func New( commonWorker *workerCommon.Worker, compute *compute.Worker, registration *registration.Worker, ) (*Worker, error) { - return newWorker(Enabled(), commonWorker, compute, registration) + return newWorker(Enabled(), commonWorker, compute, registration, CheckTxEnabled()) } func init() { Flags.Bool(CfgWorkerEnabled, false, "Enable transaction scheduler process") + Flags.Bool(CfgCheckTxEnabled, false, "Enable checking transactions before scheduling them") _ = viper.BindPFlags(Flags) diff --git a/go/worker/txnscheduler/worker.go b/go/worker/txnscheduler/worker.go index 8a7992c3b14..b609bce50c4 100644 --- a/go/worker/txnscheduler/worker.go +++ b/go/worker/txnscheduler/worker.go @@ -18,7 +18,8 @@ import ( type Worker struct { *workerCommon.RuntimeHostWorker - enabled bool + enabled bool + checkTxEnabled bool commonWorker *workerCommon.Worker registration *registration.Worker @@ -26,10 +27,9 @@ type Worker struct { runtimes map[common.Namespace]*committee.Node - ctx context.Context - cancelCtx context.CancelFunc - quitCh chan struct{} - initCh chan struct{} + ctx context.Context + quitCh chan struct{} + initCh chan struct{} logger *logging.Logger } @@ -152,7 +152,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error { } // Create committee node for the given runtime. - node, err := committee.NewNode(commonNode, computeNode, workerHostFactory) + node, err := committee.NewNode(commonNode, computeNode, workerHostFactory, w.checkTxEnabled) if err != nil { return err } @@ -172,20 +172,21 @@ func newWorker( commonWorker *workerCommon.Worker, compute *compute.Worker, registration *registration.Worker, + checkTxEnabled bool, ) (*Worker, error) { - ctx, cancelCtx := context.WithCancel(context.Background()) + ctx := context.Background() w := &Worker{ - enabled: enabled, - commonWorker: commonWorker, - registration: registration, - compute: compute, - runtimes: make(map[common.Namespace]*committee.Node), - ctx: ctx, - cancelCtx: cancelCtx, - quitCh: make(chan struct{}), - initCh: make(chan struct{}), - logger: logging.GetLogger("worker/txnscheduler"), + enabled: enabled, + checkTxEnabled: checkTxEnabled, + commonWorker: commonWorker, + registration: registration, + compute: compute, + runtimes: make(map[common.Namespace]*committee.Node), + ctx: ctx, + quitCh: make(chan struct{}), + initCh: make(chan struct{}), + logger: logging.GetLogger("worker/txnscheduler"), } if enabled { @@ -193,8 +194,9 @@ func newWorker( panic("common worker should have been enabled for transaction scheduler") } - // Create the runtime host worker. var err error + + // Create the runtime host worker. w.RuntimeHostWorker, err = workerCommon.NewRuntimeHostWorker(commonWorker) if err != nil { return nil, err @@ -212,48 +214,14 @@ func newWorker( // Register transaction scheduler worker role. if err = w.registration.RegisterRole(node.RoleTransactionScheduler, func(n *node.Node) error { - // Wait until all the runtimes are initialized. - for _, rt := range w.runtimes { - select { - case <-rt.Initialized(): - case <-w.ctx.Done(): - return w.ctx.Err() - } - } - - for _, rt := range n.Runtimes { - var grr error - - workerRT := w.runtimes[rt.ID] - if workerRT == nil { - continue - } - - workerHost := workerRT.GetWorkerHost() - if workerHost == nil { - w.logger.Debug("runtime has shut down", - "runtime", rt.ID, - ) - continue - } - if rt.Capabilities.TEE, grr = workerHost.WaitForCapabilityTEE(w.ctx); grr != nil { - w.logger.Error("failed to obtain CapabilityTEE", - "err", grr, - "runtime", rt.ID, - ) - continue - } - - runtimeVersion, grr := workerHost.WaitForRuntimeVersion(w.ctx) - if grr == nil && runtimeVersion != nil { - rt.Version = *runtimeVersion - } else { - w.logger.Error("failed to obtain RuntimeVersion", - "err", grr, - "runtime", rt.ID, - "runtime_version", runtimeVersion, - ) - continue + if w.checkTxEnabled { + // Wait until all the runtimes are initialized. + for _, rt := range w.runtimes { + select { + case <-rt.Initialized(): + case <-w.ctx.Done(): + return w.ctx.Err() + } } }