diff --git a/.changelog/2961.feature.md b/.changelog/2961.feature.md new file mode 100644 index 00000000000..bc461515256 --- /dev/null +++ b/.changelog/2961.feature.md @@ -0,0 +1,3 @@ +go/registry/api: Add `GetNodeByConsensusID` method + +`GetNodeByConsensusID` can be used to query nodes by their Consensus ID. diff --git a/go/consensus/tendermint/apps/registry/query.go b/go/consensus/tendermint/apps/registry/query.go index 94eb5e5d707..ef0f49b79df 100644 --- a/go/consensus/tendermint/apps/registry/query.go +++ b/go/consensus/tendermint/apps/registry/query.go @@ -10,6 +10,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/node" abciAPI "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/api" registryState "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/registry/state" + tmcrypto "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/crypto" registry "github.com/oasisprotocol/oasis-core/go/registry/api" ) @@ -18,6 +19,7 @@ type Query interface { Entity(context.Context, signature.PublicKey) (*entity.Entity, error) Entities(context.Context) ([]*entity.Entity, error) Node(context.Context, signature.PublicKey) (*node.Node, error) + NodeByConsensusID(context.Context, signature.PublicKey) (*node.Node, error) NodeStatus(context.Context, signature.PublicKey) (*registry.NodeStatus, error) Nodes(context.Context) ([]*node.Node, error) Runtime(context.Context, common.Namespace) (*registry.Runtime, error) @@ -71,6 +73,10 @@ func (rq *registryQuerier) Node(ctx context.Context, id signature.PublicKey) (*n return node, nil } +func (rq *registryQuerier) NodeByConsensusID(ctx context.Context, id signature.PublicKey) (*node.Node, error) { + return rq.state.NodeByConsensusAddress(ctx, []byte(tmcrypto.PublicKeyToTendermint(&id).Address())) +} + func (rq *registryQuerier) NodeStatus(ctx context.Context, id signature.PublicKey) (*registry.NodeStatus, error) { return rq.state.NodeStatus(ctx, id) } diff --git a/go/consensus/tendermint/registry/registry.go b/go/consensus/tendermint/registry/registry.go index 1674e5019f9..d53f825f27e 100644 --- a/go/consensus/tendermint/registry/registry.go +++ b/go/consensus/tendermint/registry/registry.go @@ -94,6 +94,15 @@ func (tb *tendermintBackend) GetNodes(ctx context.Context, height int64) ([]*nod return q.Nodes(ctx) } +func (tb *tendermintBackend) GetNodeByConsensusID(ctx context.Context, query *api.IDQuery) (*node.Node, error) { + q, err := tb.querier.QueryAt(ctx, query.Height) + if err != nil { + return nil, err + } + + return q.NodeByConsensusID(ctx, query.ID) +} + func (tb *tendermintBackend) WatchNodes(ctx context.Context) (<-chan *api.NodeEvent, pubsub.ClosableSubscription, error) { typedCh := make(chan *api.NodeEvent) sub := tb.nodeNotifier.Subscribe() diff --git a/go/oasis-node/cmd/debug/txsource/workload/queries.go b/go/oasis-node/cmd/debug/txsource/workload/queries.go index fcad946dafb..84ba435dc49 100644 --- a/go/oasis-node/cmd/debug/txsource/workload/queries.go +++ b/go/oasis-node/cmd/debug/txsource/workload/queries.go @@ -213,6 +213,16 @@ func (q *queries) doRegistryQueries(ctx context.Context, rng *rand.Rand, height if err != nil { return fmt.Errorf("GetNodeStatus error at height %d: %w", height, err) } + node, err = q.registry.GetNodeByConsensusID( + ctx, + ®istry.IDQuery{ID: nod.Consensus.ID, Height: height}, + ) + if err != nil { + return fmt.Errorf("GetNodeByConsensusID error at height %d: %w", height, err) + } + if !nod.ID.Equal(node.ID) { + return fmt.Errorf("GetNodeByConsensusID mismatch, expected: %s, got: %s", nod, node) + } } // Runtimes. diff --git a/go/registry/api/api.go b/go/registry/api/api.go index faf8fe1793e..84cb1021ffa 100644 --- a/go/registry/api/api.go +++ b/go/registry/api/api.go @@ -196,6 +196,10 @@ type Backend interface { // GetNodes gets a list of all registered nodes. GetNodes(context.Context, int64) ([]*node.Node, error) + // GetNodeByConsensusID returns the Node by Consensus address at the + // specified block height. + GetNodeByConsensusID(context.Context, *IDQuery) (*node.Node, error) + // WatchNodes returns a channel that produces a stream of // NodeEvent on node registration changes. WatchNodes(context.Context) (<-chan *NodeEvent, pubsub.ClosableSubscription, error) @@ -204,7 +208,7 @@ type Backend interface { // Upon subscription, the node list for the current epoch will be sent // immediately if available. // - // Each node list will be sorted by node ID in lexographically ascending + // Each node list will be sorted by node ID in lexicographically ascending // order. WatchNodeList(context.Context) (<-chan *NodeList, pubsub.ClosableSubscription, error) diff --git a/go/registry/api/grpc.go b/go/registry/api/grpc.go index b6e1a26e6b7..5f1dcb44722 100644 --- a/go/registry/api/grpc.go +++ b/go/registry/api/grpc.go @@ -21,6 +21,8 @@ var ( methodGetEntities = serviceName.NewMethod("GetEntities", int64(0)) // methodGetNode is the GetNode method. methodGetNode = serviceName.NewMethod("GetNode", IDQuery{}) + // methodGetNodeByConsensusID is the GetNodeByConsensusID method. + methodGetNodeByConsensusID = serviceName.NewMethod("GetNodeByConsensusID", IDQuery{}) // methodGetNodeStatus is the GetNodeStatus method. methodGetNodeStatus = serviceName.NewMethod("GetNodeStatus", IDQuery{}) // methodGetNodes is the GetNodes method. @@ -60,6 +62,10 @@ var ( MethodName: methodGetNode.ShortName(), Handler: handlerGetNode, }, + { + MethodName: methodGetNodeByConsensusID.ShortName(), + Handler: handlerGetNodeByConsensusID, + }, { MethodName: methodGetNodeStatus.ShortName(), Handler: handlerGetNodeStatus, @@ -179,6 +185,29 @@ func handlerGetNode( // nolint: golint return interceptor(ctx, &query, info, handler) } +func handlerGetNodeByConsensusID( // nolint: golint + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + var query IDQuery + if err := dec(&query); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(Backend).GetNodeByConsensusID(ctx, &query) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: methodGetNodeByConsensusID.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(Backend).GetNodeByConsensusID(ctx, req.(*IDQuery)) + } + return interceptor(ctx, &query, info, handler) +} + func handlerGetNodeStatus( // nolint: golint srv interface{}, ctx context.Context, @@ -497,6 +526,14 @@ func (c *registryClient) GetNode(ctx context.Context, query *IDQuery) (*node.Nod return &rsp, nil } +func (c *registryClient) GetNodeByConsensusID(ctx context.Context, query *IDQuery) (*node.Node, error) { + var rsp node.Node + if err := c.conn.Invoke(ctx, methodGetNodeByConsensusID.FullName(), query, &rsp); err != nil { + return nil, err + } + return &rsp, nil +} + func (c *registryClient) GetNodeStatus(ctx context.Context, query *IDQuery) (*NodeStatus, error) { var rsp NodeStatus if err := c.conn.Invoke(ctx, methodGetNodeStatus.FullName(), query, &rsp); err != nil { diff --git a/go/registry/tests/tester.go b/go/registry/tests/tester.go index 8b5419d5fd2..93ff18f70ae 100644 --- a/go/registry/tests/tester.go +++ b/go/registry/tests/tester.go @@ -60,18 +60,20 @@ func testRegistryEntityNodes( // nolint: gocyclo runtimeID common.Namespace, runtimeEWID common.Namespace, ) { + ctx := context.Background() + // Generate the entities used for the test cases. entities, err := NewTestEntities(entityNodeSeed, 3) require.NoError(t, err, "NewTestEntities") timeSource := consensus.EpochTime().(epochtime.SetableBackend) - epoch, err := timeSource.GetEpoch(context.Background(), consensusAPI.HeightLatest) + epoch, err := timeSource.GetEpoch(ctx, consensusAPI.HeightLatest) require.NoError(t, err, "GetEpoch") // All of these tests are combined because the Entity and Node structures // are linked togehter. - entityCh, entitySub, err := backend.WatchEntities(context.Background()) + entityCh, entitySub, err := backend.WatchEntities(ctx) require.NoError(t, err, "WatchEntities") defer entitySub.Close() @@ -94,7 +96,7 @@ func testRegistryEntityNodes( // nolint: gocyclo require.True(ev.IsRegistration, "event is registration") // Make sure that GetEvents also returns the registration event. - evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + evts, grr := backend.GetEvents(ctx, consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool for _, evt := range evts { @@ -113,13 +115,13 @@ func testRegistryEntityNodes( // nolint: gocyclo for _, v := range entities { var ent *entity.Entity - ent, err = backend.GetEntity(context.Background(), &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest}) + ent, err = backend.GetEntity(ctx, &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest}) require.NoError(err, "GetEntity") require.EqualValues(v.Entity, ent, "retrieved entity") } var registeredEntities []*entity.Entity - registeredEntities, err = backend.GetEntities(context.Background(), consensusAPI.HeightLatest) + registeredEntities, err = backend.GetEntities(ctx, consensusAPI.HeightLatest) require.NoError(err, "GetEntities") // NOTE: The test entity is alway present as it controls a runtime and cannot be removed. testEntity, _, _ := entity.TestEntity() @@ -166,7 +168,7 @@ func testRegistryEntityNodes( // nolint: gocyclo nonWhitelistedNodes, err := entities[0].NewTestNodes(1, 1, []byte("nonWhitelistedNodes"), nodeRuntimesEW, epoch+2) require.NoError(t, err, "NewTestNodes non-whitelisted") - nodeCh, nodeSub, err := backend.WatchNodes(context.Background()) + nodeCh, nodeSub, err := backend.WatchNodes(ctx) require.NoError(t, err, "WatchNodes") defer nodeSub.Close() @@ -190,7 +192,7 @@ func testRegistryEntityNodes( // nolint: gocyclo require.True(ev.IsRegistration, "event is registration") // Make sure that GetEvents also returns the registration event. - evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + evts, grr := backend.GetEvents(ctx, consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool for _, evt := range evts { @@ -207,10 +209,15 @@ func testRegistryEntityNodes( // nolint: gocyclo } var nod *node.Node - nod, err = backend.GetNode(context.Background(), &api.IDQuery{ID: tn.Node.ID, Height: consensusAPI.HeightLatest}) + nod, err = backend.GetNode(ctx, &api.IDQuery{ID: tn.Node.ID, Height: consensusAPI.HeightLatest}) require.NoError(err, "GetNode") require.EqualValues(tn.Node, nod, "retrieved node") + var nodeByConsensus *node.Node + nodeByConsensus, err = backend.GetNodeByConsensusID(ctx, &api.IDQuery{ID: tn.Node.Consensus.ID, Height: consensusAPI.HeightLatest}) + require.NoError(err, "GetNodeByConsensusID") + require.EqualValues(tn.Node, nodeByConsensus, "retrieved node by Consensus ID") + for _, v := range tn.invalidAfter { err = tn.Register(consensus, v.signed) require.Error(err, v.descr) @@ -274,7 +281,7 @@ func testRegistryEntityNodes( // nolint: gocyclo expectedNodeList := getExpectedNodeList() epoch = epochtimeTests.MustAdvanceEpoch(t, timeSource, 1) - registeredNodes, nerr := backend.GetNodes(context.Background(), consensusAPI.HeightLatest) + registeredNodes, nerr := backend.GetNodes(ctx, consensusAPI.HeightLatest) require.NoError(nerr, "GetNodes") require.EqualValues(expectedNodeList, registeredNodes, "node list") }) @@ -285,9 +292,6 @@ func testRegistryEntityNodes( // nolint: gocyclo entity := entities[0] node := nodes[0][0] - ctx, cancel := context.WithTimeout(context.Background(), recvTimeout) - defer cancel() - // Get node status. var nodeStatus *api.NodeStatus nodeStatus, err = backend.GetNodeStatus(ctx, &api.IDQuery{ID: node.Node.ID, Height: consensusAPI.HeightLatest}) @@ -299,7 +303,7 @@ func testRegistryEntityNodes( // nolint: gocyclo tx := api.NewUnfreezeNodeTx(0, nil, &api.UnfreezeNode{ NodeID: node.Node.ID, }) - err = consensusAPI.SignAndSubmitTx(context.Background(), consensus, entity.Signer, tx) + err = consensusAPI.SignAndSubmitTx(ctx, consensus, entity.Signer, tx) require.NoError(err, "UnfreezeNode") // Try to unfreeze an invalid node (should fail). @@ -308,7 +312,7 @@ func testRegistryEntityNodes( // nolint: gocyclo err = unfreeze.NodeID.UnmarshalHex("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") require.NoError(err, "UnmarshalHex") tx = api.NewUnfreezeNodeTx(0, nil, &unfreeze) - err = consensusAPI.SignAndSubmitTx(context.Background(), consensus, entity.Signer, tx) + err = consensusAPI.SignAndSubmitTx(ctx, consensus, entity.Signer, tx) require.Error(err, "UnfreezeNode (with invalid node)") require.Equal(err, api.ErrNoSuchNode) @@ -317,7 +321,7 @@ func testRegistryEntityNodes( // nolint: gocyclo tx = api.NewUnfreezeNodeTx(0, nil, &api.UnfreezeNode{ NodeID: node.Node.ID, }) - err = consensusAPI.SignAndSubmitTx(context.Background(), consensus, node.Signer, tx) + err = consensusAPI.SignAndSubmitTx(ctx, consensus, node.Signer, tx) require.Error(err, "UnfreezeNode (with invalid signer)") require.Equal(err, api.ErrBadEntityForNode) }) @@ -339,7 +343,7 @@ func testRegistryEntityNodes( // nolint: gocyclo deregisteredNodes[ev.Node.ID] = ev.Node // Make sure that GetEvents also returns the deregistration event. - evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + evts, grr := backend.GetEvents(ctx, consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool for _, evt := range evts { @@ -371,7 +375,7 @@ func testRegistryEntityNodes( // nolint: gocyclo // Ensure the node list doesn't have the expired nodes. expectedNodeList := getExpectedNodeList() - registeredNodes, nerr := backend.GetNodes(context.Background(), consensusAPI.HeightLatest) + registeredNodes, nerr := backend.GetNodes(ctx, consensusAPI.HeightLatest) require.NoError(nerr, "GetNodes") require.EqualValues(expectedNodeList, registeredNodes, "node list") @@ -407,7 +411,7 @@ func testRegistryEntityNodes( // nolint: gocyclo require.False(ev.IsRegistration, "event is deregistration") // Make sure that GetEvents also returns the deregistration event. - evts, err := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + evts, err := backend.GetEvents(ctx, consensusAPI.HeightLatest) require.NoError(err, "GetEvents") var gotIt bool for _, evt := range evts { @@ -444,7 +448,7 @@ func testRegistryEntityNodes( // nolint: gocyclo require.False(ev.IsRegistration, "event is deregistration") // Make sure that GetEvents also returns the deregistration event. - evts, err := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + evts, err := backend.GetEvents(ctx, consensusAPI.HeightLatest) require.NoError(err, "GetEvents") var gotIt bool for _, evt := range evts { @@ -463,7 +467,7 @@ func testRegistryEntityNodes( // nolint: gocyclo // There should be no more entities. for _, v := range entities { - _, err := backend.GetEntity(context.Background(), &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest}) + _, err := backend.GetEntity(ctx, &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest}) require.Equal(api.ErrNoSuchEntity, err, "GetEntity") } }) diff --git a/go/worker/common/p2p/peermgmt.go b/go/worker/common/p2p/peermgmt.go index 262c54137ea..f7f5d978f26 100644 --- a/go/worker/common/p2p/peermgmt.go +++ b/go/worker/common/p2p/peermgmt.go @@ -144,7 +144,7 @@ func (mgr *PeerManager) watchRegistryNodes(consensus consensus.Backend) { nodeCh, nSub, err := consensus.Registry().WatchNodes(mgr.ctx) if err != nil { - mgr.logger.Error("failed to watch registery for node changes", + mgr.logger.Error("failed to watch registry for node changes", "err", err, ) return