diff --git a/go/registry/api/api.go b/go/registry/api/api.go
index b86756d3db3..0ac02b3b692 100644
--- a/go/registry/api/api.go
+++ b/go/registry/api/api.go
@@ -143,7 +143,8 @@ var (
 
 	// RuntimesRequiredRoles are the Node roles that require runtimes.
 	RuntimesRequiredRoles = node.RoleComputeWorker |
-		node.RoleKeyManager
+		node.RoleKeyManager |
+		node.RoleTransactionScheduler
 
 	// ConsensusAddressRequiredRoles are the Node roles that require Consensus Address.
 	ConsensusAddressRequiredRoles = node.RoleValidator
@@ -1228,12 +1229,12 @@ func SanityCheckNodes(nodes []*node.SignedNode, seenEntities map[signature.Publi
 			return fmt.Errorf("registry: sanity check failed: key manager node must have runtime(s)")
 		}
 
-		if n.HasRoles(node.RoleStorageWorker) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
-			return fmt.Errorf("registry: sanity check failed: storage worker node shouldn't have any runtimes")
+		if n.HasRoles(node.RoleTransactionScheduler) && len(n.Runtimes) == 0 {
+			return fmt.Errorf("registry: sanity check failed: transaction scheduler node must have runtime(s)")
 		}
 
-		if n.HasRoles(node.RoleTransactionScheduler) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
-			return fmt.Errorf("registry: sanity check failed: transaction scheduler node shouldn't have any runtimes")
+		if n.HasRoles(node.RoleStorageWorker) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
+			return fmt.Errorf("registry: sanity check failed: storage worker node shouldn't have any runtimes")
 		}
 
 		if n.HasRoles(node.RoleMergeWorker) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
diff --git a/go/worker/txnscheduler/committee/node.go b/go/worker/txnscheduler/committee/node.go
index a434939887f..7763da932f3 100644
--- a/go/worker/txnscheduler/committee/node.go
+++ b/go/worker/txnscheduler/committee/node.go
@@ -20,7 +20,9 @@ import (
 	"github.com/oasislabs/oasis-core/go/roothash/api/block"
 	"github.com/oasislabs/oasis-core/go/runtime/transaction"
 	storage "github.com/oasislabs/oasis-core/go/storage/api"
+	commonWorker "github.com/oasislabs/oasis-core/go/worker/common"
 	"github.com/oasislabs/oasis-core/go/worker/common/committee"
+	"github.com/oasislabs/oasis-core/go/worker/common/host"
 	"github.com/oasislabs/oasis-core/go/worker/common/p2p"
 	computeCommittee "github.com/oasislabs/oasis-core/go/worker/compute/committee"
 	txnSchedulerAlgorithm "github.com/oasislabs/oasis-core/go/worker/txnscheduler/algorithm"
@@ -50,6 +52,8 @@ var (
 
 // Node is a committee node.
 type Node struct {
+	*commonWorker.RuntimeHostNode
+
 	commonNode  *committee.Node
 	computeNode *computeCommittee.Node
 
@@ -385,6 +389,15 @@ func (n *Node) worker() {
 
 	n.logger.Info("starting committee node")
 
+	// Initialize worker host for the new runtime.
+	if err := n.InitializeRuntimeWorkerHost(n.ctx); err != nil {
+		n.logger.Error("failed to initialize worker host",
+			"err", err,
+		)
+		return
+	}
+	defer n.StopRuntimeWorkerHost()
+
 	// Initialize transaction scheduler's algorithm.
 	runtime, err := n.commonNode.Runtime.RegistryDescriptor(n.ctx)
 	if err != nil {
@@ -437,6 +450,7 @@ func (n *Node) worker() {
 func NewNode(
 	commonNode *committee.Node,
 	computeNode *computeCommittee.Node,
+	workerHostFactory host.Factory,
 ) (*Node, error) {
 	metricsOnce.Do(func() {
 		prometheus.MustRegister(nodeCollectors...)
@@ -445,6 +459,7 @@ func NewNode(
 	ctx, cancel := context.WithCancel(context.Background())
 
 	n := &Node{
+		RuntimeHostNode: commonWorker.NewRuntimeHostNode(commonNode, workerHostFactory),
 		commonNode:      commonNode,
 		computeNode:     computeNode,
 		ctx:             ctx,
diff --git a/go/worker/txnscheduler/worker.go b/go/worker/txnscheduler/worker.go
index e8cf728ae8f..8a7992c3b14 100644
--- a/go/worker/txnscheduler/worker.go
+++ b/go/worker/txnscheduler/worker.go
@@ -1,6 +1,8 @@
 package txnscheduler
 
 import (
+	"context"
+
 	"github.com/oasislabs/oasis-core/go/common"
 	"github.com/oasislabs/oasis-core/go/common/logging"
 	"github.com/oasislabs/oasis-core/go/common/node"
@@ -14,6 +16,8 @@ import (
 
 // Worker is a transaction scheduler handling many runtimes.
 type Worker struct {
+	*workerCommon.RuntimeHostWorker
+
 	enabled bool
 
 	commonWorker *workerCommon.Worker
@@ -22,8 +26,10 @@ type Worker struct {
 
 	runtimes map[common.Namespace]*committee.Node
 
-	quitCh chan struct{}
-	initCh chan struct{}
+	ctx       context.Context
+	cancelCtx context.CancelFunc
+	quitCh    chan struct{}
+	initCh    chan struct{}
 
 	logger *logging.Logger
 }
@@ -139,7 +145,14 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
 	// Get other nodes from this runtime.
 	computeNode := w.compute.GetRuntime(id)
 
-	node, err := committee.NewNode(commonNode, computeNode)
+	// Create worker host for the given runtime.
+	workerHostFactory, err := w.NewRuntimeWorkerHostFactory(node.RoleTransactionScheduler, id)
+	if err != nil {
+		return err
+	}
+
+	// Create committee node for the given runtime.
+	node, err := committee.NewNode(commonNode, computeNode, workerHostFactory)
 	if err != nil {
 		return err
 	}
@@ -160,12 +173,16 @@ func newWorker(
 	compute *compute.Worker,
 	registration *registration.Worker,
 ) (*Worker, error) {
+	ctx, cancelCtx := context.WithCancel(context.Background())
+
 	w := &Worker{
 		enabled:      enabled,
 		commonWorker: commonWorker,
 		registration: registration,
 		compute:      compute,
 		runtimes:     make(map[common.Namespace]*committee.Node),
+		ctx:          ctx,
+		cancelCtx:    cancelCtx,
 		quitCh:       make(chan struct{}),
 		initCh:       make(chan struct{}),
 		logger:       logging.GetLogger("worker/txnscheduler"),
@@ -176,19 +193,72 @@ func newWorker(
 			panic("common worker should have been enabled for transaction scheduler")
 		}
 
+		// Create the runtime host worker.
+		var err error
+		w.RuntimeHostWorker, err = workerCommon.NewRuntimeHostWorker(commonWorker)
+		if err != nil {
+			return nil, err
+		}
+
 		// Use existing gRPC server passed from the node.
 		api.RegisterService(commonWorker.Grpc.Server(), w)
 
 		// Register all configured runtimes.
 		for _, rt := range commonWorker.GetRuntimes() {
-			if err := w.registerRuntime(rt); err != nil {
+			if err = w.registerRuntime(rt); err != nil {
 				return nil, err
 			}
 		}
 
 		// Register transaction scheduler worker role.
-		if err := w.registration.RegisterRole(node.RoleTransactionScheduler,
-			func(n *node.Node) error { return nil }); err != nil {
+		if err = w.registration.RegisterRole(node.RoleTransactionScheduler, func(n *node.Node) error {
+			// Wait until all the runtimes are initialized.
+			for _, rt := range w.runtimes {
+				select {
+				case <-rt.Initialized():
+				case <-w.ctx.Done():
+					return w.ctx.Err()
+				}
+			}
+
+			for _, rt := range n.Runtimes {
+				var grr error
+
+				workerRT := w.runtimes[rt.ID]
+				if workerRT == nil {
+					continue
+				}
+
+				workerHost := workerRT.GetWorkerHost()
+				if workerHost == nil {
+					w.logger.Debug("runtime has shut down",
+						"runtime", rt.ID,
+					)
+					continue
+				}
+				if rt.Capabilities.TEE, grr = workerHost.WaitForCapabilityTEE(w.ctx); grr != nil {
+					w.logger.Error("failed to obtain CapabilityTEE",
+						"err", grr,
+						"runtime", rt.ID,
+					)
+					continue
+				}
+
+				runtimeVersion, grr := workerHost.WaitForRuntimeVersion(w.ctx)
+				if grr == nil && runtimeVersion != nil {
+					rt.Version = *runtimeVersion
+				} else {
+					w.logger.Error("failed to obtain RuntimeVersion",
+						"err", grr,
+						"runtime", rt.ID,
+						"runtime_version", runtimeVersion,
+					)
+					continue
+				}
+			}
+
+			return nil
+		}); err != nil {
 			return nil, err
 		}
 	}
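For context, the RegisterRole callback introduced in go/worker/txnscheduler/worker.go blocks node registration until every per-runtime worker host has reported its CapabilityTEE and runtime version. The following is a minimal, self-contained sketch of that wait-before-registering pattern under stated assumptions; the names used (runtimeHost, WaitForVersion, registerWhenReady) are hypothetical stand-ins, not oasis-core APIs.

package main

import (
	"context"
	"fmt"
	"time"
)

// runtimeHost is a hypothetical stand-in for a per-runtime worker host:
// it reports a version once the hosted runtime has started.
type runtimeHost struct {
	readyCh chan struct{}
	version string
}

// WaitForVersion blocks until the host is ready or the context is done.
func (h *runtimeHost) WaitForVersion(ctx context.Context) (string, error) {
	select {
	case <-h.readyCh:
		return h.version, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// registerWhenReady mirrors the shape of the registration hook above: it
// refuses to produce registration data until every configured runtime host
// has reported its version.
func registerWhenReady(ctx context.Context, hosts map[string]*runtimeHost) (map[string]string, error) {
	versions := make(map[string]string, len(hosts))
	for id, h := range hosts {
		v, err := h.WaitForVersion(ctx)
		if err != nil {
			return nil, fmt.Errorf("runtime %s not ready: %w", id, err)
		}
		versions[id] = v
	}
	return versions, nil
}

func main() {
	host := &runtimeHost{readyCh: make(chan struct{}), version: "0.1.0"}
	go func() {
		time.Sleep(50 * time.Millisecond) // simulate runtime startup
		close(host.readyCh)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	versions, err := registerWhenReady(ctx, map[string]*runtimeHost{"rt-A": host})
	if err != nil {
		fmt.Println("registration skipped:", err)
		return
	}
	fmt.Println("registering with runtime versions:", versions)
}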