Skip to content

Commit

Permalink
Merge pull request #3099 from oasisprotocol/ptrus/fix/runtimes-query-…
Browse files Browse the repository at this point in the history
…suspended

go/registry: Add support for querying suspended runtimes
  • Loading branch information
ptrus authored Jul 15, 2020
2 parents 03d5d65 + 92dac24 commit 3263a4d
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 31 deletions.
6 changes: 6 additions & 0 deletions .changelog/3093.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
go/registry: Add support for querying suspended runtimes

Registry `GetRuntimes` method now accepts a parameter to enable querying for
suspended runtimes.
`WatchRuntimes` will now always include suspended runtimes in the initial
response.
7 changes: 5 additions & 2 deletions go/consensus/tendermint/apps/registry/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Query interface {
NodeStatus(context.Context, signature.PublicKey) (*registry.NodeStatus, error)
Nodes(context.Context) ([]*node.Node, error)
Runtime(context.Context, common.Namespace) (*registry.Runtime, error)
Runtimes(context.Context) ([]*registry.Runtime, error)
Runtimes(ctx context.Context, includeSuspended bool) ([]*registry.Runtime, error)
Genesis(context.Context) (*registry.Genesis, error)
}

Expand Down Expand Up @@ -106,7 +106,10 @@ func (rq *registryQuerier) Runtime(ctx context.Context, id common.Namespace) (*r
return rq.state.Runtime(ctx, id)
}

