Skip to content

Commit

Permalink
keymanager: Preregister on updateStatus() to enable replication
Browse files Browse the repository at this point in the history
  • Loading branch information
matevz committed Jun 2, 2020
1 parent 2660506 commit fc7b0ea
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 44 deletions.
11 changes: 5 additions & 6 deletions .changelog/2130.internal.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
client protocol: Add IsReady() and WaitReady() RPC methods
go/control: Add IsReady() and WaitReady() RPC methods

Beside `IsSynced()` and `WaitSynced()` which are triggered when a node is
registered, new `IsReady()` and `WaitReady()` methods have been added to
client protocol. These are triggered when all node workers have been
Beside `IsSynced()` and `WaitSynced()` which are triggered when the consensus
backend is synced, new `IsReady()` and `WaitReady()` methods have been added
to the client protocol. These are triggered when all node workers have been
initialized (including the runtimes) and the hosted processes are ready to
process requests. Some oasis-test-runner scenarios have been updated
accordingly.
process requests.

In addition new `oasis-node debug control wait-ready`
command was added which blocks the client until the node is ready.
6 changes: 3 additions & 3 deletions go/control/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type Status struct {
Consensus consensus.Status `json:"consensus"`
}

// Shutdownable is an interface the node presents for shutting itself down.
// TODO: Rename Shutdownable to Node? And Node to NodeImpl?
type Shutdownable interface {
// ControlledNode is an interface the node presents for shutting itself down.
type ControlledNode interface {
// RequestShutdown is the method called by the control server to trigger node shutdown.
RequestShutdown() (<-chan struct{}, error)

// Ready returns a channel that is closed once node is ready.
Ready() <-chan struct{}
}
Expand Down
4 changes: 0 additions & 4 deletions go/control/api/grpc_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,6 @@ func (c *debugControllerClient) WaitNodesRegistered(ctx context.Context, count i
return c.conn.Invoke(ctx, methodWaitNodesRegistered.FullName(), count, nil)
}

func (c *debugControllerClient) WaitReady(ctx context.Context, count int) error {
return c.conn.Invoke(ctx, methodWaitReady.FullName(), count, nil)
}

// NewDebugControllerClient creates a new gRPC debug controller client service.
func NewDebugControllerClient(c *grpc.ClientConn) DebugController {
return &debugControllerClient{c}
Expand Down
4 changes: 2 additions & 2 deletions go/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type nodeController struct {
node control.Shutdownable
node control.ControlledNode
consensus consensus.Backend
upgrader upgrade.Backend
}
Expand Down Expand Up @@ -93,7 +93,7 @@ func (c *nodeController) GetStatus(ctx context.Context) (*control.Status, error)
}

// New creates a new oasis-node controller.
func New(node control.Shutdownable, consensus consensus.Backend, upgrader upgrade.Backend) control.NodeController {
func New(node control.ControlledNode, consensus consensus.Backend, upgrader upgrade.Backend) control.NodeController {
return &nodeController{
node: node,
consensus: consensus,
Expand Down
2 changes: 1 addition & 1 deletion go/oasis-node/cmd/debug/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var (
controlWaitReadyCmd = &cobra.Command{
Use: "wait-ready",
Short: "wait for node to become ready",
Long: "Wait for node consensus to be synced and runtimes being registered, " +
Long: "Wait for the consensus backend to be synced and runtimes being registered, " +
"initialized, and ready to accept the workload.",
Run: doWaitReady,
}
Expand Down
16 changes: 6 additions & 10 deletions go/oasis-node/cmd/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ import (
)

var (
_ controlAPI.Shutdownable = (*Node)(nil)
_ controlAPI.ControlledNode = (*Node)(nil)

// Flags has the configuration flags.
Flags = flag.NewFlagSet("", flag.ContinueOnError)
Expand Down Expand Up @@ -166,13 +166,15 @@ func (n *Node) RequestShutdown() (<-chan struct{}, error) {
return n.RegistrationWorker.Quit(), nil
}

// Ready returns the ready channel which gets closed once the node is ready.
func (n *Node) Ready() <-chan struct{} {
return n.readyCh
}

func (n *Node) WaitReady() error {
func (n *Node) waitReady(logger *logging.Logger) {
if err := n.NodeController.WaitSync(context.Background()); err != nil {
return err
logger.Error("failed while waiting for node consensus sync", "err", err)
return
}

// Wait for storage worker.
Expand Down Expand Up @@ -206,8 +208,6 @@ func (n *Node) WaitReady() error {
}

close(n.readyCh)

return nil
}

func (n *Node) RegistrationStopped() {
Expand Down Expand Up @@ -458,11 +458,7 @@ func (n *Node) startWorkers(logger *logging.Logger) error {
}

// Close readyCh once all workers and runtimes are initialized.
go func() {
if err := n.WaitReady(); err != nil {
logger.Error("failed waiting for ready channel", "err", err)
}
}()
go n.waitReady(logger)

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions go/oasis-test-runner/oasis/oasis.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,11 @@ func (n *Node) BinaryPath() string {

// WaitReady is a helper for creating a controller and calling node's WaitReady.
func (n *Node) WaitReady(ctx context.Context) error {
kmCtrl, err := NewController(n.SocketPath())
nodeCtrl, err := NewController(n.SocketPath())
if err != nil {
return err
}
if err = kmCtrl.WaitReady(ctx); err != nil {
if err = nodeCtrl.WaitReady(ctx); err != nil {
return err
}

Expand Down
17 changes: 10 additions & 7 deletions go/oasis-test-runner/scenario/e2e/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ func (sc *runtimeImpl) initialEpochTransitions() error {
sc.logger.Info("epoch transition done")
}

// Wait for storage workers, compute workers, and byzantine nodes to become ready.
// Wait for storage workers and compute workers to become ready.
sc.logger.Info("waiting for storage workers to initialize",
"num_storage_workers", len(sc.net.StorageWorkers()),
)
Expand All @@ -647,12 +647,15 @@ func (sc *runtimeImpl) initialEpochTransitions() error {
return fmt.Errorf("failed to wait for a compute worker: %w", err)
}
}
sc.logger.Info("waiting for byzantine nodes to initialize",
"num_byzantine", len(sc.net.Byzantine()),
)
for _, n := range sc.net.Byzantine() {
if err := n.WaitReady(ctx); err != nil {
return fmt.Errorf("failed to wait for a byzantine node: %w", err)

// Byzantine nodes can only registered. If defined, since we cannot control them directly, wait
// for all nodes to become registered.
if len(sc.net.Byzantine()) > 0 {
sc.logger.Info("waiting for (all) nodes to register",
"num_nodes", sc.net.NumRegisterNodes(),
)
if err := sc.net.Controller().WaitNodesRegistered(ctx, sc.net.NumRegisterNodes()); err != nil {
return fmt.Errorf("failed to wait for nodes: %w", err)
}
}

Expand Down
18 changes: 9 additions & 9 deletions go/worker/keymanager/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,22 +185,22 @@ func (w *Worker) callLocal(ctx context.Context, data []byte) ([]byte, error) {

func (w *Worker) updateStatus(status *api.Status, startedEvent *host.StartedEvent) error {
defer func() {
<-w.Initialized()
w.roleProvider.SetAvailable(func(n *node.Node) error {
rt := n.AddOrUpdateRuntime(w.runtime.ID())
rt.Version = startedEvent.Version
rt.ExtraInfo = nil
rt.Capabilities.TEE = startedEvent.CapabilityTEE
return nil
})

// If initialization failed setup a retry ticker.
if w.initTicker == nil {
w.initTicker = backoff.NewTicker(backoff.NewExponentialBackOff())
w.initTickerCh = w.initTicker.C
}
}()

// Pre-register to allow replication.
w.roleProvider.SetAvailable(func(n *node.Node) error {
rt := n.AddOrUpdateRuntime(w.runtime.ID())
rt.Version = startedEvent.Version
rt.ExtraInfo = nil
rt.Capabilities.TEE = startedEvent.CapabilityTEE
return nil
})

// Initialize the key manager.
type InitRequest struct {
Checksum []byte `json:"checksum"`
Expand Down

0 comments on commit fc7b0ea

Please sign in to comment.