Skip to content

Commit

Permalink
go/registry/api: Add GetNodeByConesnusID method
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Jun 30, 2020
1 parent b936814 commit 96ea2bd
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 22 deletions.
7 changes: 7 additions & 0 deletions .changelog/2961.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
go/registry/api: Add `GetNodeByConesnusID` method

`GetNodeByConensusID` can be used to query nodes by their Consensus ID.

To query the node by tendermint consensus address use the
`oasis-core/go/consensus/tendermint/crypto.PublicKeyFromTendermint`
helper to first convert the tendermint address into a public key.
6 changes: 6 additions & 0 deletions go/consensus/tendermint/apps/registry/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/node"
abciAPI "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/api"
registryState "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/registry/state"
tmcrypto "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/crypto"
registry "github.com/oasisprotocol/oasis-core/go/registry/api"
)

Expand All @@ -18,6 +19,7 @@ type Query interface {
Entity(context.Context, signature.PublicKey) (*entity.Entity, error)
Entities(context.Context) ([]*entity.Entity, error)
Node(context.Context, signature.PublicKey) (*node.Node, error)
NodeByConsensusID(context.Context, signature.PublicKey) (*node.Node, error)
NodeStatus(context.Context, signature.PublicKey) (*registry.NodeStatus, error)
Nodes(context.Context) ([]*node.Node, error)
Runtime(context.Context, common.Namespace) (*registry.Runtime, error)
Expand Down Expand Up @@ -71,6 +73,10 @@ func (rq *registryQuerier) Node(ctx context.Context, id signature.PublicKey) (*n
return node, nil
}

func (rq *registryQuerier) NodeByConsensusID(ctx context.Context, id signature.PublicKey) (*node.Node, error) {
return rq.state.NodeByConsensusAddress(ctx, []byte(tmcrypto.PublicKeyToTendermint(&id).Address()))
}

func (rq *registryQuerier) NodeStatus(ctx context.Context, id signature.PublicKey) (*registry.NodeStatus, error) {
return rq.state.NodeStatus(ctx, id)
}
Expand Down
9 changes: 9 additions & 0 deletions go/consensus/tendermint/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ func (tb *tendermintBackend) GetNodes(ctx context.Context, height int64) ([]*nod
return q.Nodes(ctx)
}

func (tb *tendermintBackend) GetNodeByConsensusID(ctx context.Context, query *api.IDQuery) (*node.Node, error) {
q, err := tb.querier.QueryAt(ctx, query.Height)
if err != nil {
return nil, err
}

return q.NodeByConsensusID(ctx, query.ID)
}

func (tb *tendermintBackend) WatchNodes(ctx context.Context) (<-chan *api.NodeEvent, pubsub.ClosableSubscription, error) {
typedCh := make(chan *api.NodeEvent)
sub := tb.nodeNotifier.Subscribe()
Expand Down
10 changes: 10 additions & 0 deletions go/oasis-node/cmd/debug/txsource/workload/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,16 @@ func (q *queries) doRegistryQueries(ctx context.Context, rng *rand.Rand, height
if err != nil {
return fmt.Errorf("GetNodeStatus error at height %d: %w", height, err)
}
node, err = q.registry.GetNodeByConsensusID(
ctx,
&registry.IDQuery{ID: nod.Consensus.ID, Height: height},
)
if err != nil {
return fmt.Errorf("GetNodeByConsensusID error at height %d: %w", height, err)
}
if !nod.ID.Equal(node.ID) {
return fmt.Errorf("GetNodeByConsensusID mismatch, expected: %s, got: %s", nod, node)
}
}

// Runtimes.
Expand Down
6 changes: 5 additions & 1 deletion go/registry/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ type Backend interface {
// GetNodes gets a list of all registered nodes.
GetNodes(context.Context, int64) ([]*node.Node, error)

// GetNodeByConsensusID returns the Node by Consensus address at the
// specified block height.
GetNodeByConsensusID(context.Context, *IDQuery) (*node.Node, error)

// WatchNodes returns a channel that produces a stream of
// NodeEvent on node registration changes.
WatchNodes(context.Context) (<-chan *NodeEvent, pubsub.ClosableSubscription, error)
Expand All @@ -204,7 +208,7 @@ type Backend interface {
// Upon subscription, the node list for the current epoch will be sent
// immediately if available.
//
// Each node list will be sorted by node ID in lexographically ascending
// Each node list will be sorted by node ID in lexicographically ascending
// order.
WatchNodeList(context.Context) (<-chan *NodeList, pubsub.ClosableSubscription, error)

Expand Down
37 changes: 37 additions & 0 deletions go/registry/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var (
methodGetEntities = serviceName.NewMethod("GetEntities", int64(0))
// methodGetNode is the GetNode method.
methodGetNode = serviceName.NewMethod("GetNode", IDQuery{})
// methodGetNodeByConsensusID is the GetNodeByConsensusID method.
methodGetNodeByConsensusID = serviceName.NewMethod("GetNodeByConsensusID", IDQuery{})
// methodGetNodeStatus is the GetNodeStatus method.
methodGetNodeStatus = serviceName.NewMethod("GetNodeStatus", IDQuery{})
// methodGetNodes is the GetNodes method.
Expand Down Expand Up @@ -60,6 +62,10 @@ var (
MethodName: methodGetNode.ShortName(),
Handler: handlerGetNode,
},
{
MethodName: methodGetNodeByConsensusID.ShortName(),
Handler: handlerGetNodeByConsensusID,
},
{
MethodName: methodGetNodeStatus.ShortName(),
Handler: handlerGetNodeStatus,
Expand Down Expand Up @@ -179,6 +185,29 @@ func handlerGetNode( // nolint: golint
return interceptor(ctx, &query, info, handler)
}

func handlerGetNodeByConsensusID( // nolint: golint
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
var query IDQuery
if err := dec(&query); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(Backend).GetNodeByConsensusID(ctx, &query)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodGetNodeByConsensusID.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(Backend).GetNodeByConsensusID(ctx, req.(*IDQuery))
}
return interceptor(ctx, &query, info, handler)
}

func handlerGetNodeStatus( // nolint: golint
srv interface{},
ctx context.Context,
Expand Down Expand Up @@ -497,6 +526,14 @@ func (c *registryClient) GetNode(ctx context.Context, query *IDQuery) (*node.Nod
return &rsp, nil
}

func (c *registryClient) GetNodeByConsensusID(ctx context.Context, query *IDQuery) (*node.Node, error) {
var rsp node.Node
if err := c.conn.Invoke(ctx, methodGetNodeByConsensusID.FullName(), query, &rsp); err != nil {
return nil, err
}
return &rsp, nil
}

func (c *registryClient) GetNodeStatus(ctx context.Context, query *IDQuery) (*NodeStatus, error) {
var rsp NodeStatus
if err := c.conn.Invoke(ctx, methodGetNodeStatus.FullName(), query, &rsp); err != nil {
Expand Down
44 changes: 24 additions & 20 deletions go/registry/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,20 @@ func testRegistryEntityNodes( // nolint: gocyclo
runtimeID common.Namespace,
runtimeEWID common.Namespace,
) {
ctx := context.Background()

// Generate the entities used for the test cases.
entities, err := NewTestEntities(entityNodeSeed, 3)
require.NoError(t, err, "NewTestEntities")

timeSource := consensus.EpochTime().(epochtime.SetableBackend)
epoch, err := timeSource.GetEpoch(context.Background(), consensusAPI.HeightLatest)
epoch, err := timeSource.GetEpoch(ctx, consensusAPI.HeightLatest)
require.NoError(t, err, "GetEpoch")

// All of these tests are combined because the Entity and Node structures
// are linked togehter.

entityCh, entitySub, err := backend.WatchEntities(context.Background())
entityCh, entitySub, err := backend.WatchEntities(ctx)
require.NoError(t, err, "WatchEntities")
defer entitySub.Close()

Expand All @@ -94,7 +96,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
require.True(ev.IsRegistration, "event is registration")

// Make sure that GetEvents also returns the registration event.
evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest)
evts, grr := backend.GetEvents(ctx, consensusAPI.HeightLatest)
require.NoError(grr, "GetEvents")
var gotIt bool
for _, evt := range evts {
Expand All @@ -113,13 +115,13 @@ func testRegistryEntityNodes( // nolint: gocyclo

for _, v := range entities {
var ent *entity.Entity
ent, err = backend.GetEntity(context.Background(), &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest})
ent, err = backend.GetEntity(ctx, &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest})
require.NoError(err, "GetEntity")
require.EqualValues(v.Entity, ent, "retrieved entity")
}

var registeredEntities []*entity.Entity
registeredEntities, err = backend.GetEntities(context.Background(), consensusAPI.HeightLatest)
registeredEntities, err = backend.GetEntities(ctx, consensusAPI.HeightLatest)
require.NoError(err, "GetEntities")
// NOTE: The test entity is alway present as it controls a runtime and cannot be removed.
testEntity, _, _ := entity.TestEntity()
Expand Down Expand Up @@ -166,7 +168,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
nonWhitelistedNodes, err := entities[0].NewTestNodes(1, 1, []byte("nonWhitelistedNodes"), nodeRuntimesEW, epoch+2)
require.NoError(t, err, "NewTestNodes non-whitelisted")

nodeCh, nodeSub, err := backend.WatchNodes(context.Background())
nodeCh, nodeSub, err := backend.WatchNodes(ctx)
require.NoError(t, err, "WatchNodes")
defer nodeSub.Close()

Expand All @@ -190,7 +192,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
require.True(ev.IsRegistration, "event is registration")

// Make sure that GetEvents also returns the registration event.
evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest)
evts, grr := backend.GetEvents(ctx, consensusAPI.HeightLatest)
require.NoError(grr, "GetEvents")
var gotIt bool
for _, evt := range evts {
Expand All @@ -207,10 +209,15 @@ func testRegistryEntityNodes( // nolint: gocyclo
}

var nod *node.Node
nod, err = backend.GetNode(context.Background(), &api.IDQuery{ID: tn.Node.ID, Height: consensusAPI.HeightLatest})
nod, err = backend.GetNode(ctx, &api.IDQuery{ID: tn.Node.ID, Height: consensusAPI.HeightLatest})
require.NoError(err, "GetNode")
require.EqualValues(tn.Node, nod, "retrieved node")

var nodeByConsensus *node.Node
nodeByConsensus, err = backend.GetNodeByConsensusID(ctx, &api.IDQuery{ID: tn.Node.Consensus.ID, Height: consensusAPI.HeightLatest})
require.NoError(err, "GetNodeByConsensusID")
require.EqualValues(tn.Node, nodeByConsensus, "retrieved node by Consensus ID")

for _, v := range tn.invalidAfter {
err = tn.Register(consensus, v.signed)
require.Error(err, v.descr)
Expand Down Expand Up @@ -274,7 +281,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
expectedNodeList := getExpectedNodeList()
epoch = epochtimeTests.MustAdvanceEpoch(t, timeSource, 1)

registeredNodes, nerr := backend.GetNodes(context.Background(), consensusAPI.HeightLatest)
registeredNodes, nerr := backend.GetNodes(ctx, consensusAPI.HeightLatest)
require.NoError(nerr, "GetNodes")
require.EqualValues(expectedNodeList, registeredNodes, "node list")
})
Expand All @@ -285,9 +292,6 @@ func testRegistryEntityNodes( // nolint: gocyclo
entity := entities[0]
node := nodes[0][0]

ctx, cancel := context.WithTimeout(context.Background(), recvTimeout)
defer cancel()

// Get node status.
var nodeStatus *api.NodeStatus
nodeStatus, err = backend.GetNodeStatus(ctx, &api.IDQuery{ID: node.Node.ID, Height: consensusAPI.HeightLatest})
Expand All @@ -299,7 +303,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
tx := api.NewUnfreezeNodeTx(0, nil, &api.UnfreezeNode{
NodeID: node.Node.ID,
})
err = consensusAPI.SignAndSubmitTx(context.Background(), consensus, entity.Signer, tx)
err = consensusAPI.SignAndSubmitTx(ctx, consensus, entity.Signer, tx)
require.NoError(err, "UnfreezeNode")

// Try to unfreeze an invalid node (should fail).
Expand All @@ -308,7 +312,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
err = unfreeze.NodeID.UnmarshalHex("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
require.NoError(err, "UnmarshalHex")
tx = api.NewUnfreezeNodeTx(0, nil, &unfreeze)
err = consensusAPI.SignAndSubmitTx(context.Background(), consensus, entity.Signer, tx)
err = consensusAPI.SignAndSubmitTx(ctx, consensus, entity.Signer, tx)
require.Error(err, "UnfreezeNode (with invalid node)")
require.Equal(err, api.ErrNoSuchNode)

Expand All @@ -317,7 +321,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
tx = api.NewUnfreezeNodeTx(0, nil, &api.UnfreezeNode{
NodeID: node.Node.ID,
})
err = consensusAPI.SignAndSubmitTx(context.Background(), consensus, node.Signer, tx)
err = consensusAPI.SignAndSubmitTx(ctx, consensus, node.Signer, tx)
require.Error(err, "UnfreezeNode (with invalid signer)")
require.Equal(err, api.ErrBadEntityForNode)
})
Expand All @@ -339,7 +343,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
deregisteredNodes[ev.Node.ID] = ev.Node

// Make sure that GetEvents also returns the deregistration event.
evts, grr := backend.GetEvents(context.Background(), consensusAPI.HeightLatest)
evts, grr := backend.GetEvents(ctx, consensusAPI.HeightLatest)
require.NoError(grr, "GetEvents")
var gotIt bool
for _, evt := range evts {
Expand Down Expand Up @@ -371,7 +375,7 @@ func testRegistryEntityNodes( // nolint: gocyclo

// Ensure the node list doesn't have the expired nodes.
expectedNodeList := getExpectedNodeList()
registeredNodes, nerr := backend.GetNodes(context.Background(), consensusAPI.HeightLatest)
registeredNodes, nerr := backend.GetNodes(ctx, consensusAPI.HeightLatest)
require.NoError(nerr, "GetNodes")
require.EqualValues(expectedNodeList, registeredNodes, "node list")

Expand Down Expand Up @@ -407,7 +411,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
require.False(ev.IsRegistration, "event is deregistration")

// Make sure that GetEvents also returns the deregistration event.
evts, err := backend.GetEvents(context.Background(), consensusAPI.HeightLatest)
evts, err := backend.GetEvents(ctx, consensusAPI.HeightLatest)
require.NoError(err, "GetEvents")
var gotIt bool
for _, evt := range evts {
Expand Down Expand Up @@ -444,7 +448,7 @@ func testRegistryEntityNodes( // nolint: gocyclo
require.False(ev.IsRegistration, "event is deregistration")

// Make sure that GetEvents also returns the deregistration event.
evts, err := backend.GetEvents(context.Background(), consensusAPI.HeightLatest)
evts, err := backend.GetEvents(ctx, consensusAPI.HeightLatest)
require.NoError(err, "GetEvents")
var gotIt bool
for _, evt := range evts {
Expand All @@ -463,7 +467,7 @@ func testRegistryEntityNodes( // nolint: gocyclo

// There should be no more entities.
for _, v := range entities {
_, err := backend.GetEntity(context.Background(), &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest})
_, err := backend.GetEntity(ctx, &api.IDQuery{ID: v.Entity.ID, Height: consensusAPI.HeightLatest})
require.Equal(api.ErrNoSuchEntity, err, "GetEntity")
}
})
Expand Down
2 changes: 1 addition & 1 deletion go/worker/common/p2p/peermgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (mgr *PeerManager) watchRegistryNodes(consensus consensus.Backend) {

nodeCh, nSub, err := consensus.Registry().WatchNodes(mgr.ctx)
if err != nil {
mgr.logger.Error("failed to watch registery for node changes",
mgr.logger.Error("failed to watch registry for node changes",
"err", err,
)
return
Expand Down

0 comments on commit 96ea2bd

Please sign in to comment.