func (rq *registryQuerier) Runtimes(ctx context.Context) ([]*registry.Runtime, error) {
func (rq *registryQuerier) Runtimes(ctx context.Context, includeSuspended bool) ([]*registry.Runtime, error) {
if includeSuspended {
return rq.state.AllRuntimes(ctx)
}
return rq.state.Runtimes(ctx)
}

Expand Down
9 changes: 4 additions & 5 deletions go/consensus/tendermint/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,12 @@ func (tb *tendermintBackend) WatchRuntimes(ctx context.Context) (<-chan *api.Run
func (tb *tendermintBackend) Cleanup() {
}

func (tb *tendermintBackend) GetRuntimes(ctx context.Context, height int64) ([]*api.Runtime, error) {
q, err := tb.querier.QueryAt(ctx, height)
func (tb *tendermintBackend) GetRuntimes(ctx context.Context, query *api.GetRuntimesQuery) ([]*api.Runtime, error) {
q, err := tb.querier.QueryAt(ctx, query.Height)
if err != nil {
return nil, err
}

return q.Runtimes(ctx)
return q.Runtimes(ctx, query.IncludeSuspended)
}

func (tb *tendermintBackend) StateToGenesis(ctx context.Context, height int64) (*api.Genesis, error) {
Expand Down Expand Up @@ -455,7 +454,7 @@ func New(ctx context.Context, service service.TendermintService) (api.Backend, e
}
tb.runtimeNotifier = pubsub.NewBrokerEx(func(ch channels.Channel) {
wr := ch.In()
runtimes, err := tb.GetRuntimes(ctx, consensus.HeightLatest)
runtimes, err := tb.GetRuntimes(ctx, &api.GetRuntimesQuery{Height: consensus.HeightLatest, IncludeSuspended: true})
if err != nil {
tb.logger.Error("runtime notifier: unable to get a list of runtimes",
"err", err,
Expand Down
13 changes: 10 additions & 3 deletions go/oasis-node/cmd/debug/txsource/workload/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,12 +390,12 @@ func (q *queries) doRegistryQueries(ctx context.Context, rng *rand.Rand, height
}

// Runtimes.
runtimes, err := q.registry.GetRuntimes(ctx, height)
runtimes, err := q.registry.GetRuntimes(ctx, &registry.GetRuntimesQuery{Height: height, IncludeSuspended: false})
if err != nil {
return fmt.Errorf("GetRuntimes error at height %d: %w", height, err)
return fmt.Errorf("GetRuntimes(IncludeSuspended=false) error at height %d: %w", height, err)
}
if len(runtimes) == 0 {
return fmt.Errorf("GetRuntimes empty response at height %d", height)
return fmt.Errorf("GetRuntimes(IncludeSuspended=false) empty response at height %d", height)
}
for _, rt := range runtimes {
var runtime *registry.Runtime
Expand All @@ -407,6 +407,13 @@ func (q *queries) doRegistryQueries(ctx context.Context, rng *rand.Rand, height
return fmt.Errorf("GetRuntime mismatch, expected: %s, got: %s", rt, runtime)
}
}
allRuntimes, err := q.registry.GetRuntimes(ctx, &registry.GetRuntimesQuery{Height: height, IncludeSuspended: true})
if err != nil {
return fmt.Errorf("GetRuntimes(IncludeSuspended=true) error at height %d: %w", height, err)
}
if len(allRuntimes) < len(runtimes) {
return fmt.Errorf("GetRuntimes(IncludeSuspended=true) returned less runtimes than IncludeSuspended=false, at height %d", height)
}

// Events.
_, err = q.registry.GetEvents(ctx, height)
Expand Down
21 changes: 17 additions & 4 deletions go/oasis-node/cmd/registry/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,17 @@ const (
// Staking parameters flags.
CfgStakingThreshold = "runtime.staking.threshold"

// List runtimes flags.
CfgIncludeSuspended = "include_suspended"

runtimeGenesisFilename = "runtime_genesis.json"
)

var (
outputFlags = flag.NewFlagSet("", flag.ContinueOnError)
runtimeFlags = flag.NewFlagSet("", flag.ContinueOnError)
registerFlags = flag.NewFlagSet("", flag.ContinueOnError)
outputFlags = flag.NewFlagSet("", flag.ContinueOnError)
runtimeFlags = flag.NewFlagSet("", flag.ContinueOnError)
runtimeListFlags = flag.NewFlagSet("", flag.ContinueOnError)
registerFlags = flag.NewFlagSet("", flag.ContinueOnError)

runtimeCmd = &cobra.Command{
Use: "runtime",
Expand Down Expand Up @@ -208,7 +212,11 @@ func doList(cmd *cobra.Command, args []string) {
conn, client := doConnect(cmd)
defer conn.Close()

runtimes, err := client.GetRuntimes(context.Background(), consensus.HeightLatest)
query := &registry.GetRuntimesQuery{
Height: consensus.HeightLatest,
IncludeSuspended: viper.GetBool(CfgIncludeSuspended),
}
runtimes, err := client.GetRuntimes(context.Background(), query)
if err != nil {
logger.Error("failed to query runtimes",
"err", err,
Expand Down Expand Up @@ -517,6 +525,7 @@ func Register(parentCmd *cobra.Command) {

listCmd.Flags().AddFlagSet(cmdGrpc.ClientFlags)
listCmd.Flags().AddFlagSet(cmdFlags.VerboseFlags)
listCmd.Flags().AddFlagSet(runtimeListFlags)

registerCmd.Flags().AddFlagSet(registerFlags)

Expand Down Expand Up @@ -581,4 +590,8 @@ func init() {

registerFlags.AddFlagSet(cmdFlags.DebugTestEntityFlags)
registerFlags.AddFlagSet(cmdConsensus.TxFlags)

// List Runtimes flags.
runtimeListFlags.Bool(CfgIncludeSuspended, false, "Use to include suspended runtimes")
_ = viper.BindPFlags(runtimeListFlags)
}
38 changes: 33 additions & 5 deletions go/oasis-test-runner/scenario/e2e/registry_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
cmdSigner "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/signer"
cmdRegEnt "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/registry/entity"
cmdRegNode "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/registry/node"
cmdRegRuntime "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/registry/runtime"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis/cli"
Expand Down Expand Up @@ -83,7 +84,7 @@ func (sc *registryCLIImpl) Run(childEnv *env.Env) error {
}

// registry runtime subcommands
if err := sc.testRuntime(childEnv, cli); err != nil {
if err := sc.testRuntime(ctx, childEnv, cli); err != nil {
return fmt.Errorf("error while running registry runtime test: %w", err)
}

Expand Down Expand Up @@ -589,9 +590,9 @@ func (sc *registryCLIImpl) genDeregisterEntityTx(childEnv *env.Env, nonce int, t
}

// testRuntime tests registry runtime subcommands.
func (sc *registryCLIImpl) testRuntime(childEnv *env.Env, cli *cli.Helpers) error {
func (sc *registryCLIImpl) testRuntime(ctx context.Context, childEnv *env.Env, cli *cli.Helpers) error {
// List runtimes.
runtimes, err := sc.listRuntimes(childEnv)
runtimes, err := sc.listRuntimes(childEnv, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -674,7 +675,7 @@ func (sc *registryCLIImpl) testRuntime(childEnv *env.Env, cli *cli.Helpers) erro
}

// List runtimes.
runtimes, err = sc.listRuntimes(childEnv)
runtimes, err = sc.listRuntimes(childEnv, false)
if err != nil {
return err
}
Expand All @@ -691,17 +692,44 @@ func (sc *registryCLIImpl) testRuntime(childEnv *env.Env, cli *cli.Helpers) erro
return fmt.Errorf("runtime %s does not match the test one. registry one: %s, test one: %s", testRuntime.ID.String(), rtStr, testRuntimeStr)
}

// Wait for runtime to suspend.
if err = sc.Net.Controller().SetEpoch(ctx, 1); err != nil {
return fmt.Errorf("failed to set epoch to %d: %w", 1, err)
}

// List runtimes.
runtimes, err = sc.listRuntimes(childEnv, false)
if err != nil {
return err
}
// Make sure runtime is suspended.
if len(runtimes) != 0 {
return fmt.Errorf("wrong number of runtimes: %d, expected: %d. Runtimes: %v", len(runtimes), 1, runtimes)
}

allRuntimes, err := sc.listRuntimes(childEnv, true)
if err != nil {
return err
}
// Make sure suspended runtime is included.
if len(allRuntimes) != 1 {
return fmt.Errorf("wrong number of runtimes: %d, expected: %d. Runtimes: %v", len(runtimes), 1, runtimes)
}

return nil
}

// listRuntimes lists currently registered runtimes.
func (sc *registryCLIImpl) listRuntimes(childEnv *env.Env) (map[common.Namespace]registry.Runtime, error) {
func (sc *registryCLIImpl) listRuntimes(childEnv *env.Env, includeSuspended bool) (map[common.Namespace]registry.Runtime, error) {
sc.Logger.Info("listing all runtimes")
args := []string{
"registry", "runtime", "list",
"-v",
"--" + grpc.CfgAddress, "unix:" + sc.Net.Validators()[0].SocketPath(),
}
if includeSuspended {
args = append(args, "--"+cmdRegRuntime.CfgIncludeSuspended)
}
out, err := cli.RunSubCommandWithOutput(childEnv, sc.Logger, "list", sc.Net.Config().NodeBinary, args)
if err != nil {
return nil, fmt.Errorf("failed to list runtimes: error: %w output: %s", err, out.String())
Expand Down
8 changes: 7 additions & 1 deletion go/registry/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ type Backend interface {

// GetRuntimes returns the registered Runtimes at the specified
// block height.
GetRuntimes(context.Context, int64) ([]*Runtime, error)
GetRuntimes(context.Context, *GetRuntimesQuery) ([]*Runtime, error)

// WatchRuntimes returns a stream of Runtime. Upon subscription,
// all runtimes will be sent immediately.
Expand Down Expand Up @@ -247,6 +247,12 @@ type NamespaceQuery struct {
ID common.Namespace `json:"id"`
}

// GetRuntimesQuery is a registry get runtimes query.
type GetRuntimesQuery struct {
Height int64 `json:"height"`
IncludeSuspended bool `json:"include_suspended"`
}

// ConsensusAddressQuery is a registry query by consensus address.
// The nature and format of the consensus address depends on the specific
// consensus backend implementation used.
Expand Down
14 changes: 7 additions & 7 deletions go/registry/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,21 +283,21 @@ func handlerGetRuntimes( // nolint: golint
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
var height int64
if err := dec(&height); err != nil {
var query GetRuntimesQuery
if err := dec(&query); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(Backend).GetRuntimes(ctx, height)
return srv.(Backend).GetRuntimes(ctx, &query)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodGetRuntimes.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(Backend).GetRuntimes(ctx, req.(int64))
return srv.(Backend).GetRuntimes(ctx, req.(*GetRuntimesQuery))
}
return interceptor(ctx, height, info, handler)
return interceptor(ctx, &query, info, handler)
}

func handlerStateToGenesis( // nolint: golint
Expand Down Expand Up @@ -628,9 +628,9 @@ func (c *registryClient) GetRuntime(ctx context.Context, query *NamespaceQuery)
return &rsp, nil
}

func (c *registryClient) GetRuntimes(ctx context.Context, height int64) ([]*Runtime, error) {
func (c *registryClient) GetRuntimes(ctx context.Context, query *GetRuntimesQuery) ([]*Runtime, error) {
var rsp []*Runtime
if err := c.conn.Invoke(ctx, methodGetRuntimes.FullName(), height, &rsp); err != nil {
if err := c.conn.Invoke(ctx, methodGetRuntimes.FullName(), query, &rsp); err != nil {
return nil, err
}
return rsp, nil
Expand Down
14 changes: 10 additions & 4 deletions go/registry/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,8 +527,13 @@ func testRegistryEntityNodes( // nolint: gocyclo
func testRegistryRuntime(t *testing.T, backend api.Backend, consensus consensusAPI.Backend) (common.Namespace, common.Namespace) {
require := require.New(t)

existingRuntimes, err := backend.GetRuntimes(context.Background(), consensusAPI.HeightLatest)
require.NoError(err, "GetRuntimes")
query := &api.GetRuntimesQuery{Height: consensusAPI.HeightLatest, IncludeSuspended: false}
existingRuntimes, err := backend.GetRuntimes(context.Background(), query)
require.NoError(err, "GetRuntimes(includeSuspended=false)")
query.IncludeSuspended = true
existingAllRuntimes, err := backend.GetRuntimes(context.Background(), query)
require.NoError(err, "GetRuntimes(includeSuspended=true)")
require.ElementsMatch(existingRuntimes, existingAllRuntimes, "no suspended runtimes")

// We must use the test entity for runtime registrations as registering a runtime will prevent
// the entity from being deregistered and the other node tests already use the test entity for
Expand Down Expand Up @@ -702,7 +707,7 @@ func testRegistryRuntime(t *testing.T, backend api.Backend, consensus consensusA
rtMapByName[tc.name] = rt.Runtime
}

registeredRuntimes, err := backend.GetRuntimes(context.Background(), consensusAPI.HeightLatest)
registeredRuntimes, err := backend.GetRuntimes(context.Background(), query)
require.NoError(err, "GetRuntimes")
require.Len(registeredRuntimes, len(existingRuntimes)+len(rtMap), "registry has all the new runtimes")
for _, regRuntime := range registeredRuntimes {
Expand Down Expand Up @@ -1112,7 +1117,8 @@ func (ent *TestEntity) NewTestNodes(nCompute, nStorage int, idNonce []byte, runt

// Add another Re-Registration with different address field and more runtimes.
moreRuntimes := append([]*node.Runtime(nil), runtimes...)
registeredRuntimes, err := consensus.Registry().GetRuntimes(context.Background(), consensusAPI.HeightLatest)
q := &api.GetRuntimesQuery{Height: consensusAPI.HeightLatest, IncludeSuspended: false}
registeredRuntimes, err := consensus.Registry().GetRuntimes(context.Background(), q)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 3263a4d

Please sign in to comment.