go/worker/txnscheduler: Run runtimes
abukosek committed on Jan 13, 2020 · 1 parent d29e701 · commit 46c3e9d
Showing 3 changed files with 97 additions and 11 deletions.
go/registry/api/api.go (6 additions, 5 deletions)

@@ -143,7 +143,8 @@ var (
 
     // RuntimesRequiredRoles are the Node roles that require runtimes.
     RuntimesRequiredRoles = node.RoleComputeWorker |
-        node.RoleKeyManager
+        node.RoleKeyManager |
+        node.RoleTransactionScheduler
 
     // ConsensusAddressRequiredRoles are the Node roles that require Consensus Address.
     ConsensusAddressRequiredRoles = node.RoleValidator
@@ -1228,12 +1229,12 @@ func SanityCheckNodes(nodes []*node.SignedNode, seenEntities map[signature.Publi
             return fmt.Errorf("registry: sanity check failed: key manager node must have runtime(s)")
         }
 
-        if n.HasRoles(node.RoleStorageWorker) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
-            return fmt.Errorf("registry: sanity check failed: storage worker node shouldn't have any runtimes")
+        if n.HasRoles(node.RoleTransactionScheduler) && len(n.Runtimes) == 0 {
+            return fmt.Errorf("registry: sanity check failed: transaction scheduler node must have runtime(s)")
         }
 
-        if n.HasRoles(node.RoleTransactionScheduler) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
-            return fmt.Errorf("registry: sanity check failed: transaction scheduler node shouldn't have any runtimes")
+        if n.HasRoles(node.RoleStorageWorker) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
+            return fmt.Errorf("registry: sanity check failed: storage worker node shouldn't have any runtimes")
         }
 
         if n.HasRoles(node.RoleMergeWorker) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
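These sanity checks work because node roles form a bitmask: one node can hold several roles, and HasRoles reduces to a bitwise test. Below is a minimal, runnable Go sketch of that pattern; the RolesMask bit values and the hasRoles helper are illustrative stand-ins, not the exact oasis-core definitions.

package main

import "fmt"

// RolesMask mimics the bitmask style of node roles; the concrete bit
// assignments here are assumptions for illustration only.
type RolesMask uint32

const (
    RoleComputeWorker RolesMask = 1 << iota
    RoleStorageWorker
    RoleKeyManager
    RoleTransactionScheduler
)

// After this commit, transaction schedulers join compute workers and
// key managers in the set of roles that must declare runtimes.
const RuntimesRequiredRoles = RoleComputeWorker | RoleKeyManager | RoleTransactionScheduler

// hasRoles reports whether all bits in r are set in roles.
func hasRoles(roles, r RolesMask) bool { return roles&r == r }

func main() {
    roles := RoleTransactionScheduler
    numRuntimes := 0

    // Mirrors the new sanity check: a transaction scheduler that
    // declares no runtimes is rejected.
    if hasRoles(roles, RoleTransactionScheduler) && numRuntimes == 0 {
        fmt.Println("sanity check failed: transaction scheduler node must have runtime(s)")
    }
}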
go/worker/txnscheduler/committee/node.go (15 additions, 0 deletions)

@@ -20,7 +20,9 @@ import (
     "github.com/oasislabs/oasis-core/go/roothash/api/block"
     "github.com/oasislabs/oasis-core/go/runtime/transaction"
     storage "github.com/oasislabs/oasis-core/go/storage/api"
+    commonWorker "github.com/oasislabs/oasis-core/go/worker/common"
     "github.com/oasislabs/oasis-core/go/worker/common/committee"
+    "github.com/oasislabs/oasis-core/go/worker/common/host"
     "github.com/oasislabs/oasis-core/go/worker/common/p2p"
     computeCommittee "github.com/oasislabs/oasis-core/go/worker/compute/committee"
     txnSchedulerAlgorithm "github.com/oasislabs/oasis-core/go/worker/txnscheduler/algorithm"
@@ -50,6 +52,8 @@ var (
 
 // Node is a committee node.
 type Node struct {
+    *commonWorker.RuntimeHostNode
+
     commonNode  *committee.Node
     computeNode *computeCommittee.Node
 
@@ -385,6 +389,15 @@ func (n *Node) worker() {
 
     n.logger.Info("starting committee node")
 
+    // Initialize worker host for the new runtime.
+    if err := n.InitializeRuntimeWorkerHost(n.ctx); err != nil {
+        n.logger.Error("failed to initialize worker host",
+            "err", err,
+        )
+        return
+    }
+    defer n.StopRuntimeWorkerHost()
+
     // Initialize transaction scheduler's algorithm.
     runtime, err := n.commonNode.Runtime.RegistryDescriptor(n.ctx)
     if err != nil {
@@ -437,6 +450,7 @@
 func NewNode(
     commonNode *committee.Node,
     computeNode *computeCommittee.Node,
+    workerHostFactory host.Factory,
 ) (*Node, error) {
     metricsOnce.Do(func() {
         prometheus.MustRegister(nodeCollectors...)
@@ -445,6 +459,7 @@
     ctx, cancel := context.WithCancel(context.Background())
 
     n := &Node{
+        RuntimeHostNode: commonWorker.NewRuntimeHostNode(commonNode, workerHostFactory),
         commonNode:      commonNode,
         computeNode:     computeNode,
         ctx:             ctx,
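The change to the committee node's worker goroutine follows a bring-up-then-deferred-teardown pattern: the runtime worker host must be running before the transaction scheduler does any work, and it is stopped whenever the goroutine exits, on failure or otherwise. A minimal sketch of that lifecycle, using an assumed interface in place of the embedded commonWorker.RuntimeHostNode:

package main

import (
    "context"
    "log"
)

// runtimeHost is an assumed stand-in for the methods the embedded
// commonWorker.RuntimeHostNode provides to the committee node.
type runtimeHost interface {
    InitializeRuntimeWorkerHost(ctx context.Context) error
    StopRuntimeWorkerHost()
}

// runWorker mirrors the shape of Node.worker(): initialize the worker
// host first, bail out on failure, and always stop it on exit.
func runWorker(ctx context.Context, host runtimeHost) {
    if err := host.InitializeRuntimeWorkerHost(ctx); err != nil {
        log.Printf("failed to initialize worker host: %v", err)
        return
    }
    defer host.StopRuntimeWorkerHost()

    // ... the committee node's event loop would run here ...
}

// noopHost is a trivial implementation so the sketch runs end to end.
type noopHost struct{}

func (noopHost) InitializeRuntimeWorkerHost(context.Context) error { return nil }
func (noopHost) StopRuntimeWorkerHost()                            {}

func main() {
    runWorker(context.Background(), noopHost{})
}

Tearing the host down via defer keeps every exit path of the worker goroutine covered, including the early return on initialization failure.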
go/worker/txnscheduler/worker.go (76 additions, 6 deletions)

@@ -1,6 +1,8 @@
 package txnscheduler
 
 import (
+    "context"
+
     "github.com/oasislabs/oasis-core/go/common"
     "github.com/oasislabs/oasis-core/go/common/logging"
     "github.com/oasislabs/oasis-core/go/common/node"
@@ -14,6 +16,8 @@ import (
 
 // Worker is a transaction scheduler handling many runtimes.
 type Worker struct {
+    *workerCommon.RuntimeHostWorker
+
     enabled bool
 
     commonWorker *workerCommon.Worker
@@ -22,8 +26,10 @@ type Worker struct {
 
     runtimes map[common.Namespace]*committee.Node
 
-    quitCh chan struct{}
-    initCh chan struct{}
+    ctx       context.Context
+    cancelCtx context.CancelFunc
+    quitCh    chan struct{}
+    initCh    chan struct{}
 
     logger *logging.Logger
 }
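The new ctx/cancelCtx pair gives the Worker a root context it owns: long-lived waits elsewhere in the worker, such as the registration hook below that selects on w.ctx.Done(), unblock as soon as the worker is torn down. A compact, runnable sketch of that ownership pattern with illustrative names:

package main

import (
    "context"
    "fmt"
)

// worker is an illustrative miniature of the txnscheduler Worker: it
// creates its own root context at construction and cancels it on stop.
type worker struct {
    ctx       context.Context
    cancelCtx context.CancelFunc
}

func newWorker() *worker {
    ctx, cancelCtx := context.WithCancel(context.Background())
    return &worker{ctx: ctx, cancelCtx: cancelCtx}
}

// Stop cancels the root context, waking anything blocked on ctx.Done().
func (w *worker) Stop() { w.cancelCtx() }

func main() {
    w := newWorker()
    done := make(chan struct{})
    go func() {
        <-w.ctx.Done() // blocks until Stop is called
        close(done)
    }()
    w.Stop()
    <-done
    fmt.Println("worker stopped:", w.ctx.Err())
}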
@@ -139,7 +145,14 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
     // Get other nodes from this runtime.
     computeNode := w.compute.GetRuntime(id)
 
-    node, err := committee.NewNode(commonNode, computeNode)
+    // Create worker host for the given runtime.
+    workerHostFactory, err := w.NewRuntimeWorkerHostFactory(node.RoleTransactionScheduler, id)
+    if err != nil {
+        return err
+    }
+
+    // Create committee node for the given runtime.
+    node, err := committee.NewNode(commonNode, computeNode, workerHostFactory)
     if err != nil {
         return err
     }
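registerRuntime now builds a dedicated worker-host factory per runtime and threads it into the committee node constructor. A distilled sketch of that wiring; hostFactory, txnSchedulerFactory, and committeeNode are assumed stand-ins for host.Factory and the real committee types:

package main

import "fmt"

// hostFactory is an assumed stand-in for host.Factory: something that
// can spawn worker hosts for one specific runtime.
type hostFactory interface {
    RuntimeID() string
}

type txnSchedulerFactory struct{ id string }

func (f txnSchedulerFactory) RuntimeID() string { return f.id }

// committeeNode stands in for committee.Node; it keeps the factory so
// its worker goroutine can start the runtime host later.
type committeeNode struct{ factory hostFactory }

// registerRuntime mirrors the new flow: build the factory for this
// runtime's ID, then hand it to the committee node constructor.
func registerRuntime(id string, nodes map[string]*committeeNode) error {
    factory := txnSchedulerFactory{id: id}   // cf. NewRuntimeWorkerHostFactory
    node := &committeeNode{factory: factory} // cf. committee.NewNode(..., factory)
    nodes[id] = node
    return nil
}

func main() {
    nodes := make(map[string]*committeeNode)
    if err := registerRuntime("runtime-a", nodes); err != nil {
        fmt.Println("register failed:", err)
        return
    }
    fmt.Println("registered runtimes:", len(nodes))
}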
@@ -160,12 +173,16 @@ func newWorker(
     compute *compute.Worker,
     registration *registration.Worker,
 ) (*Worker, error) {
+    ctx, cancelCtx := context.WithCancel(context.Background())
+
     w := &Worker{
         enabled:      enabled,
         commonWorker: commonWorker,
         registration: registration,
         compute:      compute,
         runtimes:     make(map[common.Namespace]*committee.Node),
+        ctx:          ctx,
+        cancelCtx:    cancelCtx,
         quitCh:       make(chan struct{}),
         initCh:       make(chan struct{}),
         logger:       logging.GetLogger("worker/txnscheduler"),
@@ -176,19 +193,72 @@
         panic("common worker should have been enabled for transaction scheduler")
     }
 
+        // Create the runtime host worker.
+        var err error
+        w.RuntimeHostWorker, err = workerCommon.NewRuntimeHostWorker(commonWorker)
+        if err != nil {
+            return nil, err
+        }
+
         // Use existing gRPC server passed from the node.
         api.RegisterService(commonWorker.Grpc.Server(), w)
 
         // Register all configured runtimes.
         for _, rt := range commonWorker.GetRuntimes() {
-            if err := w.registerRuntime(rt); err != nil {
+            if err = w.registerRuntime(rt); err != nil {
                 return nil, err
             }
         }
 
         // Register transaction scheduler worker role.
-        if err := w.registration.RegisterRole(node.RoleTransactionScheduler,
-            func(n *node.Node) error { return nil }); err != nil {
+        if err = w.registration.RegisterRole(node.RoleTransactionScheduler, func(n *node.Node) error {
+            // Wait until all the runtimes are initialized.
+            for _, rt := range w.runtimes {
+                select {
+                case <-rt.Initialized():
+                case <-w.ctx.Done():
+                    return w.ctx.Err()
+                }
+            }
+
+            for _, rt := range n.Runtimes {
+                var grr error
+
+                workerRT := w.runtimes[rt.ID]
+                if workerRT == nil {
+                    continue
+                }
+
+                workerHost := workerRT.GetWorkerHost()
+                if workerHost == nil {
+                    w.logger.Debug("runtime has shut down",
+                        "runtime", rt.ID,
+                    )
+                    continue
+                }
+                if rt.Capabilities.TEE, grr = workerHost.WaitForCapabilityTEE(w.ctx); grr != nil {
+                    w.logger.Error("failed to obtain CapabilityTEE",
+                        "err", grr,
+                        "runtime", rt.ID,
+                    )
+                    continue
+                }
+
+                runtimeVersion, grr := workerHost.WaitForRuntimeVersion(w.ctx)
+                if grr == nil && runtimeVersion != nil {
+                    rt.Version = *runtimeVersion
+                } else {
+                    w.logger.Error("failed to obtain RuntimeVersion",
+                        "err", grr,
+                        "runtime", rt.ID,
+                        "runtime_version", runtimeVersion,
+                    )
+                    continue
+                }
+            }
+
+            return nil
+        }); err != nil {
             return nil, err
         }
     }
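The core of the new registration hook is a wait pattern: for each tracked runtime, block until it reports initialized, but give up cleanly if the worker's context is canceled first. A distilled, runnable sketch of that select loop; the channel plumbing is illustrative, while the real code selects on rt.Initialized() and w.ctx.Done():

package main

import (
    "context"
    "fmt"
    "time"
)

// waitAllInitialized blocks until every channel is closed (runtime
// initialized) or the context is canceled, whichever comes first.
func waitAllInitialized(ctx context.Context, initialized []<-chan struct{}) error {
    for _, ch := range initialized {
        select {
        case <-ch:
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return nil
}

func main() {
    // Two fake runtimes that finish initializing after a short delay.
    rt1 := make(chan struct{})
    rt2 := make(chan struct{})
    go func() { time.Sleep(10 * time.Millisecond); close(rt1) }()
    go func() { time.Sleep(20 * time.Millisecond); close(rt2) }()

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    if err := waitAllInitialized(ctx, []<-chan struct{}{rt1, rt2}); err != nil {
        fmt.Println("registration aborted:", err)
        return
    }
    fmt.Println("all runtimes initialized; registration can proceed")
}

Closing a channel as the init signal lets any number of waiters observe it without extra coordination, which is why Initialized() is modeled here as a chan struct{} that gets closed.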
