From fc7b0eaedf4f463fd473268df701d772e384a9d7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Matev=C5=BE=20Jekovec?=
Date: Tue, 2 Jun 2020 13:27:11 +0200
Subject: [PATCH] keymanager: Preregister on updateStatus() to enable
 replication

---
 .changelog/2130.internal.md                  | 11 +++++------
 go/control/api/api.go                        |  6 +++---
 go/control/api/grpc_debug.go                 |  4 ----
 go/control/control.go                        |  4 ++--
 go/oasis-node/cmd/debug/control/control.go   |  2 +-
 go/oasis-node/cmd/node/node.go               | 16 ++++++----------
 go/oasis-test-runner/oasis/oasis.go          |  4 ++--
 go/oasis-test-runner/scenario/e2e/runtime.go | 17 ++++++++++-------
 go/worker/keymanager/worker.go               | 18 +++++++++---------
 9 files changed, 38 insertions(+), 44 deletions(-)

diff --git a/.changelog/2130.internal.md b/.changelog/2130.internal.md
index 7634d46e8ab..44f476995ed 100644
--- a/.changelog/2130.internal.md
+++ b/.changelog/2130.internal.md
@@ -1,11 +1,10 @@
-client protocol: Add IsReady() and WaitReady() RPC methods
+go/control: Add IsReady() and WaitReady() RPC methods
 
-Beside `IsSynced()` and `WaitSynced()` which are triggered when a node is
-registered, new `IsReady()` and `WaitReady()` methods have been added to
-client protocol. These are triggered when all node workers have been
+Besides `IsSynced()` and `WaitSynced()`, which are triggered when the consensus
+backend is synced, new `IsReady()` and `WaitReady()` methods have been added
+to the client protocol. These are triggered when all node workers have been
 initialized (including the runtimes) and the hosted processes are ready to
-process requests. Some oasis-test-runner scenarios have been updated
-accordingly.
+process requests.
 
 In addition new `oasis-node debug control wait-ready` command was added which
 blocks the client until the node is ready.
diff --git a/go/control/api/api.go b/go/control/api/api.go
index 4677ae3c575..b4f444b979f 100644
--- a/go/control/api/api.go
+++ b/go/control/api/api.go
@@ -51,11 +51,11 @@ type Status struct {
 	Consensus consensus.Status `json:"consensus"`
 }
 
-// Shutdownable is an interface the node presents for shutting itself down.
-// TODO: Rename Shutdownable to Node? And Node to NodeImpl?
-type Shutdownable interface {
+// ControlledNode is the interface the node presents for being controlled.
+type ControlledNode interface {
 	// RequestShutdown is the method called by the control server to trigger node shutdown.
 	RequestShutdown() (<-chan struct{}, error)
+
+	// Ready returns a channel that is closed once the node is ready.
 	Ready() <-chan struct{}
 }
diff --git a/go/control/api/grpc_debug.go b/go/control/api/grpc_debug.go
index 0d81c8ab012..14f78cf4ab3 100644
--- a/go/control/api/grpc_debug.go
+++ b/go/control/api/grpc_debug.go
@@ -99,10 +99,6 @@ func (c *debugControllerClient) WaitNodesRegistered(ctx context.Context, count i
 	return c.conn.Invoke(ctx, methodWaitNodesRegistered.FullName(), count, nil)
 }
 
-func (c *debugControllerClient) WaitReady(ctx context.Context, count int) error {
-	return c.conn.Invoke(ctx, methodWaitReady.FullName(), count, nil)
-}
-
 // NewDebugControllerClient creates a new gRPC debug controller client service.
 func NewDebugControllerClient(c *grpc.ClientConn) DebugController {
 	return &debugControllerClient{c}
diff --git a/go/control/control.go b/go/control/control.go
index 0826fe1f091..f5bb58fa999 100644
--- a/go/control/control.go
+++ b/go/control/control.go
@@ -11,7 +11,7 @@ import (
 )
 
 type nodeController struct {
-	node      control.Shutdownable
+	node      control.ControlledNode
 	consensus consensus.Backend
 	upgrader  upgrade.Backend
 }
@@ -93,7 +93,7 @@ func (c *nodeController) GetStatus(ctx context.Context) (*control.Status, error)
 }
 
 // New creates a new oasis-node controller.
-func New(node control.Shutdownable, consensus consensus.Backend, upgrader upgrade.Backend) control.NodeController {
+func New(node control.ControlledNode, consensus consensus.Backend, upgrader upgrade.Backend) control.NodeController {
 	return &nodeController{
 		node:      node,
 		consensus: consensus,
diff --git a/go/oasis-node/cmd/debug/control/control.go b/go/oasis-node/cmd/debug/control/control.go
index 80b353f6c8a..83e8f4ddf36 100644
--- a/go/oasis-node/cmd/debug/control/control.go
+++ b/go/oasis-node/cmd/debug/control/control.go
@@ -40,7 +40,7 @@ var (
 	controlWaitReadyCmd = &cobra.Command{
 		Use:   "wait-ready",
 		Short: "wait for node to become ready",
-		Long: "Wait for node consensus to be synced and runtimes being registered, " +
+		Long: "Wait for the consensus backend to be synced and the runtimes to be registered, " +
 			"initialized, and ready to accept the workload.",
 		Run: doWaitReady,
 	}
diff --git a/go/oasis-node/cmd/node/node.go b/go/oasis-node/cmd/node/node.go
index cf3dad03d40..1e05538bcd1 100644
--- a/go/oasis-node/cmd/node/node.go
+++ b/go/oasis-node/cmd/node/node.go
@@ -69,7 +69,7 @@ import (
 )
 
 var (
-	_ controlAPI.Shutdownable = (*Node)(nil)
+	_ controlAPI.ControlledNode = (*Node)(nil)
 
 	// Flags has the configuration flags.
 	Flags = flag.NewFlagSet("", flag.ContinueOnError)
@@ -166,13 +166,15 @@ func (n *Node) RequestShutdown() (<-chan struct{}, error) {
 	return n.RegistrationWorker.Quit(), nil
 }
 
+// Ready returns the ready channel, which is closed once the node is ready.
 func (n *Node) Ready() <-chan struct{} {
 	return n.readyCh
 }
 
-func (n *Node) WaitReady() error {
+func (n *Node) waitReady(logger *logging.Logger) {
 	if err := n.NodeController.WaitSync(context.Background()); err != nil {
-		return err
+		logger.Error("failed while waiting for node consensus sync", "err", err)
+		return
 	}
 
 	// Wait for storage worker.
@@ -206,8 +208,6 @@
 	}
 
 	close(n.readyCh)
-
-	return nil
 }
 
 func (n *Node) RegistrationStopped() {
@@ -458,11 +458,7 @@ func (n *Node) startWorkers(logger *logging.Logger) error {
 	}
 
 	// Close readyCh once all workers and runtimes are initialized.
-	go func() {
-		if err := n.WaitReady(); err != nil {
-			logger.Error("failed waiting for ready channel", "err", err)
-		}
-	}()
+	go n.waitReady(logger)
 
 	return nil
 }
diff --git a/go/oasis-test-runner/oasis/oasis.go b/go/oasis-test-runner/oasis/oasis.go
index 2f9ae5dac8d..dd87540e968 100644
--- a/go/oasis-test-runner/oasis/oasis.go
+++ b/go/oasis-test-runner/oasis/oasis.go
@@ -149,11 +149,11 @@ func (n *Node) BinaryPath() string {
 
 // WaitReady is a helper for creating a controller and calling node's WaitReady.
 func (n *Node) WaitReady(ctx context.Context) error {
-	kmCtrl, err := NewController(n.SocketPath())
+	nodeCtrl, err := NewController(n.SocketPath())
 	if err != nil {
 		return err
 	}
-	if err = kmCtrl.WaitReady(ctx); err != nil {
+	if err = nodeCtrl.WaitReady(ctx); err != nil {
 		return err
 	}
diff --git a/go/oasis-test-runner/scenario/e2e/runtime.go b/go/oasis-test-runner/scenario/e2e/runtime.go
index 0ce07e93d5c..e5dcdb106e8 100644
--- a/go/oasis-test-runner/scenario/e2e/runtime.go
+++ b/go/oasis-test-runner/scenario/e2e/runtime.go
@@ -630,7 +630,7 @@ func (sc *runtimeImpl) initialEpochTransitions() error {
 		sc.logger.Info("epoch transition done")
 	}
 
-	// Wait for storage workers, compute workers, and byzantine nodes to become ready.
+	// Wait for storage workers and compute workers to become ready.
 	sc.logger.Info("waiting for storage workers to initialize",
 		"num_storage_workers", len(sc.net.StorageWorkers()),
 	)
@@ -647,12 +647,15 @@ func (sc *runtimeImpl) initialEpochTransitions() error {
 			return fmt.Errorf("failed to wait for a compute worker: %w", err)
 		}
 	}
-	sc.logger.Info("waiting for byzantine nodes to initialize",
-		"num_byzantine", len(sc.net.Byzantine()),
-	)
-	for _, n := range sc.net.Byzantine() {
-		if err := n.WaitReady(ctx); err != nil {
-			return fmt.Errorf("failed to wait for a byzantine node: %w", err)
+
+	// Byzantine nodes can only register. Since we cannot control them directly,
+	// if any are defined, wait for all nodes to become registered.
+	if len(sc.net.Byzantine()) > 0 {
+		sc.logger.Info("waiting for (all) nodes to register",
+			"num_nodes", sc.net.NumRegisterNodes(),
+		)
+		if err := sc.net.Controller().WaitNodesRegistered(ctx, sc.net.NumRegisterNodes()); err != nil {
+			return fmt.Errorf("failed to wait for nodes: %w", err)
 		}
 	}
diff --git a/go/worker/keymanager/worker.go b/go/worker/keymanager/worker.go
index 097b50da70c..da71f5a610d 100644
--- a/go/worker/keymanager/worker.go
+++ b/go/worker/keymanager/worker.go
@@ -185,15 +185,6 @@ func (w *Worker) callLocal(ctx context.Context, data []byte) ([]byte, error) {
 
 func (w *Worker) updateStatus(status *api.Status, startedEvent *host.StartedEvent) error {
 	defer func() {
-		<-w.Initialized()
-		w.roleProvider.SetAvailable(func(n *node.Node) error {
-			rt := n.AddOrUpdateRuntime(w.runtime.ID())
-			rt.Version = startedEvent.Version
-			rt.ExtraInfo = nil
-			rt.Capabilities.TEE = startedEvent.CapabilityTEE
-			return nil
-		})
-
 		// If initialization failed setup a retry ticker.
 		if w.initTicker == nil {
 			w.initTicker = backoff.NewTicker(backoff.NewExponentialBackOff())
@@ -201,6 +192,15 @@ func (w *Worker) updateStatus(status *api.Status, startedEven
 		}
 	}()
 
+	// Pre-register to allow replication.
+	w.roleProvider.SetAvailable(func(n *node.Node) error {
+		rt := n.AddOrUpdateRuntime(w.runtime.ID())
+		rt.Version = startedEvent.Version
+		rt.ExtraInfo = nil
+		rt.Capabilities.TEE = startedEvent.CapabilityTEE
+		return nil
+	})
+
 	// Initialize the key manager.
 	type InitRequest struct {
 		Checksum []byte `json:"checksum"`
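
Usage sketch: the new `WaitReady()` is the call downstream tooling will typically block on before submitting work. A minimal Go sketch follows; only `WaitReady()` itself is defined by this patch, while the import paths, the `cmnGrpc.Dial` helper, the `NewNodeControllerClient` constructor, and the socket path are assumptions made for illustration, modeled on the `NewDebugControllerClient` constructor visible above.

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"

	cmnGrpc "github.com/oasisprotocol/oasis-core/go/common/grpc"
	control "github.com/oasisprotocol/oasis-core/go/control/api"
)

func main() {
	// Bound the wait so a stuck node fails the check instead of hanging forever.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	// Connect to the node's internal gRPC socket (example path, assumed helper).
	conn, err := cmnGrpc.Dial("unix:/node/data/internal.sock", grpc.WithInsecure())
	if err != nil {
		fmt.Println("failed to dial node socket:", err)
		return
	}
	defer conn.Close()

	// NewNodeControllerClient is assumed by analogy with the
	// NewDebugControllerClient constructor shown in this patch.
	ctrl := control.NewNodeControllerClient(conn)

	// Blocks until consensus is synced and all workers, including hosted
	// runtimes, report ready (the condition waitReady() closes readyCh on).
	if err := ctrl.WaitReady(ctx); err != nil {
		fmt.Println("node did not become ready:", err)
		return
	}
	fmt.Println("node is ready")
}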
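Design note: `ControlledNode.Ready()` uses the standard Go close-to-broadcast idiom: a `chan struct{}` that is closed exactly once (`close(n.readyCh)` in node.go above), so any number of goroutines can block on it and all get released together. A self-contained sketch of the pattern, with illustrative names not taken from the patch:

package main

import "fmt"

type worker struct {
	readyCh chan struct{}
}

func newWorker() *worker {
	return &worker{readyCh: make(chan struct{})}
}

// Ready mirrors ControlledNode.Ready(): the returned channel is closed once
// initialization completes, which unblocks every receiver at once.
func (w *worker) Ready() <-chan struct{} {
	return w.readyCh
}

func (w *worker) init() {
	// ... perform initialization ...
	close(w.readyCh) // signal readiness to all waiters
}

func main() {
	w := newWorker()
	go w.init()
	<-w.Ready() // blocks until init() closes the channel
	fmt.Println("worker is ready")
}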
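Retry note: when key manager initialization fails, the deferred block in `updateStatus()` arms `backoff.NewTicker(backoff.NewExponentialBackOff())` (from `github.com/cenkalti/backoff`) so the call is retried at growing intervals. A runnable sketch of that ticker pattern, with a hypothetical `tryInit` standing in for the real initialization:

package main

import (
	"errors"
	"fmt"

	"github.com/cenkalti/backoff/v4"
)

var attempts int

// tryInit is a hypothetical operation that fails twice before succeeding.
func tryInit() error {
	attempts++
	if attempts < 3 {
		return errors.New("not ready yet")
	}
	return nil
}

func main() {
	// The ticker fires after each backoff interval, which grows exponentially.
	ticker := backoff.NewTicker(backoff.NewExponentialBackOff())
	defer ticker.Stop()

	// Each receive from ticker.C waits out the current backoff interval.
	for range ticker.C {
		if err := tryInit(); err != nil {
			fmt.Println("init failed, retrying:", err)
			continue
		}
		break // success: stop retrying
	}
	fmt.Println("initialized after", attempts, "attempts")
}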