diff --git a/.changelog/2961.feature.md b/.changelog/2961.feature.md new file mode 100644 index 00000000000..741cdbd10b9 --- /dev/null +++ b/.changelog/2961.feature.md @@ -0,0 +1,4 @@ +go/registry/api: Add `GetNodeByConsensusAddress` method + +`GetNodeByConsensusAddress` can be used to query nodes by their Consensus +address. diff --git a/go/consensus/tendermint/apps/registry/query.go b/go/consensus/tendermint/apps/registry/query.go index 94eb5e5d707..f08ae9d61e0 100644 --- a/go/consensus/tendermint/apps/registry/query.go +++ b/go/consensus/tendermint/apps/registry/query.go @@ -18,6 +18,7 @@ type Query interface { Entity(context.Context, signature.PublicKey) (*entity.Entity, error) Entities(context.Context) ([]*entity.Entity, error) Node(context.Context, signature.PublicKey) (*node.Node, error) + NodeByConsensusAddress(context.Context, []byte) (*node.Node, error) NodeStatus(context.Context, signature.PublicKey) (*registry.NodeStatus, error) Nodes(context.Context) ([]*node.Node, error) Runtime(context.Context, common.Namespace) (*registry.Runtime, error) @@ -71,6 +72,10 @@ func (rq *registryQuerier) Node(ctx context.Context, id signature.PublicKey) (*n return node, nil } +func (rq *registryQuerier) NodeByConsensusAddress(ctx context.Context, address []byte) (*node.Node, error) { + return rq.state.NodeByConsensusAddress(ctx, address) +} + func (rq *registryQuerier) NodeStatus(ctx context.Context, id signature.PublicKey) (*registry.NodeStatus, error) { return rq.state.NodeStatus(ctx, id) } diff --git a/go/consensus/tendermint/registry/registry.go b/go/consensus/tendermint/registry/registry.go index 1674e5019f9..1ed2ec5ebf6 100644 --- a/go/consensus/tendermint/registry/registry.go +++ b/go/consensus/tendermint/registry/registry.go @@ -94,6 +94,15 @@ func (tb *tendermintBackend) GetNodes(ctx context.Context, height int64) ([]*nod return q.Nodes(ctx) } +func (tb *tendermintBackend) GetNodeByConsensusAddress(ctx context.Context, query *api.ConsensusAddressQuery) (*node.Node, error) { + q, err := tb.querier.QueryAt(ctx, query.Height) + if err != nil { + return nil, err + } + + return q.NodeByConsensusAddress(ctx, query.Address) +} + func (tb *tendermintBackend) WatchNodes(ctx context.Context) (<-chan *api.NodeEvent, pubsub.ClosableSubscription, error) { typedCh := make(chan *api.NodeEvent) sub := tb.nodeNotifier.Subscribe() diff --git a/go/oasis-node/cmd/debug/txsource/workload/queries.go b/go/oasis-node/cmd/debug/txsource/workload/queries.go index fcad946dafb..e8fab5a48c5 100644 --- a/go/oasis-node/cmd/debug/txsource/workload/queries.go +++ b/go/oasis-node/cmd/debug/txsource/workload/queries.go @@ -17,6 +17,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/node" "github.com/oasisprotocol/oasis-core/go/common/quantity" consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" + tmcrypto "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/crypto" control "github.com/oasisprotocol/oasis-core/go/control/api" epochtime "github.com/oasisprotocol/oasis-core/go/epochtime/api" registry "github.com/oasisprotocol/oasis-core/go/registry/api" @@ -213,6 +214,17 @@ func (q *queries) doRegistryQueries(ctx context.Context, rng *rand.Rand, height if err != nil { return fmt.Errorf("GetNodeStatus error at height %d: %w", height, err) } + node, err = q.registry.GetNodeByConsensusAddress( + ctx, + ®istry.ConsensusAddressQuery{ + Address: []byte(tmcrypto.PublicKeyToTendermint(&nod.Consensus.ID).Address()), Height: height}, + ) + if err != nil { + return fmt.Errorf("GetNodeByConsensusAddress error at height %d: %w", height, err) + } + if !nod.ID.Equal(node.ID) { + return fmt.Errorf("GetNodeByConsensusAddress mismatch, expected: %s, got: %s", nod, node) + } } // Runtimes. diff --git a/go/registry/api/api.go b/go/registry/api/api.go index faf8fe1793e..3fba22ef257 100644 --- a/go/registry/api/api.go +++ b/go/registry/api/api.go @@ -196,6 +196,10 @@ type Backend interface { // GetNodes gets a list of all registered nodes. GetNodes(context.Context, int64) ([]*node.Node, error) + // GetNodeByConsensusAddress returns the Node by Consensus address at the + // specified block height. + GetNodeByConsensusAddress(context.Context, *ConsensusAddressQuery) (*node.Node, error) + // WatchNodes returns a channel that produces a stream of // NodeEvent on node registration changes. WatchNodes(context.Context) (<-chan *NodeEvent, pubsub.ClosableSubscription, error) @@ -204,7 +208,7 @@ type Backend interface { // Upon subscription, the node list for the current epoch will be sent // immediately if available. // - // Each node list will be sorted by node ID in lexographically ascending + // Each node list will be sorted by node ID in lexicographically ascending // order. WatchNodeList(context.Context) (<-chan *NodeList, pubsub.ClosableSubscription, error) @@ -241,6 +245,14 @@ type NamespaceQuery struct { ID common.Namespace `json:"id"` } +// ConsensusAddressQuery is a registry query by consensus address. +// Address is a passed as binary blob and it depends on the actual consensus +// implementation. +type ConsensusAddressQuery struct { + Height int64 `json:"height"` + Address []byte `json:"address"` +} + // NewRegisterEntityTx creates a new register entity transaction. func NewRegisterEntityTx(nonce uint64, fee *transaction.Fee, sigEnt *entity.SignedEntity) *transaction.Transaction { return transaction.NewTransaction(nonce, fee, MethodRegisterEntity, sigEnt) diff --git a/go/registry/api/grpc.go b/go/registry/api/grpc.go index b6e1a26e6b7..7f830ddbc78 100644 --- a/go/registry/api/grpc.go +++ b/go/registry/api/grpc.go @@ -21,6 +21,8 @@ var ( methodGetEntities = serviceName.NewMethod("GetEntities", int64(0)) // methodGetNode is the GetNode method. methodGetNode = serviceName.NewMethod("GetNode", IDQuery{}) + // methodGetNodeByConsensusAddress is the GetNodeByConsensusAddress method. + methodGetNodeByConsensusAddress = serviceName.NewMethod("GetNodeByConsensusAddress", ConsensusAddressQuery{}) // methodGetNodeStatus is the GetNodeStatus method. methodGetNodeStatus = serviceName.NewMethod("GetNodeStatus", IDQuery{}) // methodGetNodes is the GetNodes method. @@ -60,6 +62,10 @@ var ( MethodName: methodGetNode.ShortName(), Handler: handlerGetNode, }, + { + MethodName: methodGetNodeByConsensusAddress.ShortName(), + Handler: handlerGetNodeByConsensusAddress, + }, { MethodName: methodGetNodeStatus.ShortName(), Handler: handlerGetNodeStatus, @@ -179,6 +185,29 @@ func handlerGetNode( // nolint: golint return interceptor(ctx, &query, info, handler) } +func handlerGetNodeByConsensusAddress( // nolint: golint + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + var query ConsensusAddressQuery + if err := dec(&query); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(Backend).GetNodeByConsensusAddress(ctx, &query) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: methodGetNodeByConsensusAddress.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(Backend).GetNodeByConsensusAddress(ctx, req.(*ConsensusAddressQuery)) + } + return interceptor(ctx, &query, info, handler) +} + func handlerGetNodeStatus( // nolint: golint srv interface{}, ctx context.Context, @@ -497,6 +526,14 @@ func (c *registryClient) GetNode(ctx context.Context, query *IDQuery) (*node.Nod return &rsp, nil } +func (c *registryClient) GetNodeByConsensusAddress(ctx context.Context, query *ConsensusAddressQuery) (*node.Node, error) { + var rsp node.Node + if err := c.conn.Invoke(ctx, methodGetNodeByConsensusAddress.FullName(), query, &rsp); err != nil { + return nil, err + } + return &rsp, nil +} + func (c *registryClient) GetNodeStatus(ctx context.Context, query *IDQuery) (*NodeStatus, error) { var rsp NodeStatus if err := c.conn.Invoke(ctx, methodGetNodeStatus.FullName(), query, &rsp); err != nil { diff --git a/go/registry/tests/tester.go b/go/registry/tests/tester.go index 8b5419d5fd2..d6fdb04c815 100644 --- a/go/registry/tests/tester.go +++ b/go/registry/tests/tester.go @@ -21,6 +21,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/node" "github.com/oasisprotocol/oasis-core/go/common/quantity" consensusAPI "github.com/oasisprotocol/oasis-core/go/consensus/api" + tmcrypto "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/crypto" epochtime "github.com/oasisprotocol/oasis-core/go/epochtime/api" epochtimeTests "github.com/oasisprotocol/oasis-core/go/epochtime/tests" "github.com/oasisprotocol/oasis-core/go/registry/api" @@ -60,18 +61,20 @@ func testRegistryEntityNodes( // nolint: gocyclo runtimeID common.Namespace, runtimeEWID common.Namespace, ) { + ctx := context.Background() + // Generate the entities used for the test cases. entities, err := NewTestEntities(entityNodeSeed, 3) require.NoError(t, err, "NewTestEntities") timeSource := consensus.EpochTime().(epochtime.SetableBackend) - epoch, err := timeSource.GetEpoch(context.Background(), consensusAPI.HeightLatest) + epoch, err := timeSource.GetEpoch(ctx, consensusAPI.HeightLatest) require.NoError(t, err, "GetEpoch") // All of these tests are combined because the Entity and Node structures // are linked togehter. - entityCh, entitySub, err := backend.WatchEntities(context.Background()) + entityCh, entitySub, err := backend.WatchEntities(ctx) require.NoError(t, err, "WatchEntities") defer entitySub.Close() @@ -94,7 +97,7 @@ func testRegistryEntityNodes( // nolint: gocyclo require.True(ev.IsRegistration, "event is registration") // Make sure that GetEvents also returns the registration event. - evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + evts, grr := backend.GetEvents(ctx, consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool for _, evt := range evts { @@ -113,13 +116,13 @@ func testRegistryEntityNodes( // nolint: gocyclo for _, v := range entities { var ent *entity.Entity - ent, err = backend.GetEntity(context.Background(), &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest}) + ent, err = backend.GetEntity(ctx, &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest}) require.NoError(err, "GetEntity") require.EqualValues(v.Entity, ent, "retrieved entity") } var registeredEntities []*entity.Entity - registeredEntities, err = backend.GetEntities(context.Background(), consensusAPI.HeightLatest) + registeredEntities, err = backend.GetEntities(ctx, consensusAPI.HeightLatest) require.NoError(err, "GetEntities") // NOTE: The test entity is alway present as it controls a runtime and cannot be removed. testEntity, _, _ := entity.TestEntity() @@ -166,7 +169,7 @@ func testRegistryEntityNodes( // nolint: gocyclo nonWhitelistedNodes, err := entities[0].NewTestNodes(1, 1, []byte("nonWhitelistedNodes"), nodeRuntimesEW, epoch+2) require.NoError(t, err, "NewTestNodes non-whitelisted") - nodeCh, nodeSub, err := backend.WatchNodes(context.Background()) + nodeCh, nodeSub, err := backend.WatchNodes(ctx) require.NoError(t, err, "WatchNodes") defer nodeSub.Close() @@ -190,7 +193,7 @@ func testRegistryEntityNodes( // nolint: gocyclo require.True(ev.IsRegistration, "event is registration") // Make sure that GetEvents also returns the registration event. - evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + evts, grr := backend.GetEvents(ctx, consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool for _, evt := range evts { @@ -207,10 +210,20 @@ func testRegistryEntityNodes( // nolint: gocyclo } var nod *node.Node - nod, err = backend.GetNode(context.Background(), &api.IDQuery{ID: tn.Node.ID, Height: consensusAPI.HeightLatest}) + nod, err = backend.GetNode(ctx, &api.IDQuery{ID: tn.Node.ID, Height: consensusAPI.HeightLatest}) require.NoError(err, "GetNode") require.EqualValues(tn.Node, nod, "retrieved node") + var nodeByConsensus *node.Node + nodeByConsensus, err = backend.GetNodeByConsensusAddress( + ctx, + &api.ConsensusAddressQuery{ + Address: []byte(tmcrypto.PublicKeyToTendermint(&tn.Node.Consensus.ID).Address()), + Height: consensusAPI.HeightLatest}, + ) + require.NoError(err, "GetNodeByConsensusAddress") + require.EqualValues(tn.Node, nodeByConsensus, "retrieved node by Consensus Address") + for _, v := range tn.invalidAfter { err = tn.Register(consensus, v.signed) require.Error(err, v.descr) @@ -274,7 +287,7 @@ func testRegistryEntityNodes( // nolint: gocyclo expectedNodeList := getExpectedNodeList() epoch = epochtimeTests.MustAdvanceEpoch(t, timeSource, 1) - registeredNodes, nerr := backend.GetNodes(context.Background(), consensusAPI.HeightLatest) + registeredNodes, nerr := backend.GetNodes(ctx, consensusAPI.HeightLatest) require.NoError(nerr, "GetNodes") require.EqualValues(expectedNodeList, registeredNodes, "node list") }) @@ -285,9 +298,6 @@ func testRegistryEntityNodes( // nolint: gocyclo entity := entities[0] node := nodes[0][0] - ctx, cancel := context.WithTimeout(context.Background(), recvTimeout) - defer cancel() - // Get node status. var nodeStatus *api.NodeStatus nodeStatus, err = backend.GetNodeStatus(ctx, &api.IDQuery{ID: node.Node.ID, Height: consensusAPI.HeightLatest}) @@ -299,7 +309,7 @@ func testRegistryEntityNodes( // nolint: gocyclo tx := api.NewUnfreezeNodeTx(0, nil, &api.UnfreezeNode{ NodeID: node.Node.ID, }) - err = consensusAPI.SignAndSubmitTx(context.Background(), consensus, entity.Signer, tx) + err = consensusAPI.SignAndSubmitTx(ctx, consensus, entity.Signer, tx) require.NoError(err, "UnfreezeNode") // Try to unfreeze an invalid node (should fail). @@ -308,7 +318,7 @@ func testRegistryEntityNodes( // nolint: gocyclo err = unfreeze.NodeID.UnmarshalHex("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") require.NoError(err, "UnmarshalHex") tx = api.NewUnfreezeNodeTx(0, nil, &unfreeze) - err = consensusAPI.SignAndSubmitTx(context.Background(), consensus, entity.Signer, tx) + err = consensusAPI.SignAndSubmitTx(ctx, consensus, entity.Signer, tx) require.Error(err, "UnfreezeNode (with invalid node)") require.Equal(err, api.ErrNoSuchNode) @@ -317,7 +327,7 @@ func testRegistryEntityNodes( // nolint: gocyclo tx = api.NewUnfreezeNodeTx(0, nil, &api.UnfreezeNode{ NodeID: node.Node.ID, }) - err = consensusAPI.SignAndSubmitTx(context.Background(), consensus, node.Signer, tx) + err = consensusAPI.SignAndSubmitTx(ctx, consensus, node.Signer, tx) require.Error(err, "UnfreezeNode (with invalid signer)") require.Equal(err, api.ErrBadEntityForNode) }) @@ -339,7 +349,7 @@ func testRegistryEntityNodes( // nolint: gocyclo deregisteredNodes[ev.Node.ID] = ev.Node // Make sure that GetEvents also returns the deregistration event. - evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + evts, grr := backend.GetEvents(ctx, consensusAPI.HeightLatest) require.NoError(grr, "GetEvents") var gotIt bool for _, evt := range evts { @@ -371,7 +381,7 @@ func testRegistryEntityNodes( // nolint: gocyclo // Ensure the node list doesn't have the expired nodes. expectedNodeList := getExpectedNodeList() - registeredNodes, nerr := backend.GetNodes(context.Background(), consensusAPI.HeightLatest) + registeredNodes, nerr := backend.GetNodes(ctx, consensusAPI.HeightLatest) require.NoError(nerr, "GetNodes") require.EqualValues(expectedNodeList, registeredNodes, "node list") @@ -407,7 +417,7 @@ func testRegistryEntityNodes( // nolint: gocyclo require.False(ev.IsRegistration, "event is deregistration") // Make sure that GetEvents also returns the deregistration event. - evts, err := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + evts, err := backend.GetEvents(ctx, consensusAPI.HeightLatest) require.NoError(err, "GetEvents") var gotIt bool for _, evt := range evts { @@ -444,7 +454,7 @@ func testRegistryEntityNodes( // nolint: gocyclo require.False(ev.IsRegistration, "event is deregistration") // Make sure that GetEvents also returns the deregistration event. - evts, err := backend.GetEvents(context.Background(), consensusAPI.HeightLatest) + evts, err := backend.GetEvents(ctx, consensusAPI.HeightLatest) require.NoError(err, "GetEvents") var gotIt bool for _, evt := range evts { @@ -463,7 +473,7 @@ func testRegistryEntityNodes( // nolint: gocyclo // There should be no more entities. for _, v := range entities { - _, err := backend.GetEntity(context.Background(), &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest}) + _, err := backend.GetEntity(ctx, &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest}) require.Equal(api.ErrNoSuchEntity, err, "GetEntity") } }) diff --git a/go/worker/common/p2p/peermgmt.go b/go/worker/common/p2p/peermgmt.go index 262c54137ea..f7f5d978f26 100644 --- a/go/worker/common/p2p/peermgmt.go +++ b/go/worker/common/p2p/peermgmt.go @@ -144,7 +144,7 @@ func (mgr *PeerManager) watchRegistryNodes(consensus consensus.Backend) { nodeCh, nSub, err := consensus.Registry().WatchNodes(mgr.ctx) if err != nil { - mgr.logger.Error("failed to watch registery for node changes", + mgr.logger.Error("failed to watch registry for node changes", "err", err, ) return