Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/registry: Add support for querying suspended runtimes #3099

Merged
merged 1 commit into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changelog/3093.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
go/registry: Add support for querying suspended runtimes

Registry `GetRuntimes` method now accepts a parameter to enable querying for
suspended runtimes.
`WatchRuntimes` will now always include suspended runtimes in the initial
response.
7 changes: 5 additions & 2 deletions go/consensus/tendermint/apps/registry/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Query interface {
NodeStatus(context.Context, signature.PublicKey) (*registry.NodeStatus, error)
Nodes(context.Context) ([]*node.Node, error)
Runtime(context.Context, common.Namespace) (*registry.Runtime, error)
Runtimes(context.Context) ([]*registry.Runtime, error)
Runtimes(ctx context.Context, includeSuspended bool) ([]*registry.Runtime, error)
Genesis(context.Context) (*registry.Genesis, error)
}

Expand Down Expand Up @@ -106,7 +106,10 @@ func (rq *registryQuerier) Runtime(ctx context.Context, id common.Namespace) (*r
return rq.state.Runtime(ctx, id)
}

func (rq *registryQuerier) Runtimes(ctx context.Context) ([]*registry.Runtime, error) {
func (rq *registryQuerier) Runtimes(ctx context.Context, includeSuspended bool) ([]*registry.Runtime, error) {
if includeSuspended {
return rq.state.AllRuntimes(ctx)
}
return rq.state.Runtimes(ctx)
}

Expand Down
9 changes: 4 additions & 5 deletions go/consensus/tendermint/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,12 @@ func (tb *tendermintBackend) WatchRuntimes(ctx context.Context) (<-chan *api.Run
func (tb *tendermintBackend) Cleanup() {
}

func (tb *tendermintBackend) GetRuntimes(ctx context.Context, height int64) ([]*api.Runtime, error) {
q, err := tb.querier.QueryAt(ctx, height)
func (tb *tendermintBackend) GetRuntimes(ctx context.Context, query *api.GetRuntimesQuery) ([]*api.Runtime, error) {
q, err := tb.querier.QueryAt(ctx, query.Height)
if err != nil {
return nil, err
}

return q.Runtimes(ctx)
return q.Runtimes(ctx, query.IncludeSuspended)
}

func (tb *tendermintBackend) StateToGenesis(ctx context.Context, height int64) (*api.Genesis, error) {
Expand Down Expand Up @@ -455,7 +454,7 @@ func New(ctx context.Context, service service.TendermintService) (api.Backend, e
}
tb.runtimeNotifier = pubsub.NewBrokerEx(func(ch channels.Channel) {
wr := ch.In()
runtimes, err := tb.GetRuntimes(ctx, consensus.HeightLatest)
runtimes, err := tb.GetRuntimes(ctx, &api.GetRuntimesQuery{Height: consensus.HeightLatest, IncludeSuspended: true})
if err != nil {
tb.logger.Error("runtime notifier: unable to get a list of runtimes",
"err", err,
Expand Down
13 changes: 10 additions & 3 deletions go/oasis-node/cmd/debug/txsource/workload/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,12 +390,12 @@ func (q *queries) doRegistryQueries(ctx context.Context, rng *rand.Rand, height
}

// Runtimes.
runtimes, err := q.registry.GetRuntimes(ctx, height)
runtimes, err := q.registry.GetRuntimes(ctx, &registry.GetRuntimesQuery{Height: height, IncludeSuspended: false})
if err != nil {
return fmt.Errorf("GetRuntimes error at height %d: %w", height, err)
return fmt.Errorf("GetRuntimes(IncludeSuspended=false) error at height %d: %w", height, err)
}
if len(runtimes) == 0 {
return fmt.Errorf("GetRuntimes empty response at height %d", height)
return fmt.Errorf("GetRuntimes(IncludeSuspended=false) empty response at height %d", height)
}
for _, rt := range runtimes {
var runtime *registry.Runtime
Expand All @@ -407,6 +407,13 @@ func (q *queries) doRegistryQueries(ctx context.Context, rng *rand.Rand, height
return fmt.Errorf("GetRuntime mismatch, expected: %s, got: %s", rt, runtime)
}
}
allRuntimes, err := q.registry.GetRuntimes(ctx, &registry.GetRuntimesQuery{Height: height, IncludeSuspended: true})
if err != nil {
return fmt.Errorf("GetRuntimes(IncludeSuspended=true) error at height %d: %w", height, err)
}
if len(allRuntimes) < len(runtimes) {
return fmt.Errorf("GetRuntimes(IncludeSuspended=true) returned less runtimes than IncludeSuspended=false, at height %d", height)
}

// Events.
_, err = q.registry.GetEvents(ctx, height)
Expand Down
21 changes: 17 additions & 4 deletions go/oasis-node/cmd/registry/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,17 @@ const (
// Staking parameters flags.
CfgStakingThreshold = "runtime.staking.threshold"

// List runtimes flags.
CfgIncludeSuspended = "include_suspended"

runtimeGenesisFilename = "runtime_genesis.json"
)

var (
outputFlags = flag.NewFlagSet("", flag.ContinueOnError)
runtimeFlags = flag.NewFlagSet("", flag.ContinueOnError)
registerFlags = flag.NewFlagSet("", flag.ContinueOnError)
outputFlags = flag.NewFlagSet("", flag.ContinueOnError)
runtimeFlags = flag.NewFlagSet("", flag.ContinueOnError)
runtimeListFlags = flag.NewFlagSet("", flag.ContinueOnError)
registerFlags = flag.NewFlagSet("", flag.ContinueOnError)

runtimeCmd = &cobra.Command{
Use: "runtime",
Expand Down Expand Up @@ -208,7 +212,11 @@ func doList(cmd *cobra.Command, args []string) {
conn, client := doConnect(cmd)
defer conn.Close()

runtimes, err := client.GetRuntimes(context.Background(), consensus.HeightLatest)
query := &registry.GetRuntimesQuery{
Height: consensus.HeightLatest,
IncludeSuspended: viper.GetBool(CfgIncludeSuspended),
}
runtimes, err := client.GetRuntimes(context.Background(), query)
if err != nil {
logger.Error("failed to query runtimes",
"err", err,
Expand Down Expand Up @@ -517,6 +525,7 @@ func Register(parentCmd *cobra.Command) {

listCmd.Flags().AddFlagSet(cmdGrpc.ClientFlags)
listCmd.Flags().AddFlagSet(cmdFlags.VerboseFlags)
listCmd.Flags().AddFlagSet(runtimeListFlags)

registerCmd.Flags().AddFlagSet(registerFlags)

Expand Down Expand Up @@ -581,4 +590,8 @@ func init() {

registerFlags.AddFlagSet(cmdFlags.DebugTestEntityFlags)
registerFlags.AddFlagSet(cmdConsensus.TxFlags)

// List Runtimes flags.
runtimeListFlags.Bool(CfgIncludeSuspended, false, "Use to include suspended runtimes")
_ = viper.BindPFlags(runtimeListFlags)
}
38 changes: 33 additions & 5 deletions go/oasis-test-runner/scenario/e2e/registry_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
cmdSigner "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/signer"
cmdRegEnt "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/registry/entity"
cmdRegNode "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/registry/node"
cmdRegRuntime "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/registry/runtime"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis/cli"
Expand Down Expand Up @@ -83,7 +84,7 @@ func (sc *registryCLIImpl) Run(childEnv *env.Env) error {
}

// registry runtime subcommands
if err := sc.testRuntime(childEnv, cli); err != nil {
if err := sc.testRuntime(ctx, childEnv, cli); err != nil {
return fmt.Errorf("error while running registry runtime test: %w", err)
}

Expand Down Expand Up @@ -589,9 +590,9 @@ func (sc *registryCLIImpl) genDeregisterEntityTx(childEnv *env.Env, nonce int, t
}

// testRuntime tests registry runtime subcommands.
func (sc *registryCLIImpl) testRuntime(childEnv *env.Env, cli *cli.Helpers) error {
func (sc *registryCLIImpl) testRuntime(ctx context.Context, childEnv *env.Env, cli *cli.Helpers) error {
// List runtimes.
runtimes, err := sc.listRuntimes(childEnv)
runtimes, err := sc.listRuntimes(childEnv, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -674,7 +675,7 @@ func (sc *registryCLIImpl) testRuntime(childEnv *env.Env, cli *cli.Helpers) erro
}

// List runtimes.
runtimes, err = sc.listRuntimes(childEnv)
runtimes, err = sc.listRuntimes(childEnv, false)
if err != nil {
return err
}
Expand All @@ -691,17 +692,44 @@ func (sc *registryCLIImpl) testRuntime(childEnv *env.Env, cli *cli.Helpers) erro
return fmt.Errorf("runtime %s does not match the test one. registry one: %s, test one: %s", testRuntime.ID.String(), rtStr, testRuntimeStr)
}

// Wait for runtime to suspend.
if err = sc.Net.Controller().SetEpoch(ctx, 1); err != nil {
return fmt.Errorf("failed to set epoch to %d: %w", 1, err)
}

// List runtimes.
runtimes, err = sc.listRuntimes(childEnv, false)
if err != nil {
return err
}
// Make sure runtime is suspended.
if len(runtimes) != 0 {
return fmt.Errorf("wrong number of runtimes: %d, expected: %d. Runtimes: %v", len(runtimes), 1, runtimes)
}

allRuntimes, err := sc.listRuntimes(childEnv, true)
if err != nil {
return err
}
// Make sure suspended runtime is included.
if len(allRuntimes) != 1 {
return fmt.Errorf("wrong number of runtimes: %d, expected: %d. Runtimes: %v", len(runtimes), 1, runtimes)
}

return nil
}

// listRuntimes lists currently registered runtimes.
func (sc *registryCLIImpl) listRuntimes(childEnv *env.Env) (map[common.Namespace]registry.Runtime, error) {
func (sc *registryCLIImpl) listRuntimes(childEnv *env.Env, includeSuspended bool) (map[common.Namespace]registry.Runtime, error) {
sc.Logger.Info("listing all runtimes")
args := []string{
"registry", "runtime", "list",
"-v",
"--" + grpc.CfgAddress, "unix:" + sc.Net.Validators()[0].SocketPath(),
}
if includeSuspended {
args = append(args, "--"+cmdRegRuntime.CfgIncludeSuspended)
}
out, err := cli.RunSubCommandWithOutput(childEnv, sc.Logger, "list", sc.Net.Config().NodeBinary, args)
if err != nil {
return nil, fmt.Errorf("failed to list runtimes: error: %w output: %s", err, out.String())
Expand Down
8 changes: 7 additions & 1 deletion go/registry/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ type Backend interface {

// GetRuntimes returns the registered Runtimes at the specified
// block height.
GetRuntimes(context.Context, int64) ([]*Runtime, error)
GetRuntimes(context.Context, *GetRuntimesQuery) ([]*Runtime, error)

// WatchRuntimes returns a stream of Runtime. Upon subscription,
// all runtimes will be sent immediately.
Expand Down Expand Up @@ -247,6 +247,12 @@ type NamespaceQuery struct {
ID common.Namespace `json:"id"`
}

// GetRuntimesQuery is a registry get runtimes query.
type GetRuntimesQuery struct {
Height int64 `json:"height"`
IncludeSuspended bool `json:"include_suspended"`
}

// ConsensusAddressQuery is a registry query by consensus address.
// The nature and format of the consensus address depends on the specific
// consensus backend implementation used.
Expand Down
14 changes: 7 additions & 7 deletions go/registry/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,21 +283,21 @@ func handlerGetRuntimes( // nolint: golint
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
var height int64
if err := dec(&height); err != nil {
var query GetRuntimesQuery
if err := dec(&query); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(Backend).GetRuntimes(ctx, height)
return srv.(Backend).GetRuntimes(ctx, &query)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodGetRuntimes.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(Backend).GetRuntimes(ctx, req.(int64))
return srv.(Backend).GetRuntimes(ctx, req.(*GetRuntimesQuery))
}
return interceptor(ctx, height, info, handler)
return interceptor(ctx, &query, info, handler)
}

func handlerStateToGenesis( // nolint: golint
Expand Down Expand Up @@ -628,9 +628,9 @@ func (c *registryClient) GetRuntime(ctx context.Context, query *NamespaceQuery)
return &rsp, nil
}

func (c *registryClient) GetRuntimes(ctx context.Context, height int64) ([]*Runtime, error) {
func (c *registryClient) GetRuntimes(ctx context.Context, query *GetRuntimesQuery) ([]*Runtime, error) {
var rsp []*Runtime
if err := c.conn.Invoke(ctx, methodGetRuntimes.FullName(), height, &rsp); err != nil {
if err := c.conn.Invoke(ctx, methodGetRuntimes.FullName(), query, &rsp); err != nil {
return nil, err
}
return rsp, nil
Expand Down
14 changes: 10 additions & 4 deletions go/registry/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,8 +527,13 @@ func testRegistryEntityNodes( // nolint: gocyclo
func testRegistryRuntime(t *testing.T, backend api.Backend, consensus consensusAPI.Backend) (common.Namespace, common.Namespace) {
require := require.New(t)

existingRuntimes, err := backend.GetRuntimes(context.Background(), consensusAPI.HeightLatest)
require.NoError(err, "GetRuntimes")
query := &api.GetRuntimesQuery{Height: consensusAPI.HeightLatest, IncludeSuspended: false}
existingRuntimes, err := backend.GetRuntimes(context.Background(), query)
require.NoError(err, "GetRuntimes(includeSuspended=false)")
query.IncludeSuspended = true
existingAllRuntimes, err := backend.GetRuntimes(context.Background(), query)
require.NoError(err, "GetRuntimes(includeSuspended=true)")
require.ElementsMatch(existingRuntimes, existingAllRuntimes, "no suspended runtimes")

// We must use the test entity for runtime registrations as registering a runtime will prevent
// the entity from being deregistered and the other node tests already use the test entity for
Expand Down Expand Up @@ -702,7 +707,7 @@ func testRegistryRuntime(t *testing.T, backend api.Backend, consensus consensusA
rtMapByName[tc.name] = rt.Runtime
}

registeredRuntimes, err := backend.GetRuntimes(context.Background(), consensusAPI.HeightLatest)
registeredRuntimes, err := backend.GetRuntimes(context.Background(), query)
require.NoError(err, "GetRuntimes")
require.Len(registeredRuntimes, len(existingRuntimes)+len(rtMap), "registry has all the new runtimes")
for _, regRuntime := range registeredRuntimes {
Expand Down Expand Up @@ -1112,7 +1117,8 @@ func (ent *TestEntity) NewTestNodes(nCompute, nStorage int, idNonce []byte, runt

// Add another Re-Registration with different address field and more runtimes.
moreRuntimes := append([]*node.Runtime(nil), runtimes...)
registeredRuntimes, err := consensus.Registry().GetRuntimes(context.Background(), consensusAPI.HeightLatest)
q := &api.GetRuntimesQuery{Height: consensusAPI.HeightLatest, IncludeSuspended: false}
registeredRuntimes, err := consensus.Registry().GetRuntimes(context.Background(), q)
if err != nil {
return nil, err
}
Expand Down