Skip to content

Commit

Permalink
go/oasis-node/cmd/node: Tidy up worker initialization and start up
Browse files Browse the repository at this point in the history
  • Loading branch information
tjanez committed Nov 13, 2019
1 parent 24e0df8 commit 459578f
Showing 1 changed file with 43 additions and 28 deletions.
71 changes: 43 additions & 28 deletions go/oasis-node/cmd/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import (
workerCommon "github.com/oasislabs/oasis-core/go/worker/common"
"github.com/oasislabs/oasis-core/go/worker/common/p2p"
"github.com/oasislabs/oasis-core/go/worker/compute"
keymanagerWorker "github.com/oasislabs/oasis-core/go/worker/keymanager"
workerKeymanager "github.com/oasislabs/oasis-core/go/worker/keymanager"
"github.com/oasislabs/oasis-core/go/worker/merge"
"github.com/oasislabs/oasis-core/go/worker/registration"
workerSentry "github.com/oasislabs/oasis-core/go/worker/sentry"
Expand Down Expand Up @@ -124,6 +124,7 @@ type Node struct {
SentryWorker *workerSentry.Worker
P2P *p2p.P2P
RegistrationWorker *registration.Worker
KeymanagerWorker *workerKeymanager.Worker
}

// Cleanup cleans up after the node has terminated.
Expand Down Expand Up @@ -186,21 +187,23 @@ func (n *Node) initBackends() error {
return nil
}

func (n *Node) initAndStartWorkers(logger *logging.Logger) error {
func (n *Node) initWorkers(logger *logging.Logger) error {
dataDir := cmdCommon.DataDir()

var err error

genesisDoc, errr := n.Genesis.GetGenesisDocument()
if errr != nil {
return errr
genesisDoc, err := n.Genesis.GetGenesisDocument()
if err != nil {
return err
}

// Initialize the worker P2P if any workers are enabled. Since the P2P layer
// does not have a separate Start method and starts listening immediately
// when created, make sure that we don't start if if not needed.
// Initialize the P2P worker if any workers are enabled. Since the P2P
// layer does not have a separate Start method and starts listening
// immediately when created, make sure that we don't start it if it is not
// needed.
//
// Only compute, txn scheduler and merge workers need P2P transport.
// Currently, only compute, txn scheduler and merge workers need P2P
// transport.
if compute.Enabled() || txnscheduler.Enabled() || merge.Enabled() {
p2pCtx, p2pSvc := service.NewContextCleanup(context.Background())
if genesisDoc.Registry.Parameters.DebugAllowUnroutableAddresses {
Expand All @@ -213,10 +216,10 @@ func (n *Node) initAndStartWorkers(logger *logging.Logger) error {
n.svcMgr.RegisterCleanupOnly(p2pSvc, "worker p2p")
}

// Start common worker.
// Initialize the common worker.
n.CommonWorker, err = workerCommon.New(
dataDir,
compute.Enabled() || workerStorage.Enabled() || txnscheduler.Enabled() || merge.Enabled() || keymanagerWorker.Enabled(),
compute.Enabled() || workerStorage.Enabled() || txnscheduler.Enabled() || merge.Enabled() || workerKeymanager.Enabled(),
n.Identity,
n.Storage,
n.RootHash,
Expand All @@ -240,7 +243,7 @@ func (n *Node) initAndStartWorkers(logger *logging.Logger) error {

workerCommonCfg := n.CommonWorker.GetConfig()

// Initialize the worker registration.
// Initialize the registration worker.
n.RegistrationWorker, err = registration.New(
dataDir,
n.Epochtime,
Expand All @@ -260,8 +263,8 @@ func (n *Node) initAndStartWorkers(logger *logging.Logger) error {
}
n.svcMgr.Register(n.RegistrationWorker)

// Initialize the key manager worker service.
kmSvc, err := keymanagerWorker.New(
// Initialize the key manager worker.
n.KeymanagerWorker, err = workerKeymanager.New(
dataDir,
n.CommonWorker,
n.IAS,
Expand All @@ -271,7 +274,7 @@ func (n *Node) initAndStartWorkers(logger *logging.Logger) error {
if err != nil {
return err
}
n.svcMgr.Register(kmSvc)
n.svcMgr.Register(n.KeymanagerWorker)

// Initialize the storage worker.
n.StorageWorker, err = workerStorage.New(
Expand Down Expand Up @@ -330,49 +333,53 @@ func (n *Node) initAndStartWorkers(logger *logging.Logger) error {
}
n.svcMgr.Register(n.TransactionSchedulerWorker)

return nil
}

func (n *Node) startWorkers(logger *logging.Logger) error {
// Start the storage worker.
if err = n.StorageWorker.Start(); err != nil {
if err := n.StorageWorker.Start(); err != nil {
return err
}

// Start the compute worker.
if err = n.ComputeWorker.Start(); err != nil {
if err := n.ComputeWorker.Start(); err != nil {
return err
}

// Start the transaction scheduler.
if err = n.TransactionSchedulerWorker.Start(); err != nil {
if err := n.TransactionSchedulerWorker.Start(); err != nil {
return err
}

// Start the merge worker.
if err = n.MergeWorker.Start(); err != nil {
if err := n.MergeWorker.Start(); err != nil {
return err
}

// Start the common worker.
if err = n.CommonWorker.Start(); err != nil {
if err := n.CommonWorker.Start(); err != nil {
return err
}

// Start the key manager worker.
if err = kmSvc.Start(); err != nil {
if err := n.KeymanagerWorker.Start(); err != nil {
return err
}

// Start the sentry worker.
if err = n.SentryWorker.Start(); err != nil {
if err := n.SentryWorker.Start(); err != nil {
return err
}

// Start the worker registration service.
if err = n.RegistrationWorker.Start(); err != nil {
if err := n.RegistrationWorker.Start(); err != nil {
return err
}

// Only start the external gRPC server if any workers are enabled.
if n.StorageWorker.Enabled() || n.TransactionSchedulerWorker.Enabled() || n.MergeWorker.Enabled() || kmSvc.Enabled() {
if err = n.CommonWorker.Grpc.Start(); err != nil {
if n.StorageWorker.Enabled() || n.TransactionSchedulerWorker.Enabled() || n.MergeWorker.Enabled() || n.KeymanagerWorker.Enabled() {
if err := n.CommonWorker.Grpc.Start(); err != nil {
logger.Error("failed to start external gRPC server",
"err", err,
)
Expand Down Expand Up @@ -672,14 +679,22 @@ func newNode(testNode bool) (*Node, error) {
return nil, err
}

// Initialize and start workers.
if err = node.initAndStartWorkers(logger); err != nil {
// Initialize workers.
if err = node.initWorkers(logger); err != nil {
logger.Error("failed to initialize workers",
"err", err,
)
return nil, err
}

// Start workers.
if err = node.startWorkers(logger); err != nil {
logger.Error("failed to start workers",
"err", err,
)
return nil, err
}

// Start the node control server.
control.NewGRPCServer(node.grpcInternal, node, node.Client)

Expand Down Expand Up @@ -723,7 +738,7 @@ func init() {
tendermint.Flags,
ias.Flags,
keymanagerClient.Flags,
keymanagerWorker.Flags,
workerKeymanager.Flags,
client.Flags,
compute.Flags,
p2p.Flags,
Expand Down

0 comments on commit 459578f

Please sign in to comment.