From 92dac2451abba923922269b6e0ad64ac37fe08e5 Mon Sep 17 00:00:00 2001 From: ptrus Date: Fri, 10 Jul 2020 10:19:37 +0200 Subject: [PATCH] go/registry: Add support for querying suspended runtimes --- .changelog/3093.feature.md | 6 +++ .../tendermint/apps/registry/query.go | 7 +++- go/consensus/tendermint/registry/registry.go | 9 ++--- .../cmd/debug/txsource/workload/queries.go | 13 +++++-- go/oasis-node/cmd/registry/runtime/runtime.go | 21 ++++++++-- .../scenario/e2e/registry_cli.go | 38 ++++++++++++++++--- go/registry/api/api.go | 8 +++- go/registry/api/grpc.go | 14 +++---- go/registry/tests/tester.go | 14 +++++-- 9 files changed, 99 insertions(+), 31 deletions(-) create mode 100644 .changelog/3093.feature.md diff --git a/.changelog/3093.feature.md b/.changelog/3093.feature.md new file mode 100644 index 00000000000..0cf9d6b0d71 --- /dev/null +++ b/.changelog/3093.feature.md @@ -0,0 +1,6 @@ +go/registry: Add support for querying suspended runtimes + +Registry `GetRuntimes` method now accepts a parameter to enable querying for +suspended runtimes. +`WatchRuntimes` will now always include suspended runtimes in the initial +response. diff --git a/go/consensus/tendermint/apps/registry/query.go b/go/consensus/tendermint/apps/registry/query.go index f08ae9d61e0..7374909094e 100644 --- a/go/consensus/tendermint/apps/registry/query.go +++ b/go/consensus/tendermint/apps/registry/query.go @@ -22,7 +22,7 @@ type Query interface { NodeStatus(context.Context, signature.PublicKey) (*registry.NodeStatus, error) Nodes(context.Context) ([]*node.Node, error) Runtime(context.Context, common.Namespace) (*registry.Runtime, error) - Runtimes(context.Context) ([]*registry.Runtime, error) + Runtimes(ctx context.Context, includeSuspended bool) ([]*registry.Runtime, error) Genesis(context.Context) (*registry.Genesis, error) } @@ -106,7 +106,10 @@ func (rq *registryQuerier) Runtime(ctx context.Context, id common.Namespace) (*r return rq.state.Runtime(ctx, id) } -func (rq *registryQuerier) Runtimes(ctx context.Context) ([]*registry.Runtime, error) { +func (rq *registryQuerier) Runtimes(ctx context.Context, includeSuspended bool) ([]*registry.Runtime, error) { + if includeSuspended { + return rq.state.AllRuntimes(ctx) + } return rq.state.Runtimes(ctx) } diff --git a/go/consensus/tendermint/registry/registry.go b/go/consensus/tendermint/registry/registry.go index a979bfaa4ae..af70473d8a1 100644 --- a/go/consensus/tendermint/registry/registry.go +++ b/go/consensus/tendermint/registry/registry.go @@ -146,13 +146,12 @@ func (tb *tendermintBackend) WatchRuntimes(ctx context.Context) (<-chan *api.Run func (tb *tendermintBackend) Cleanup() { } -func (tb *tendermintBackend) GetRuntimes(ctx context.Context, height int64) ([]*api.Runtime, error) { - q, err := tb.querier.QueryAt(ctx, height) +func (tb *tendermintBackend) GetRuntimes(ctx context.Context, query *api.GetRuntimesQuery) ([]*api.Runtime, error) { + q, err := tb.querier.QueryAt(ctx, query.Height) if err != nil { return nil, err } - - return q.Runtimes(ctx) + return q.Runtimes(ctx, query.IncludeSuspended) } func (tb *tendermintBackend) StateToGenesis(ctx context.Context, height int64) (*api.Genesis, error) { @@ -455,7 +454,7 @@ func New(ctx context.Context, service service.TendermintService) (api.Backend, e } tb.runtimeNotifier = pubsub.NewBrokerEx(func(ch channels.Channel) { wr := ch.In() - runtimes, err := tb.GetRuntimes(ctx, consensus.HeightLatest) + runtimes, err := tb.GetRuntimes(ctx, &api.GetRuntimesQuery{Height: consensus.HeightLatest, IncludeSuspended: true}) if err != nil { tb.logger.Error("runtime notifier: unable to get a list of runtimes", "err", err, diff --git a/go/oasis-node/cmd/debug/txsource/workload/queries.go b/go/oasis-node/cmd/debug/txsource/workload/queries.go index 958239b9c41..117ab263b2a 100644 --- a/go/oasis-node/cmd/debug/txsource/workload/queries.go +++ b/go/oasis-node/cmd/debug/txsource/workload/queries.go @@ -390,12 +390,12 @@ func (q *queries) doRegistryQueries(ctx context.Context, rng *rand.Rand, height } // Runtimes. - runtimes, err := q.registry.GetRuntimes(ctx, height) + runtimes, err := q.registry.GetRuntimes(ctx, ®istry.GetRuntimesQuery{Height: height, IncludeSuspended: false}) if err != nil { - return fmt.Errorf("GetRuntimes error at height %d: %w", height, err) + return fmt.Errorf("GetRuntimes(IncludeSuspended=false) error at height %d: %w", height, err) } if len(runtimes) == 0 { - return fmt.Errorf("GetRuntimes empty response at height %d", height) + return fmt.Errorf("GetRuntimes(IncludeSuspended=false) empty response at height %d", height) } for _, rt := range runtimes { var runtime *registry.Runtime @@ -407,6 +407,13 @@ func (q *queries) doRegistryQueries(ctx context.Context, rng *rand.Rand, height return fmt.Errorf("GetRuntime mismatch, expected: %s, got: %s", rt, runtime) } } + allRuntimes, err := q.registry.GetRuntimes(ctx, ®istry.GetRuntimesQuery{Height: height, IncludeSuspended: true}) + if err != nil { + return fmt.Errorf("GetRuntimes(IncludeSuspended=true) error at height %d: %w", height, err) + } + if len(allRuntimes) < len(runtimes) { + return fmt.Errorf("GetRuntimes(IncludeSuspended=true) returned less runtimes than IncludeSuspended=false, at height %d", height) + } // Events. _, err = q.registry.GetEvents(ctx, height) diff --git a/go/oasis-node/cmd/registry/runtime/runtime.go b/go/oasis-node/cmd/registry/runtime/runtime.go index 0cf7bf17752..0db168b690c 100644 --- a/go/oasis-node/cmd/registry/runtime/runtime.go +++ b/go/oasis-node/cmd/registry/runtime/runtime.go @@ -86,13 +86,17 @@ const ( // Staking parameters flags. CfgStakingThreshold = "runtime.staking.threshold" + // List runtimes flags. + CfgIncludeSuspended = "include_suspended" + runtimeGenesisFilename = "runtime_genesis.json" ) var ( - outputFlags = flag.NewFlagSet("", flag.ContinueOnError) - runtimeFlags = flag.NewFlagSet("", flag.ContinueOnError) - registerFlags = flag.NewFlagSet("", flag.ContinueOnError) + outputFlags = flag.NewFlagSet("", flag.ContinueOnError) + runtimeFlags = flag.NewFlagSet("", flag.ContinueOnError) + runtimeListFlags = flag.NewFlagSet("", flag.ContinueOnError) + registerFlags = flag.NewFlagSet("", flag.ContinueOnError) runtimeCmd = &cobra.Command{ Use: "runtime", @@ -208,7 +212,11 @@ func doList(cmd *cobra.Command, args []string) { conn, client := doConnect(cmd) defer conn.Close() - runtimes, err := client.GetRuntimes(context.Background(), consensus.HeightLatest) + query := ®istry.GetRuntimesQuery{ + Height: consensus.HeightLatest, + IncludeSuspended: viper.GetBool(CfgIncludeSuspended), + } + runtimes, err := client.GetRuntimes(context.Background(), query) if err != nil { logger.Error("failed to query runtimes", "err", err, @@ -517,6 +525,7 @@ func Register(parentCmd *cobra.Command) { listCmd.Flags().AddFlagSet(cmdGrpc.ClientFlags) listCmd.Flags().AddFlagSet(cmdFlags.VerboseFlags) + listCmd.Flags().AddFlagSet(runtimeListFlags) registerCmd.Flags().AddFlagSet(registerFlags) @@ -581,4 +590,8 @@ func init() { registerFlags.AddFlagSet(cmdFlags.DebugTestEntityFlags) registerFlags.AddFlagSet(cmdConsensus.TxFlags) + + // List Runtimes flags. + runtimeListFlags.Bool(CfgIncludeSuspended, false, "Use to include suspended runtimes") + _ = viper.BindPFlags(runtimeListFlags) } diff --git a/go/oasis-test-runner/scenario/e2e/registry_cli.go b/go/oasis-test-runner/scenario/e2e/registry_cli.go index dbda0f5b785..27dc2107fdf 100644 --- a/go/oasis-test-runner/scenario/e2e/registry_cli.go +++ b/go/oasis-test-runner/scenario/e2e/registry_cli.go @@ -27,6 +27,7 @@ import ( cmdSigner "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/signer" cmdRegEnt "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/registry/entity" cmdRegNode "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/registry/node" + cmdRegRuntime "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/registry/runtime" "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env" "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis" "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis/cli" @@ -83,7 +84,7 @@ func (sc *registryCLIImpl) Run(childEnv *env.Env) error { } // registry runtime subcommands - if err := sc.testRuntime(childEnv, cli); err != nil { + if err := sc.testRuntime(ctx, childEnv, cli); err != nil { return fmt.Errorf("error while running registry runtime test: %w", err) } @@ -589,9 +590,9 @@ func (sc *registryCLIImpl) genDeregisterEntityTx(childEnv *env.Env, nonce int, t } // testRuntime tests registry runtime subcommands. -func (sc *registryCLIImpl) testRuntime(childEnv *env.Env, cli *cli.Helpers) error { +func (sc *registryCLIImpl) testRuntime(ctx context.Context, childEnv *env.Env, cli *cli.Helpers) error { // List runtimes. - runtimes, err := sc.listRuntimes(childEnv) + runtimes, err := sc.listRuntimes(childEnv, false) if err != nil { return err } @@ -674,7 +675,7 @@ func (sc *registryCLIImpl) testRuntime(childEnv *env.Env, cli *cli.Helpers) erro } // List runtimes. - runtimes, err = sc.listRuntimes(childEnv) + runtimes, err = sc.listRuntimes(childEnv, false) if err != nil { return err } @@ -691,17 +692,44 @@ func (sc *registryCLIImpl) testRuntime(childEnv *env.Env, cli *cli.Helpers) erro return fmt.Errorf("runtime %s does not match the test one. registry one: %s, test one: %s", testRuntime.ID.String(), rtStr, testRuntimeStr) } + // Wait for runtime to suspend. + if err = sc.Net.Controller().SetEpoch(ctx, 1); err != nil { + return fmt.Errorf("failed to set epoch to %d: %w", 1, err) + } + + // List runtimes. + runtimes, err = sc.listRuntimes(childEnv, false) + if err != nil { + return err + } + // Make sure runtime is suspended. + if len(runtimes) != 0 { + return fmt.Errorf("wrong number of runtimes: %d, expected: %d. Runtimes: %v", len(runtimes), 1, runtimes) + } + + allRuntimes, err := sc.listRuntimes(childEnv, true) + if err != nil { + return err + } + // Make sure suspended runtime is included. + if len(allRuntimes) != 1 { + return fmt.Errorf("wrong number of runtimes: %d, expected: %d. Runtimes: %v", len(runtimes), 1, runtimes) + } + return nil } // listRuntimes lists currently registered runtimes. -func (sc *registryCLIImpl) listRuntimes(childEnv *env.Env) (map[common.Namespace]registry.Runtime, error) { +func (sc *registryCLIImpl) listRuntimes(childEnv *env.Env, includeSuspended bool) (map[common.Namespace]registry.Runtime, error) { sc.Logger.Info("listing all runtimes") args := []string{ "registry", "runtime", "list", "-v", "--" + grpc.CfgAddress, "unix:" + sc.Net.Validators()[0].SocketPath(), } + if includeSuspended { + args = append(args, "--"+cmdRegRuntime.CfgIncludeSuspended) + } out, err := cli.RunSubCommandWithOutput(childEnv, sc.Logger, "list", sc.Net.Config().NodeBinary, args) if err != nil { return nil, fmt.Errorf("failed to list runtimes: error: %w output: %s", err, out.String()) diff --git a/go/registry/api/api.go b/go/registry/api/api.go index af9b7c00a34..cd54b41cee3 100644 --- a/go/registry/api/api.go +++ b/go/registry/api/api.go @@ -219,7 +219,7 @@ type Backend interface { // GetRuntimes returns the registered Runtimes at the specified // block height. - GetRuntimes(context.Context, int64) ([]*Runtime, error) + GetRuntimes(context.Context, *GetRuntimesQuery) ([]*Runtime, error) // WatchRuntimes returns a stream of Runtime. Upon subscription, // all runtimes will be sent immediately. @@ -247,6 +247,12 @@ type NamespaceQuery struct { ID common.Namespace `json:"id"` } +// GetRuntimesQuery is a registry get runtimes query. +type GetRuntimesQuery struct { + Height int64 `json:"height"` + IncludeSuspended bool `json:"include_suspended"` +} + // ConsensusAddressQuery is a registry query by consensus address. // The nature and format of the consensus address depends on the specific // consensus backend implementation used. diff --git a/go/registry/api/grpc.go b/go/registry/api/grpc.go index 49eb99b5385..27971b1e67d 100644 --- a/go/registry/api/grpc.go +++ b/go/registry/api/grpc.go @@ -283,21 +283,21 @@ func handlerGetRuntimes( // nolint: golint dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor, ) (interface{}, error) { - var height int64 - if err := dec(&height); err != nil { + var query GetRuntimesQuery + if err := dec(&query); err != nil { return nil, err } if interceptor == nil { - return srv.(Backend).GetRuntimes(ctx, height) + return srv.(Backend).GetRuntimes(ctx, &query) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: methodGetRuntimes.FullName(), } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(Backend).GetRuntimes(ctx, req.(int64)) + return srv.(Backend).GetRuntimes(ctx, req.(*GetRuntimesQuery)) } - return interceptor(ctx, height, info, handler) + return interceptor(ctx, &query, info, handler) } func handlerStateToGenesis( // nolint: golint @@ -628,9 +628,9 @@ func (c *registryClient) GetRuntime(ctx context.Context, query *NamespaceQuery) return &rsp, nil } -func (c *registryClient) GetRuntimes(ctx context.Context, height int64) ([]*Runtime, error) { +func (c *registryClient) GetRuntimes(ctx context.Context, query *GetRuntimesQuery) ([]*Runtime, error) { var rsp []*Runtime - if err := c.conn.Invoke(ctx, methodGetRuntimes.FullName(), height, &rsp); err != nil { + if err := c.conn.Invoke(ctx, methodGetRuntimes.FullName(), query, &rsp); err != nil { return nil, err } return rsp, nil diff --git a/go/registry/tests/tester.go b/go/registry/tests/tester.go index d3d19e4597e..efe7e0054ac 100644 --- a/go/registry/tests/tester.go +++ b/go/registry/tests/tester.go @@ -527,8 +527,13 @@ func testRegistryEntityNodes( // nolint: gocyclo func testRegistryRuntime(t *testing.T, backend api.Backend, consensus consensusAPI.Backend) (common.Namespace, common.Namespace) { require := require.New(t) - existingRuntimes, err := backend.GetRuntimes(context.Background(), consensusAPI.HeightLatest) - require.NoError(err, "GetRuntimes") + query := &api.GetRuntimesQuery{Height: consensusAPI.HeightLatest, IncludeSuspended: false} + existingRuntimes, err := backend.GetRuntimes(context.Background(), query) + require.NoError(err, "GetRuntimes(includeSuspended=false)") + query.IncludeSuspended = true + existingAllRuntimes, err := backend.GetRuntimes(context.Background(), query) + require.NoError(err, "GetRuntimes(includeSuspended=true)") + require.ElementsMatch(existingRuntimes, existingAllRuntimes, "no suspended runtimes") // We must use the test entity for runtime registrations as registering a runtime will prevent // the entity from being deregistered and the other node tests already use the test entity for @@ -702,7 +707,7 @@ func testRegistryRuntime(t *testing.T, backend api.Backend, consensus consensusA rtMapByName[tc.name] = rt.Runtime } - registeredRuntimes, err := backend.GetRuntimes(context.Background(), consensusAPI.HeightLatest) + registeredRuntimes, err := backend.GetRuntimes(context.Background(), query) require.NoError(err, "GetRuntimes") require.Len(registeredRuntimes, len(existingRuntimes)+len(rtMap), "registry has all the new runtimes") for _, regRuntime := range registeredRuntimes { @@ -1112,7 +1117,8 @@ func (ent *TestEntity) NewTestNodes(nCompute, nStorage int, idNonce []byte, runt // Add another Re-Registration with different address field and more runtimes. moreRuntimes := append([]*node.Runtime(nil), runtimes...) - registeredRuntimes, err := consensus.Registry().GetRuntimes(context.Background(), consensusAPI.HeightLatest) + q := &api.GetRuntimesQuery{Height: consensusAPI.HeightLatest, IncludeSuspended: false} + registeredRuntimes, err := consensus.Registry().GetRuntimes(context.Background(), q) if err != nil { return nil, err }