Skip to content

Commit

Permalink
Merge pull request #2957 from oasisprotocol/matevz/feature/waitready-rpc
Browse files Browse the repository at this point in the history
oasis-node: Add support for IsReady and WaitReady RPC methods
  • Loading branch information
kostko authored Jun 8, 2020
2 parents dc893d3 + d0551ab commit 26b9431
Show file tree
Hide file tree
Showing 14 changed files with 356 additions and 68 deletions.
10 changes: 10 additions & 0 deletions .changelog/2130.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
go/control: Add IsReady() and WaitReady() RPC methods

Beside `IsSynced()` and `WaitSynced()` which are triggered when the consensus
backend is synced, new `IsReady()` and `WaitReady()` methods have been added
to the client protocol. These are triggered when all node workers have been
initialized (including the runtimes) and the hosted processes are ready to
process requests.

In addition new `oasis-node debug control wait-ready`
command was added which blocks the client until the node is ready.
5 changes: 5 additions & 0 deletions .changelog/2957.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
go/worker/registration: Add SetAvailableWithCallback to RoleProvider

The new method allows the caller to register a callback that will be invoked
on a successful registration that includes the node descriptor updated by the
passed hook.
15 changes: 11 additions & 4 deletions go/control/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ type NodeController interface {
RequestShutdown(ctx context.Context, wait bool) error

// WaitSync waits for the node to finish syncing.
// TODO: These should be replaced with WaitReady (see oasis-core#2130).
WaitSync(ctx context.Context) error

// IsSynced checks whether the node has finished syncing.
// TODO: These should be replaced with IsReady (see oasis-core#2130).
IsSynced(ctx context.Context) (bool, error)

// WaitReady waits for the node to accept runtime work.
WaitReady(ctx context.Context) error

// IsReady checks whether the node is ready to accept runtime work.
IsReady(ctx context.Context) (bool, error)

// UpgradeBinary submits an upgrade descriptor to a running node.
// The node will wait for the appropriate epoch, then update its binaries
// and shut down.
Expand All @@ -47,10 +51,13 @@ type Status struct {
Consensus consensus.Status `json:"consensus"`
}

// Shutdownable is an interface the node presents for shutting itself down.
type Shutdownable interface {
// ControlledNode is an interface the node presents for shutting itself down.
type ControlledNode interface {
// RequestShutdown is the method called by the control server to trigger node shutdown.
RequestShutdown() (<-chan struct{}, error)

// Ready returns a channel that is closed once node is ready.
Ready() <-chan struct{}
}

// DebugModuleName is the module name for the debug controller service.
Expand Down
62 changes: 62 additions & 0 deletions go/control/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ var (
methodWaitSync = serviceName.NewMethod("WaitSync", nil)
// methodIsSynced is the IsSynced method.
methodIsSynced = serviceName.NewMethod("IsSynced", nil)
// methodWaitReady is the WaitReady method.
methodWaitReady = serviceName.NewMethod("WaitReady", nil)
// methodIsReady is the IsReady method.
methodIsReady = serviceName.NewMethod("IsReady", nil)
// methodUpgradeBinary is the UpgradeBinary method.
methodUpgradeBinary = serviceName.NewMethod("UpgradeBinary", upgradeApi.Descriptor{})
// methodCancelUpgrade is the CancelUpgrade method.
Expand All @@ -43,6 +47,14 @@ var (
MethodName: methodIsSynced.ShortName(),
Handler: handlerIsSynced,
},
{
MethodName: methodWaitReady.ShortName(),
Handler: handlerWaitReady,
},
{
MethodName: methodIsReady.ShortName(),
Handler: handlerIsReady,
},
{
MethodName: methodUpgradeBinary.ShortName(),
Handler: handlerUpgradeBinary,
Expand Down Expand Up @@ -121,6 +133,44 @@ func handlerIsSynced( // nolint: golint
return interceptor(ctx, nil, info, handler)
}

func handlerWaitReady( // nolint: golint
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
if interceptor == nil {
return nil, srv.(NodeController).WaitReady(ctx)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodWaitReady.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, srv.(NodeController).WaitReady(ctx)
}
return interceptor(ctx, nil, info, handler)
}

func handlerIsReady( // nolint: golint
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
if interceptor == nil {
return srv.(NodeController).IsReady(ctx)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodIsSynced.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(NodeController).IsReady(ctx)
}
return interceptor(ctx, nil, info, handler)
}

func handlerUpgradeBinary( // nolint: golint
srv interface{},
ctx context.Context,
Expand Down Expand Up @@ -207,6 +257,18 @@ func (c *nodeControllerClient) IsSynced(ctx context.Context) (bool, error) {
return rsp, nil
}

func (c *nodeControllerClient) WaitReady(ctx context.Context) error {
return c.conn.Invoke(ctx, methodWaitReady.FullName(), nil, nil)
}

func (c *nodeControllerClient) IsReady(ctx context.Context) (bool, error) {
var rsp bool
if err := c.conn.Invoke(ctx, methodIsReady.FullName(), nil, &rsp); err != nil {
return false, err
}
return rsp, nil
}

func (c *nodeControllerClient) UpgradeBinary(ctx context.Context, descriptor *upgradeApi.Descriptor) error {
return c.conn.Invoke(ctx, methodUpgradeBinary.FullName(), descriptor, nil)
}
Expand Down
24 changes: 22 additions & 2 deletions go/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type nodeController struct {
node control.Shutdownable
node control.ControlledNode
consensus consensus.Backend
upgrader upgrade.Backend
}
Expand Down Expand Up @@ -52,6 +52,26 @@ func (c *nodeController) IsSynced(ctx context.Context) (bool, error) {
}
}

func (c *nodeController) WaitReady(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-c.node.Ready():
return nil
}
}

func (c *nodeController) IsReady(ctx context.Context) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
case <-c.node.Ready():
return true, nil
default:
return false, nil
}
}

func (c *nodeController) UpgradeBinary(ctx context.Context, descriptor *upgrade.Descriptor) error {
return c.upgrader.SubmitDescriptor(ctx, descriptor)
}
Expand All @@ -73,7 +93,7 @@ func (c *nodeController) GetStatus(ctx context.Context) (*control.Status, error)
}

// New creates a new oasis-node controller.
func New(node control.Shutdownable, consensus consensus.Backend, upgrader upgrade.Backend) control.NodeController {
func New(node control.ControlledNode, consensus consensus.Backend, upgrader upgrade.Backend) control.NodeController {
return &nodeController{
node: node,
consensus: consensus,
Expand Down
21 changes: 17 additions & 4 deletions go/keymanager/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"strings"
"time"

"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -80,14 +81,26 @@ func (c *Client) CallRemote(ctx context.Context, data []byte) ([]byte, error) {
Endpoint: api.EnclaveRPCEndpoint,
Payload: data,
})
if status.Code(err) == codes.PermissionDenied {
switch {
case err == nil:
case status.Code(err) == codes.PermissionDenied:
// Calls can fail around epoch transitions, as the access policy
// is being updated, so we must retry.
return err
case status.Code(err) == codes.Unavailable:
// XXX: HACK: Find a better way to determine the root cause.
if strings.Contains(err.Error(), "tls: bad public key") {
// Retry as the access policy could be in the process of being updated.
return err
}

fallthrough
default:
// Request failed, communicate that to the node selection policy.
c.committeeClient.UpdateNodeSelectionPolicy(committee.NodeSelectionFeedback{Bad: err})
return backoff.Permanent(err)
}
// Request failed, communicate that to the node selection policy.
c.committeeClient.UpdateNodeSelectionPolicy(committee.NodeSelectionFeedback{Bad: err})
return backoff.Permanent(err)
return nil
}

retry := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryInterval), maxRetries)
Expand Down
26 changes: 26 additions & 0 deletions go/oasis-node/cmd/debug/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
epochtime "github.com/oasisprotocol/oasis-core/go/epochtime/api"
cmdCommon "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common"
cmdGrpc "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/grpc"
cmdControl "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/control"
)

var (
Expand All @@ -36,6 +37,14 @@ var (
Run: doWaitNodes,
}

controlWaitReadyCmd = &cobra.Command{
Use: "wait-ready",
Short: "wait for node to become ready",
Long: "Wait for the consensus backend to be synced and runtimes being registered, " +
"initialized, and ready to accept the workload.",
Run: doWaitReady,
}

logger = logging.GetLogger("cmd/debug/control")
)

Expand Down Expand Up @@ -90,6 +99,22 @@ func doWaitNodes(cmd *cobra.Command, args []string) {
logger.Info("enough nodes have been registered")
}

func doWaitReady(cmd *cobra.Command, args []string) {
conn, client := cmdControl.DoConnect(cmd)
defer conn.Close()

logger.Debug("waiting for ready status")

// Use background context to block until the result comes in.
err := client.WaitReady(context.Background())
if err != nil {
logger.Error("failed to wait for ready status",
"err", err,
)
os.Exit(1)
}
}

// Register registers the dummy sub-command and all of its children.
func Register(parentCmd *cobra.Command) {
controlCmd.PersistentFlags().AddFlagSet(cmdGrpc.ClientFlags)
Expand All @@ -98,5 +123,6 @@ func Register(parentCmd *cobra.Command) {

controlCmd.AddCommand(controlSetEpochCmd)
controlCmd.AddCommand(controlWaitNodesCmd)
controlCmd.AddCommand(controlWaitReadyCmd)
parentCmd.AddCommand(controlCmd)
}
Loading

0 comments on commit 26b9431

Please sign in to comment.